fix: self-heal manifest-unreferenced branch forks (stop wedged branches) (#231)

* chore: correct stale global-lock comments

The global Arc<RwLock<Omnigraph>> that once serialized every server write was
removed — the server holds the engine as a lockless Arc<Omnigraph> and write
methods are &self, so the per-(table_key, branch) write queues are now the
actual write-serialization mechanism (in-process only).

Correct comments that still claimed the global lock is 'still in place' /
'today', or framed the queues as MR-686 scaffolding: write_queue.rs module doc,
exec/merge.rs, db/omnigraph/schema_apply.rs, db/manifest/recovery.rs, and the
bench_concurrent_http.rs example (which also wrongly stated mutate_as is
&mut self). workload.rs is left as-is — its 'previous global RwLock' wording is
accurate history.

* test: regression for self-healing a manifest-unreferenced fork

An interrupted first-write fork (create_branch succeeded, the manifest publish
did not) leaves a fully-formed Lance branch ref the manifest never references.
The branch stays a valid manifest branch, so cleanup's reconciler never
reclaims it, and today the next write to that table wedges with 'incomplete
prior delete; run cleanup'.

Forge that exact residue (a live 'feature' branch + a directly-created
'feature' ref on the Person table the manifest doesn't reference) and assert
the next load AND mutate self-heal. Deterministic and local — no S3 or timing,
since the forge IS the post-crash state. Adds a shared node_table_uri helper.

This commit is RED: it reproduces the bug and fails against the unfixed engine
with the predicted symptom. The fix follows in the next commit.

* fix: self-heal manifest-unreferenced branch forks

The first write to a table on a branch lazily forks it via Lance create_branch,
a durable two-phase op that advances Lance state BEFORE the atomic manifest
publish. If the writer dies or its request future is cancelled between the fork
and the publish, the branch ref is fully formed but the manifest never
references it. The next write re-enters the fork path, create_branch collides,
and the engine wedged with 'orphaned table state ... incomplete prior delete;
run cleanup' — which cleanup could not even fix, because the branch is still a
live manifest branch. This hit load, mutate, ingest, and the merge fork path
(one shared engine chokepoint), so a routine deploy restart or client
disconnect could wedge a branch.

Fix: treat the per-table fork ref as derived state of the manifest. fork_branch_
from_state returns a typed ForkOutcome instead of a human 'incomplete prior
delete' error; on RefAlreadyExists the db layer reclaims the manifest-
unreferenced fork (force_delete_branch + re-fork, exactly once) and proceeds.
A live committed fork is still routed to a retryable conflict before the fork
path, so concurrent first-writes stay correct.

Reclaim is only safe if no in-process writer can be mid-fork, so the write
entry points (load, mutate) acquire the per-(table, branch) write queues for
all touched tables up front — before the fork, held through the publish — when
forking a non-main branch. commit_all accepts these pre-held guards instead of
re-acquiring (the queue is non-re-entrant). The merge fork path already holds
the queue and self-heals through the shared wrapper. Cross-process in-flight
forks remain the documented one-winner-CAS gap.

Mechanical prep folded in: mutation IR lowering is hoisted so the touched-table
set is known before execution; commit_all gains the held_guards parameter.

Flips recreate_over_orphaned_fork_before_cleanup_is_actionable to assert
self-heal; fork_collision_with_live_concurrent_fork_is_retryable still holds.
Docs: writes.md cancelled-future note, invariants.md cross-process known gap.

* fix(cleanup): reconcile per-table manifest-unreferenced forks

reconcile_orphaned_branches keyed orphans on the branch NAME (absent from the
manifest), so it only reclaimed forks from a fully-deleted branch. A fork left
on a still-live branch by an interrupted first-write was never reclaimed — the
backstop the handoff expected cleanup to provide did not cover that case.

Broaden it to a per-table authority test: a Lance branch B on table T is an
orphan iff B is not a live manifest branch (delete-leftover) OR the manifest's
branch-B snapshot does not place T on B (interrupted first-write). Per-branch
snapshots are resolved once and cached across tables. Legitimately-forked
tables, main, and internal/system branches are never reclaimed; children are
dropped before parents to avoid Lance's referenced-parent RefConflict. The
commit-graph half stays whole-branch (per-table doesn't apply there).

This is the guaranteed-convergence backstop to the write-path self-heal: it
reclaims any fork the write path never revisits, and is what Lance's own
create_branch docstring asks embedders to provide for zombie/orphan refs.

* fix: reclaim self-validates against fresh manifest authority

The fork reclaim force-deletes a Lance branch ref, gated on the caller's proof
that the manifest does not place the table on the branch. But the first-write
path obtains that proof via snapshot_for_branch, which returns the coordinator's
CACHED snapshot when the handle is bound to the branch (an embedded handle on
the branch, or branch_merge's target swap). If that snapshot is stale and a
concurrent writer already published a legitimate fork, the reclaim would
force-delete it and re-fork from source, stranding the manifest at a version the
recreated ref no longer has.

Make the destructive primitive own its safety precondition: re-derive it from a
FRESH manifest read (fresh_snapshot_for_branch, which bypasses the cache)
immediately before force-deleting. If fresh authority shows the table is on the
branch, refuse with a retryable conflict instead of destroying a valid fork.
Correct for any caller regardless of snapshot staleness. Also stop branching on
Lance's exact RefConflict prose (loosened match; typed-variant is the durable
follow-up). Addresses PR review (Codex P1, Greptile P2).

* fix: cover delete-cascade edges in up-front fork-queue acquisition

A node delete cascades to every edge table touching that node (execute_delete_
node), forking those edge tables during execution. But touched_table_keys
derived the up-front fork-queue set from the IR ops alone (just node:Type), so a
branch delete that forks node + cascade edges held only the node queue —
commit_all then saw cascade-edge keys it had no guard for.

The touched set is a pure function of (IR ops + catalog), so compute the
COMPLETE set: op types plus, for delete-node ops, the cascade edges derived the
same way the executor derives them (from_type/to_type match). Pre-computed now
equals actual by construction.

Also promote commit_all's held-guard coverage check out of debug_assert into an
all-builds check that fails the write with a typed manifest_internal error: a
load-bearing serialization invariant must fail loudly+safely in release, not
silently proceed unguarded if a future execution path ever touches a table
outside the pre-computed set.

Adds branch_cascade_delete_forks_node_and_edges_under_held_queues, which drives
the cascade path on a branch (the gap the existing insert/load tests missed).
Addresses PR review (Cursor medium, Greptile P2).

* fix(cleanup): serialize fork reclaim against in-process live writers

The broadened per-table reconciler force_delete'd an orphan candidate on a LIVE
branch without holding the per-(table, branch) write queue. An in-process
first-write fork in its fork->publish window holds that queue and has not yet
advanced the manifest, so it looks exactly like an origin-2 orphan — concurrent
cleanup could delete the ref the writer still holds and is about to publish.
(The old branch-name-based reconciler did not have this race: a deleted branch
cannot have a live first-write.)

Bring the reconciler under the same invariant the write-path reclaim already
obeys: never force_delete a fork ref without holding the (table, branch) write
queue AND confirming, under it, from a fresh read, that the ref is still
manifest-unreferenced. Acquire one key at a time (no lock-order inversion vs
multi-table acquire_many writers); if the writer published meanwhile, the fresh
re-check sees the table on the branch and skips. Cross-process writers remain
the documented one-winner-CAS gap. Addresses PR review (Cursor high).

* fix: classify create_branch failure by ref existence, not by failure

fork_branch_from_state mapped ANY create_branch failure to RefAlreadyExists,
routing transient I/O / version / Lance-internal errors into the destructive
reclaim path and masking the real error as a retryable conflict.

Branch on the actual fact instead: on create_branch failure, check whether the
ref exists (list_branches). Only a genuinely pre-existing ref — a fully-formed
manifest-unreferenced fork — is a reclaim candidate; any other failure
propagates with fidelity. We deliberately do NOT force-delete on a not-found-ref
failure: it is indistinguishable from a transient error on a fresh create, and
force-deleting there is the overreach the fresh-authority guard already removed.
A phase-1-only Lance zombie (rarer; create_branch interrupted mid its two
internal phases) surfaces as the propagated error for manual reclaim.
Addresses PR review (Cursor medium).

* fix(cleanup): skip (not delete) on a transient re-check error for a live branch

The reconcile pre-delete re-check treated ANY fresh_snapshot error as 'still an
orphan' and proceeded to force_delete. A transient manifest read failure on a
LIVE branch could therefore destroy a fork the manifest still considers
legitimate — inconsistent with the write-path reclaim (aborts on the same error)
and the candidate scan (skips on snapshot failure).

Distinguish the two origins under the queue: a branch absent from the manifest
authority (origin 1) is a confirmed orphan and is deleted without a fresh read
(no live writer can hold a deleted branch's queue); a LIVE branch (origin 2)
gets the fresh re-check and, on a transient read error, is SKIPPED — never
destroyed on ambiguity — converging on a later cleanup. Same don't-destroy-on-
ambiguous-error principle as the create_branch failure classification.
Addresses PR review (Cursor medium).

* fix(cleanup): unify fork-ref reclaim on fresh authority under the queue

Consolidates the reconcile/reclaim hardening from PR review (the earlier per-site
commits were collapsed when reconciling with the main merge). Both destructive
fork-ref sites — the write-path reclaim and the cleanup reconciler — now share
one classifier, classify_fork_ref -> ForkRefStatus { Legitimate, Orphan,
Indeterminate }, evaluated from FRESH manifest authority under the held
(table, branch) write queue. A fork ref is destroyed ONLY on a confirmed Orphan;
a Legitimate (concurrent writer published a real fork) or Indeterminate
(transient read) status is never destroyed — the write path maps it to a
retryable conflict, cleanup maps it to skip. This closes, by construction:

- reclaim trusting a possibly-cached caller proof (Codex P1);
- reconcile racing an in-process live fork without the queue (Cursor);
- delete-on-transient-error in the re-check (Cursor/Greptile);
- origin-1 trusting a stale live_branches capture for a created-since branch
  (Cursor/Greptile P1).

Having one classifier removes the duplication that let the two sites drift.
ForkOutcome is made pub to match the sealed trait method returning it. Verified
green on Lance 7.0.0 (full engine suite + 48/48 failpoints).

* test(cleanup): pin classify_fork_ref decision (Legitimate / Orphan / ghost)

Both fork-ref reclaim sites (write-path reclaim + cleanup reconciler) route
their destroy/skip decision through classify_fork_ref, but it had no direct
test — reverting the fresh-authority logic was not test-detectable. Add a
deterministic in-source unit test that forges each state and asserts the status:
a manifest-placed fork -> Legitimate (never destroyed); a ref the manifest does
not place on the branch -> Orphan; a ref for a branch absent from the manifest
-> Orphan (ghost reclaim preserved). This makes the core fresh-authority
decision behind every reclaim fix revert-detectable in one place.

(The Indeterminate arm — transient read on a live branch -> skip — needs an
injected read failure and is left to the failpoints suite; the cross-process
cleanup-vs-writer and cached-snapshot reclaim races are the documented
one-winner-CAS gap, not reachable same-process bugs, so they are not faked here.)

* test(cleanup): pin the Indeterminate (transient re-check) reclaim arm

Closes the last untested classify_fork_ref arm. Adds a 'classify.fresh_read'
failpoint (no-op without the failpoints feature) that simulates a transient
failure of the fresh-authority read, and a failpoints test driving it through
cleanup: a genuine origin-2 orphan on a LIVE branch whose fresh re-check fails
classifies as Indeterminate, so the reconciler SKIPS it (never destroys on an
inconclusive read) and reclaims it on the next run once the read succeeds.

This makes the don't-destroy-on-ambiguity rule revert-detectable end-to-end.
The only paths now left untested are the cross-process cleanup-vs-writer and
reclaim-vs-publish races — the documented one-winner-CAS gap (cleanup is
&mut self / CLI-only, so no reachable same-process race), not faked here.

* test(server): avoid stale schema apply route handle

* fix(cleanup): report indeterminate fork authority clearly
This commit is contained in:
Ragnor Comerford 2026-06-15 22:17:25 +02:00 committed by GitHub
parent d2340f19e9
commit 6a2dfa7325
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
24 changed files with 1167 additions and 162 deletions

View file

@ -7,7 +7,7 @@
**Lakehouse native graph engine built for context assembly**
Omnigraph acts as operational state & coordination layer for agents.
Omnigraph acts as operational state & coordination layer for agents.
Hundreds of agents can enrich the graph on parallel isolated branches and changes can be reviewed and merged safely.
- Git-style versioning & branching

View file

@ -644,7 +644,6 @@ impl From<CliLoadMode> for LoadMode {
}
}
}
impl CliLoadMode {
pub(crate) fn as_str(self) -> &'static str {
match self {
@ -654,4 +653,3 @@ impl CliLoadMode {
}
}
}

View file

@ -561,4 +561,3 @@ fn graphs_list_against_local_uri_errors_with_remote_only_message() {
"expected a remote-server rejection in stderr; got:\n{stderr}"
);
}

View file

@ -1,14 +1,15 @@
//! Server-level concurrent HTTP benchmark for MR-686 (PR 0 baseline).
//!
//! Drives concurrent `/change` requests against an in-process Omnigraph HTTP
//! server. Measures the global `Arc<RwLock<Omnigraph>>` lock penalty on
//! current `main` so PR 1 + PR 2 can be evaluated against a real baseline.
//! server. Originally written to measure the global `Arc<RwLock<Omnigraph>>`
//! lock penalty as an MR-686 baseline; that lock has since been removed
//! (engine write APIs are `&self`, the server holds a lockless
//! `Arc<Omnigraph>`), so this now measures the concurrent write path itself
//! (per-`(table, branch)` queue contention + Lance I/O).
//!
//! Per the MR-686 plan: this is the load-bearing bench. `Omnigraph::mutate_as`
//! is `&mut self`, so an engine-level concurrent bench either serializes on the
//! borrow checker (measures nothing) or drives multiple handles (measures Lance
//! contention, not the server bottleneck). Driving the HTTP server is the only
//! way to measure the actual `RwLock<Omnigraph>` contention this work removes.
//! Driving the HTTP server is still the right level: an engine-level bench on
//! a single handle measures Lance contention, not the server's request-path
//! concurrency.
//!
//! Usage:
//! ```sh

View file

@ -778,29 +778,27 @@ async fn schema_apply_route_additive_property_preserves_existing_rows() {
// AddProperty wasn't pinned with a row-count check anywhere.
// Load N rows, apply schema adding nullable property, verify
// every row is still readable and the new column is null.
let (temp, app) = app_for_graph_with_auth_tokens_and_policy(
&fs::read_to_string(fixture("test.pg")).unwrap(),
let (temp, app) = app_for_loaded_graph_with_auth_tokens_and_policy(
&[("act-ragnor", "admin-token")],
SCHEMA_APPLY_POLICY_YAML,
)
.await;
let graph = graph_path(temp.path());
// Standard fixture data: 4 Persons + 1 Company. Load it.
// Standard fixture data is loaded before the app is built, so the server
// handle applies schema from the same manifest it is serving.
let pre_count = {
let db = Omnigraph::open(graph.to_str().unwrap()).await.unwrap();
db.load(
"main",
&fs::read_to_string(fixture("test.jsonl")).unwrap(),
LoadMode::Append,
)
.await
.unwrap();
let snap = db
.snapshot_of(omnigraph::db::ReadTarget::branch("main"))
.await
.unwrap();
snap.entry("node:Person").expect("Person").row_count
snap.open("node:Person")
.await
.expect("Person")
.count_rows(None)
.await
.unwrap()
};
assert!(pre_count > 0, "fixture should have loaded Person rows");
@ -830,7 +828,13 @@ async fn schema_apply_route_additive_property_preserves_existing_rows() {
.snapshot_of(omnigraph::db::ReadTarget::branch("main"))
.await
.unwrap();
let post_count = snap.entry("node:Person").expect("Person").row_count;
let post_count = snap
.open("node:Person")
.await
.expect("Person")
.count_rows(None)
.await
.unwrap();
assert_eq!(
post_count, pre_count,
"AddProperty should preserve row count",

View file

@ -34,10 +34,10 @@ pub(crate) use namespace::open_table_head_for_write;
use namespace::{branch_manifest_namespace, staged_table_namespace};
use publisher::{GraphNamespacePublisher, ManifestBatchPublisher};
pub(crate) use recovery::{
RecoveryMode, RecoverySidecar, RecoverySidecarHandle, SidecarKind, SidecarTablePin,
SidecarTableRegistration, SidecarTombstone, delete_sidecar, has_schema_apply_sidecar,
heal_pending_sidecars_roll_forward, list_sidecars, new_sidecar, recover_manifest_drift,
schema_apply_serial_queue_key, write_sidecar,
RecoveryMode, RecoverySidecarHandle, SidecarKind, SidecarTablePin, SidecarTableRegistration,
SidecarTombstone, delete_sidecar, has_schema_apply_sidecar, heal_pending_sidecars_roll_forward,
list_sidecars, new_sidecar, recover_manifest_drift, schema_apply_serial_queue_key,
write_sidecar,
};
pub use state::SubTableEntry;
#[cfg(test)]

View file

@ -793,10 +793,10 @@ pub(crate) fn schema_apply_serial_queue_key() -> crate::db::write_queue::TableQu
/// same table append extra Lance restore commits which `omnigraph
/// cleanup` reclaims.
///
/// Concurrency: today recovery runs synchronously in `Omnigraph::open`
/// *before* the engine is wrapped in the server's `Arc<RwLock<Omnigraph>>`.
/// No request handlers can race, so this sweep does NOT acquire write
/// queues. In-process callers (refresh, write entry points) must use
/// Concurrency: the open-time sweep runs synchronously in `Omnigraph::open`
/// before the engine handle is published to any caller, so no request
/// handler can race it and it does NOT acquire write queues. In-process
/// callers (refresh, write entry points) must use
/// [`heal_pending_sidecars_roll_forward`] instead, which serializes
/// against live writers via per-(table_key, branch) queue acquisition.
pub(crate) async fn recover_manifest_drift(

View file

@ -114,10 +114,11 @@ pub struct Omnigraph {
/// Read-heavy on schema introspection paths, written only by
/// `apply_schema`. Same ArcSwap rationale as `catalog`.
schema_source: Arc<ArcSwap<String>>,
/// Per-`(table_key, branch)` writer queues. Reachable from engine
/// internals (mutation finalize, schema_apply, branch_merge,
/// ensure_indices, delete_where) and from future MR-870 recovery
/// reconciler. PR 1b adds the field; callers acquire in commits 4+.
/// Per-`(table_key, branch)` writer queues — the engine's
/// write-serialization mechanism (the server holds the engine as a
/// lockless `Arc<Omnigraph>`). Reachable from engine internals
/// (mutation finalize, schema_apply, branch_merge, ensure_indices,
/// delete_where, the fork path, recovery reconciler).
write_queue: Arc<crate::db::write_queue::WriteQueueManager>,
/// Process-wide mutex held across the swap → operate → restore window
/// in `branch_merge_impl`. Two concurrent merges with distinct targets
@ -1484,6 +1485,13 @@ impl Omnigraph {
table_ops::open_for_mutation_on_branch(self, branch, table_key, op_kind).await
}
/// Fork `table_key` onto `active_branch` from the given source state,
/// self-healing a manifest-unreferenced leftover fork if one is in the
/// way. Callers that reach this MUST already hold the per-`(table_key,
/// active_branch)` write queue (so the reclaim cannot race an in-process
/// fork) and must have confirmed via the live manifest that the table is
/// not yet on `active_branch`. Both the first-write fork path
/// (`open_owned_dataset_for_branch_write`) and `branch_merge` satisfy this.
pub(crate) async fn fork_dataset_from_entry_state(
&self,
table_key: &str,
@ -1492,7 +1500,7 @@ impl Omnigraph {
source_version: u64,
active_branch: &str,
) -> Result<SnapshotHandle> {
table_ops::fork_dataset_from_entry_state(
match table_ops::fork_dataset_from_entry_state(
self,
table_key,
full_path,
@ -1500,7 +1508,21 @@ impl Omnigraph {
source_version,
active_branch,
)
.await
.await?
{
crate::storage_layer::ForkOutcome::Created(ds) => Ok(ds),
crate::storage_layer::ForkOutcome::RefAlreadyExists => {
table_ops::reclaim_orphaned_fork_and_refork(
self,
table_key,
full_path,
source_branch,
source_version,
active_branch,
)
.await
}
}
}
pub(crate) async fn reopen_for_mutation(

View file

@ -268,9 +268,7 @@ pub async fn optimize_all_tables(db: &Omnigraph) -> Result<Vec<TableOptimizeStat
// the original row addresses on rewrite). The CSR/CSC graph topology index
// is rebuilt only when an edge table moved. Mirrors schema_apply's
// post-publish invalidation.
let any_committed = stats
.iter()
.any(|s| matches!(s, Ok(st) if st.committed));
let any_committed = stats.iter().any(|s| matches!(s, Ok(st) if st.committed));
let edge_committed = stats
.iter()
.any(|s| matches!(s, Ok(st) if st.committed && st.table_key.starts_with("edge:")));
@ -642,27 +640,37 @@ pub struct BranchReconcileStats {
pub failures: Vec<(String, String)>,
}
/// Drop every per-table and commit-graph Lance branch that the manifest no
/// longer references.
/// Drop every per-table and commit-graph Lance branch fork the manifest does
/// not reference.
///
/// Orphaned forks arise when a `branch_delete` flips the manifest authority
/// (atomic) but a downstream best-effort reclaim does not complete. They are
/// unreachable through any snapshot — no manifest entry can name them — yet
/// they pin their `tree/{branch}/` storage and can block reusing the branch
/// name. This is the guaranteed convergence backstop: it is idempotent and
/// derived purely from the manifest authority, so it no-ops once everything is
/// reconciled, and it would harmlessly find nothing if a future Lance atomic
/// multi-dataset branch op prevented orphans from forming.
/// Two origins produce a manifest-unreferenced fork:
/// 1. A `branch_delete` flips the manifest authority (atomic) but a
/// downstream best-effort reclaim does not complete — the whole branch is
/// gone from the manifest, but a `tree/{branch}/` ref lingers.
/// 2. A first-write fork (or a merge fork) creates the branch ref before the
/// manifest publish, then the writer dies / is cancelled — the branch is
/// still a live manifest branch, but the manifest's snapshot of it does
/// not place *this table* on the branch.
///
/// The keep-set is the full (unfiltered) manifest branch list, so system
/// branches' forks are never reclaimed; `main`/default is not a named Lance
/// branch and so is never a candidate. Referencing children are dropped before
/// parents (Lance refuses to delete a referenced parent) by ordering longest
/// branch names first.
/// The write path self-heals (2) on the next write to the table
/// (`reclaim_orphaned_fork_and_refork`); this is the guaranteed-convergence
/// backstop that also covers (1) and any table the write path never revisits.
///
/// The orphan test is therefore **per-table**, not per-branch-name: a Lance
/// branch `B` on table `T` is an orphan iff `B` is not a live manifest branch
/// at all (origin 1) OR the manifest's branch-`B` snapshot does not place `T`
/// on `B` (origin 2). A legitimately-forked table (`table_branch == Some(B)`)
/// is kept. `main` and internal/system branches are never candidates. Lance
/// refuses to force-delete a branch with referencing descendants, so children
/// are dropped before parents (longest name first). Idempotent and authority-
/// derived: no-ops once reconciled, and degrades to finding nothing if a future
/// Lance atomic multi-dataset branch op prevents orphans from forming.
pub async fn reconcile_orphaned_branches(db: &Omnigraph) -> Result<BranchReconcileStats> {
use std::collections::HashSet;
use std::collections::{HashMap, HashSet};
let keep: HashSet<String> = db
// Live manifest branches: the set whose per-table placements are
// authoritative. A branch absent here is a whole-branch (origin-1) orphan.
let live_branches: HashSet<String> = db
.coordinator
.read()
.await
@ -683,6 +691,12 @@ pub async fn reconcile_orphaned_branches(db: &Omnigraph) -> Result<BranchReconci
.collect();
let mut stats = BranchReconcileStats::default();
// Per-branch snapshots are resolved once and cached across tables (few
// branches in practice); origin-2 detection consults the branch's own view.
// Failures are cached too: one branch-level read failure should not refetch
// and append duplicate per-table noise for every table that lists the ref.
let mut branch_snapshots: HashMap<String, crate::db::Snapshot> = HashMap::new();
let mut failed_branch_snapshots: HashSet<String> = HashSet::new();
// Per-table fault isolation: one table's transient failure is recorded and
// logged, never aborting the rest of the sweep.
@ -701,7 +715,104 @@ pub async fn reconcile_orphaned_branches(db: &Omnigraph) -> Result<BranchReconci
continue;
}
};
for branch in orphan_branches(listed, &keep) {
// Decide per (table, branch) whether the fork is an orphan.
let mut orphans: Vec<String> = Vec::new();
for branch in listed {
// `main` is not a named Lance branch; system/internal branches
// (e.g. the schema-apply lock) own legitimate forks — never touch.
if branch == "main" || crate::db::is_internal_system_branch(&branch) {
continue;
}
let is_orphan = if !live_branches.contains(&branch) {
true // origin 1: whole branch gone from the manifest
} else {
// origin 2: live branch, but does the manifest place THIS
// table on it? Resolve (and cache) the branch's snapshot.
if failed_branch_snapshots.contains(&branch) {
continue;
}
if !branch_snapshots.contains_key(&branch) {
let branch_snapshot =
match crate::failpoints::maybe_fail("cleanup.resolve_branch_snapshot") {
Ok(()) => db.snapshot_for_branch(Some(&branch)).await,
Err(injected) => Err(injected),
};
match branch_snapshot {
Ok(snap) => {
branch_snapshots.insert(branch.clone(), snap);
}
Err(err) => {
tracing::warn!(
target: "omnigraph::cleanup",
table = %table_key,
branch = %branch,
error = %err,
"resolving branch snapshot failed during reconcile; skipping",
);
stats.failures.push((table_key.clone(), err.to_string()));
failed_branch_snapshots.insert(branch.clone());
continue;
}
}
}
branch_snapshots[&branch]
.entry(&table_key)
.map(|e| e.table_branch.as_deref() != Some(branch.as_str()))
.unwrap_or(true)
};
if is_orphan {
orphans.push(branch);
}
}
// Children before parents (longest name first) so Lance's referenced-
// parent RefConflict cannot block reclamation.
orphans.sort_by(|a, b| b.len().cmp(&a.len()).then_with(|| a.cmp(b)));
for branch in orphans {
// Serialize against in-process live writers before destroying a ref.
// A first-write fork holds the per-(table, branch) write queue from
// before the fork through the manifest publish; on a LIVE branch its
// in-flight fork looks exactly like an origin-2 orphan (manifest not
// yet advanced). Acquire the same queue so cleanup waits for any such
// writer, then RE-VALIDATE under the queue with a fresh read: if the
// writer published in the meantime (table now placed on the branch),
// it is no longer an orphan — skip it. (Cross-process writers remain
// the documented one-winner-CAS gap.) One key held at a time → no
// lock-order inversion against multi-table `acquire_many` writers.
let _guard = db
.write_queue()
.acquire(&(table_key.clone(), Some(branch.clone())))
.await;
// Decide under the queue from FRESH authority via the shared
// classifier (same decision the write-path reclaim uses) — never
// from the sweep-start `live_branches` capture. A branch created
// AFTER that capture is absent from the stale set yet may already
// carry a legitimately-published fork (an in-process writer held
// this queue through its fork+publish; we just waited on it), so a
// stale "origin-1 ⇒ delete" shortcut would destroy a live fork.
// Only `Orphan` is reclaimed; `Indeterminate` (transient read) is
// skipped and recorded. (Cross-process writers remain the documented
// one-winner-CAS gap.) One key held at a time → no lock-order
// inversion vs multi-table `acquire_many` writers.
match super::table_ops::classify_fork_ref(db, &table_key, &branch).await {
super::table_ops::ForkRefStatus::Orphan => {}
super::table_ops::ForkRefStatus::Legitimate => continue,
super::table_ops::ForkRefStatus::Indeterminate => {
tracing::warn!(
target: "omnigraph::cleanup",
table = %table_key,
branch = %branch,
"fresh re-check inconclusive during reconcile; skipping to avoid \
destroying a possibly-live fork (will retry next cleanup)",
);
stats.failures.push((
table_key.clone(),
format!("indeterminate fork status for {branch}"),
));
continue;
}
}
let outcome = match crate::failpoints::maybe_fail("cleanup.reconcile_fork") {
Ok(()) => storage.force_delete_branch(&full_path, &branch).await,
Err(injected) => Err(injected),
@ -722,15 +833,17 @@ pub async fn reconcile_orphaned_branches(db: &Omnigraph) -> Result<BranchReconci
}
}
// Commit-graph orphans (best-effort: the dataset may not exist on a graph
// that has never committed; any failure is isolated and retried next time).
if let Err(err) = reconcile_commit_graph_orphans(db, &keep, &mut stats).await {
// Commit-graph orphans are whole-branch (not per-table), so the simple
// "branch name not in the live set" test still applies there.
if let Err(err) = reconcile_commit_graph_orphans(db, &live_branches, &mut stats).await {
tracing::warn!(
target: "omnigraph::cleanup",
error = %err,
"commit-graph orphan reconcile failed; will retry next cleanup",
);
stats.failures.push(("_graph_commits".to_string(), err.to_string()));
stats
.failures
.push(("_graph_commits".to_string(), err.to_string()));
}
Ok(stats)
@ -758,7 +871,9 @@ async fn reconcile_commit_graph_orphans(
error = %err,
"reclaiming orphaned commit-graph branch failed; will retry next cleanup",
);
stats.failures.push(("_graph_commits".to_string(), err.to_string()));
stats
.failures
.push(("_graph_commits".to_string(), err.to_string()));
}
}
}
@ -787,3 +902,66 @@ pub(super) fn all_table_keys(catalog: &omnigraph_compiler::catalog::Catalog) ->
keys.sort();
keys
}
#[cfg(all(test, feature = "failpoints"))]
mod tests {
use super::*;
use crate::failpoints::ScopedFailPoint;
use crate::loader::{LoadMode, load_jsonl};
fn node_table_uri(root: &str, type_name: &str) -> String {
let mut hash: u64 = 0xcbf2_9ce4_8422_2325;
for &b in type_name.as_bytes() {
hash ^= b as u64;
hash = hash.wrapping_mul(0x100_0000_01b3);
}
format!("{}/nodes/{hash:016x}", root.trim_end_matches('/'))
}
#[tokio::test]
async fn reconcile_caches_live_branch_snapshot_resolution_failure() {
let _scenario = fail::FailScenario::setup();
let dir = tempfile::tempdir().unwrap();
let uri = dir.path().to_str().unwrap();
let schema = "node Person { name: String @key }\nnode Company { name: String @key }\n";
let mut db = Omnigraph::init(uri, schema).await.unwrap();
load_jsonl(
&mut db,
"{\"type\":\"Person\",\"data\":{\"name\":\"Alice\"}}\n\
{\"type\":\"Company\",\"data\":{\"name\":\"Acme\"}}",
LoadMode::Merge,
)
.await
.unwrap();
db.branch_create("feature").await.unwrap();
for type_name in ["Person", "Company"] {
let table_uri = node_table_uri(uri, type_name);
let mut ds = lance::Dataset::open(&table_uri).await.unwrap();
let base = ds.version().version;
ds.create_branch("feature", base, None).await.unwrap();
}
let _fp = ScopedFailPoint::new("cleanup.resolve_branch_snapshot", "return");
let stats = reconcile_orphaned_branches(&db).await.unwrap();
assert_eq!(
stats.failures.len(),
1,
"one live-branch snapshot resolution failure should be reported once, \
not once per table: {:?}",
stats.failures
);
assert!(
stats.failures[0]
.1
.contains("cleanup.resolve_branch_snapshot"),
"the recorded failure should be the branch-snapshot resolution failure: {:?}",
stats.failures
);
assert!(
stats.reclaimed.is_empty(),
"unreadable live-branch refs must be left for the next cleanup run"
);
}
}

View file

@ -428,10 +428,10 @@ where
// manifest publish via `commit_changes_with_actor` below.
//
// Schema-apply already holds the graph-wide `__schema_apply_lock__`
// sentinel branch, so under PR 1b's intermediate state these
// per-table acquisitions are uncontended. They exist for symmetry
// with future MR-870 recovery, which will need queue acquisition
// before any `Dataset::restore` it issues for SchemaApply sidecars.
// sentinel branch, so these per-table acquisitions are uncontended in
// practice. They exist for symmetry with the recovery reconciler, which
// acquires the same queues before any `Dataset::restore` it issues for
// SchemaApply sidecars.
let mut schema_apply_queue_keys: Vec<(String, Option<String>)> = recovery_pins
.iter()
.map(|pin| (pin.table_key.clone(), pin.table_branch.clone()))

View file

@ -164,9 +164,8 @@ pub(super) async fn ensure_indices_for_branch(
// that needs index work. Held across the per-table commit loop and
// the manifest publish at the end of this function. Sorted-order
// acquisition prevents lock-order inversion against concurrent
// multi-table writers (mutation finalize, branch_merge, future
// MR-870 recovery). Under PR 1b's intermediate state (global server
// RwLock still in place), this acquisition is uncontended.
// multi-table writers (mutation finalize, branch_merge, the fork
// path, recovery).
let queue_keys: Vec<(String, Option<String>)> = recovery_pins
.iter()
.map(|pin| (pin.table_key.clone(), pin.table_branch.clone()))
@ -582,8 +581,14 @@ pub(super) async fn open_owned_dataset_for_branch_write(
));
}
}
fork_dataset_from_entry_state(
db,
// The fork advances Lance state before the manifest publish. The
// caller holds the per-(table, active_branch) write queue from
// before this fork through the publish, so a leftover ref is a
// manifest-unreferenced fork (interrupted prior fork, or
// delete+recreate), not a live in-process fork. The wrapper
// self-heals it (reclaim + re-fork); see
// `Omnigraph::fork_dataset_from_entry_state`.
db.fork_dataset_from_entry_state(
table_key,
full_path,
source_branch,
@ -611,7 +616,7 @@ pub(super) async fn fork_dataset_from_entry_state(
source_branch: Option<&str>,
source_version: u64,
active_branch: &str,
) -> Result<SnapshotHandle> {
) -> Result<crate::storage_layer::ForkOutcome<SnapshotHandle>> {
db.storage()
.fork_branch_from_state(
full_path,
@ -623,6 +628,172 @@ pub(super) async fn fork_dataset_from_entry_state(
.await
}
/// Classification of a Lance branch ref `B` on table `T` against FRESH manifest
/// authority — the single decision both fork-ref reclaim sites share: the
/// write-path reclaim ([`reclaim_orphaned_fork_and_refork`]) and the cleanup
/// reconciler (`optimize::reconcile_orphaned_branches`). Having one classifier
/// keeps the two destructive sites from drifting (the bug history: each was
/// hardened separately and the other lagged).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub(crate) enum ForkRefStatus {
/// The manifest places `T` on `B` — a legitimate fork. Never destroy.
Legitimate,
/// The manifest does not reference this fork (`T` not on `B`, or `B` absent
/// from the manifest entirely). Reclaimable.
Orphan,
/// Fresh authority could not be established (a transient read failure on a
/// live branch). Ambiguous — do not destroy; the caller retries / converges.
Indeterminate,
}
/// Classify a fork ref from FRESH manifest authority (bypasses the coordinator
/// cache). MUST be called with the per-`(table, branch)` write queue held, so
/// the classification is stable against in-process writers for the caller's
/// critical section. Both reclaim sites map the result to their own action
/// (write path: reclaim vs retryable; cleanup: delete vs skip), but the
/// destroy-only-on-`Orphan` rule is enforced here, once.
pub(crate) async fn classify_fork_ref(
db: &Omnigraph,
table_key: &str,
branch: &str,
) -> ForkRefStatus {
// `classify.fresh_read` failpoint: simulate a transient failure of the
// fresh-authority read (no-op without the `failpoints` feature). Lets a
// test exercise the Indeterminate path — a read failure on a live branch
// must classify as Indeterminate (skip), never Orphan (destroy).
let fresh = match crate::failpoints::maybe_fail("classify.fresh_read") {
Ok(()) => db.fresh_snapshot_for_branch(Some(branch)).await,
Err(injected) => Err(injected),
};
match fresh {
Ok(snap) => {
let placed = snap
.entry(table_key)
.map(|e| e.table_branch.as_deref() == Some(branch))
.unwrap_or(false);
if placed {
ForkRefStatus::Legitimate
} else {
// Branch resolves but the manifest does not place this table on
// it — a manifest-unreferenced fork.
ForkRefStatus::Orphan
}
}
// Branch did not resolve. `all_branches` lists `_refs/branches/` live, so
// absent there = genuinely no such manifest branch (origin-1 orphan);
// present (or a list error) = transient read — never destroy on that.
Err(_) => match db.coordinator.read().await.all_branches().await {
Ok(fresh) if !fresh.iter().any(|b| b == branch) => ForkRefStatus::Orphan,
_ => ForkRefStatus::Indeterminate,
},
}
}
/// Reclaim a manifest-unreferenced fork and re-fork in its place.
///
/// Reached when `fork_branch_from_state` reports `RefAlreadyExists`. This is a
/// destructive op (it force-deletes a Lance branch ref), so it owns its own
/// safety precondition rather than trusting the caller's: it re-derives, via
/// [`classify_fork_ref`], that the manifest does not place this table on
/// `active_branch`. The caller's earlier proof may have come from the
/// coordinator's *cached* branch snapshot (`resolved_branch_target` returns
/// the cache when the handle is bound to `active_branch` — an embedded handle
/// on the branch, or `branch_merge`'s target swap); trusting it could
/// force-delete a fork a concurrent writer just legitimately published. Only
/// once fresh authority confirms the ref is unreferenced does it drop the ref
/// (idempotent `force_delete_branch`) and re-fork, exactly once.
///
/// If fresh authority shows the table IS on `active_branch` (a legitimate
/// concurrent fork), or a second collision occurs after reclaim (a foreign-
/// process writer recreated the ref — the documented one-winner-CAS gap), it
/// surfaces a retryable conflict; on retry the winner's fork is visible and
/// the no-fork path runs.
pub(super) async fn reclaim_orphaned_fork_and_refork(
db: &Omnigraph,
table_key: &str,
full_path: &str,
source_branch: Option<&str>,
source_version: u64,
active_branch: &str,
) -> Result<SnapshotHandle> {
// Self-validate against FRESH authority before destroying anything. Only an
// Orphan is reclaimable; a Legitimate status (a concurrent writer published
// a real fork despite the caller's possibly-cached proof) or an
// Indeterminate one (transient read) surfaces a retryable conflict rather
// than stranding the manifest at a version the recreated ref won't have.
match classify_fork_ref(db, table_key, active_branch).await {
ForkRefStatus::Orphan => {}
ForkRefStatus::Legitimate => {
let actual = db
.fresh_snapshot_for_branch(Some(active_branch))
.await
.ok()
.and_then(|s| s.entry(table_key).map(|e| e.table_version))
.unwrap_or(source_version);
return Err(OmniError::manifest_expected_version_mismatch(
table_key,
source_version,
actual,
));
}
ForkRefStatus::Indeterminate => {
return Err(OmniError::manifest_conflict(format!(
"could not verify whether branch '{active_branch}' still owns an orphaned \
fork for table '{table_key}' because fresh manifest authority was \
unavailable; refresh and retry"
)));
}
}
crate::failpoints::maybe_fail("fork.before_reclaim")?;
db.storage()
.force_delete_branch(full_path, active_branch)
.await
.map_err(|e| {
// Lance refuses to delete a branch with dependent child branches
// even under force (RefConflict). Unreachable for a leaf first-write
// fork (the cleanup reconciler also drops children before parents),
// but surface it actionably if it ever happens. We match loosely on
// "referenc" rather than the exact prose, which is not a Lance API
// contract; a typed RefConflict variant through `force_delete_branch`
// is the durable follow-up.
if e.to_string().contains("referenc") {
OmniError::manifest_conflict(format!(
"branch '{active_branch}' cannot reclaim the leftover fork for \
table '{table_key}' because it has dependent child branches; \
delete the child branches (or run `omnigraph cleanup`) first"
))
} else {
e
}
})?;
match fork_dataset_from_entry_state(
db,
table_key,
full_path,
source_branch,
source_version,
active_branch,
)
.await?
{
crate::storage_layer::ForkOutcome::Created(ds) => Ok(ds),
crate::storage_layer::ForkOutcome::RefAlreadyExists => {
let live = db.fresh_snapshot_for_branch(Some(active_branch)).await?;
let actual = live
.entry(table_key)
.map(|e| e.table_version)
.unwrap_or(source_version);
Err(OmniError::manifest_expected_version_mismatch(
table_key,
source_version,
actual,
))
}
}
}
pub(super) async fn reopen_for_mutation(
db: &Omnigraph,
table_key: &str,
@ -1127,3 +1298,78 @@ pub(super) async fn ensure_commit_graph_initialized(db: &Omnigraph) -> Result<()
pub(super) async fn invalidate_graph_index(db: &Omnigraph) {
db.runtime_cache.invalidate_all().await;
}
#[cfg(test)]
mod classify_fork_ref_tests {
//! Direct coverage of [`classify_fork_ref`] — the single fresh-authority
//! decision both fork-ref reclaim sites (write-path reclaim + cleanup
//! reconciler) route through. Pins each deterministic status so reverting
//! the fresh-authority logic at either site fails here. (The `Indeterminate`
//! arm needs an injected transient read and is covered under the
//! `failpoints` suite.)
use super::*;
use crate::db::Omnigraph;
use crate::loader::LoadMode;
const SCHEMA: &str = "node Person { name: String @key }\nnode Company { name: String @key }\n";
/// On-disk dataset path for a node table, taken from the manifest entry
/// (the same path the engine uses) so the test forges against the real ref.
async fn node_path(db: &Omnigraph, branch: &str, table_key: &str) -> String {
let snap = db.snapshot_for_branch(Some(branch)).await.unwrap();
let entry = snap.entry(table_key).unwrap();
format!("{}/{}", db.root_uri, entry.table_path)
}
#[tokio::test]
async fn classify_distinguishes_legitimate_unreferenced_and_ghost() {
let dir = tempfile::tempdir().unwrap();
let db = Omnigraph::init(dir.path().to_str().unwrap(), SCHEMA)
.await
.unwrap();
db.branch_create("feature").await.unwrap();
// Legitimate: a real write forks Company onto `feature`, and the
// manifest places Company on `feature`.
db.load_as(
"feature",
None,
r#"{"type":"Company","data":{"name":"Acme"}}"#,
LoadMode::Merge,
None,
)
.await
.unwrap();
assert_eq!(
classify_fork_ref(&db, "node:Company", "feature").await,
ForkRefStatus::Legitimate,
"a manifest-placed fork must classify as Legitimate (never destroyed)"
);
// Orphan (manifest-unreferenced): forge a `feature` ref on Person, which
// the manifest's `feature` snapshot still places on main.
let person = node_path(&db, "feature", "node:Person").await;
{
let mut ds = lance::Dataset::open(&person).await.unwrap();
let v = ds.version().version;
ds.create_branch("feature", v, None).await.unwrap();
}
assert_eq!(
classify_fork_ref(&db, "node:Person", "feature").await,
ForkRefStatus::Orphan,
"a ref the manifest does not place on the branch must classify as Orphan"
);
// Orphan (ghost): a ref for a branch the manifest does not have at all.
{
let mut ds = lance::Dataset::open(&person).await.unwrap();
let v = ds.version().version;
ds.create_branch("ghost", v, None).await.unwrap();
}
assert_eq!(
classify_fork_ref(&db, "node:Person", "ghost").await,
ForkRefStatus::Orphan,
"a ref for a branch absent from the manifest must classify as Orphan"
);
}
}

View file

@ -1,12 +1,15 @@
//! Per-`(table_key, branch)` writer queues — MR-686 scaffolding.
//! Per-`(table_key, branch)` writer queues.
//!
//! Today every server-layer write serializes on the global
//! `Arc<RwLock<Omnigraph>>` in `AppState`. MR-686 replaces that with
//! per-`(table_key, branch_ref)` queues so disjoint-key writes proceed
//! concurrently. This module owns the queue data structure; callers in
//! `MutationStaging::commit_all`, `branch_merge`, `schema_apply`,
//! `ensure_indices`, `delete_where`, and the future MR-870 recovery
//! reconciler acquire guards before any per-table Lance commit.
//! These queues are the engine's write-serialization mechanism: the server
//! holds the engine as a lockless `Arc<Omnigraph>` (writes are `&self`), so
//! disjoint-key writes proceed concurrently and only writes to the same
//! `(table_key, branch_ref)` serialize here. This module owns the queue
//! data structure; callers in `MutationStaging::commit_all`, `branch_merge`,
//! `schema_apply`, `ensure_indices`, `delete_where`, the fork path (first
//! write to a table on a branch — acquired before the fork, held through the
//! manifest publish), and the recovery reconciler acquire guards before any
//! per-table Lance commit. Serialization is in-process only; cross-process
//! writers on one graph remain one-winner-CAS at the manifest publish.
//!
//! ## Why exclusive `tokio::sync::Mutex<()>` per key
//!

View file

@ -1323,9 +1323,9 @@ impl Omnigraph {
// branch_merge writes only to the target branch.
//
// Held across the per-table publish loop and the manifest
// commit + record_merge_commit calls below. Under PR 1b's
// intermediate state (global server RwLock still in place),
// this acquisition is uncontended.
// commit + record_merge_commit calls below, so no concurrent
// writer to a touched (table, target_branch) can interleave
// between our commit_staged and our publish.
let active_branch_for_keys = self.active_branch().await;
let merge_queue_keys: Vec<(String, Option<String>)> = ordered_table_keys
.iter()

View file

@ -741,14 +741,45 @@ impl Omnigraph {
// tables. Branch is threaded explicitly — no coordinator swap.
let mut staging = MutationStaging::default();
// Lower + validate up front so the touched-table set is known before
// execution. A lowering/validation error returns exactly as it did
// when this happened inside execute_named_mutation.
let ir = self.lower_named_mutation(query_source, query_name)?;
// Up-front fork-queue acquisition (see the loader for the full
// rationale): if this mutation will fork any touched table onto a
// non-main branch, acquire the per-(table, branch) write queues for
// every touched table before the first fork and hold them through the
// publish, so the orphan-fork reclaim can't race a concurrent
// in-process fork. The touched set is derived from the lowered IR.
let fork_queue_guards: Option<(
Vec<(String, Option<String>)>,
Vec<tokio::sync::OwnedMutexGuard<()>>,
)> = if let Some(active) = requested.as_deref() {
let snapshot = self.snapshot_for_branch(Some(active)).await?;
let touched: Vec<(String, Option<String>)> = self
.touched_table_keys(&ir)
.into_iter()
.map(|k| (k, Some(active.to_string())))
.collect();
let needs_fork = touched.iter().any(|(table_key, _)| {
snapshot
.entry(table_key)
.map(|e| e.table_branch.as_deref() != Some(active))
.unwrap_or(false)
});
if needs_fork {
let guards = self.write_queue().acquire_many(&touched).await;
Some((touched, guards))
} else {
None
}
} else {
None
};
let exec_result = self
.execute_named_mutation(
query_source,
query_name,
&resolved_params,
requested.as_deref(),
&mut staging,
)
.execute_named_mutation(&ir, &resolved_params, requested.as_deref(), &mut staging)
.await;
match exec_result {
@ -768,6 +799,7 @@ impl Omnigraph {
requested.as_deref(),
crate::db::manifest::SidecarKind::Mutation,
actor_id,
fork_queue_guards,
)
.await?;
// Failpoint that wedges the documented finalize→publisher
@ -817,14 +849,19 @@ impl Omnigraph {
}
}
async fn execute_named_mutation(
/// Lower + validate a named mutation query into its IR.
///
/// Hoisted out of [`Self::execute_named_mutation`] so the caller can
/// inspect the IR before execution — specifically to compute the
/// touched-table set (see [`Self::touched_table_keys`]) for up-front
/// write-queue acquisition. Performs the same find → typecheck → lower
/// → D₂ checks that execution previously did inline, so error behavior
/// is unchanged.
fn lower_named_mutation(
&self,
query_source: &str,
query_name: &str,
params: &ParamMap,
branch: Option<&str>,
staging: &mut MutationStaging,
) -> Result<MutationResult> {
) -> Result<omnigraph_compiler::ir::MutationIR> {
let query_decl = omnigraph_compiler::find_named_query(query_source, query_name)
.map_err(|e| OmniError::manifest(e.to_string()))?;
@ -841,7 +878,61 @@ impl Omnigraph {
let ir = lower_mutation_query(&query_decl)?;
// D₂: reject mixed insert/update + delete before any I/O.
enforce_no_mixed_destructive_constructive(&ir)?;
Ok(ir)
}
/// The COMPLETE set of `(node|edge):{type}` table keys a mutation IR can
/// touch at execution time, keyed as `MutationStaging`/`commit_all` key
/// them. Must be a superset of everything execution forks/commits, since
/// it drives the up-front fork-queue acquisition and `commit_all`'s
/// held-guard coverage check — a miss means an unserialized fork/commit.
///
/// The set is a pure function of (IR ops + catalog). For each op it mirrors
/// the execute path's node-vs-edge dispatch (`node_types` first, then
/// `edge_types`). A `delete <Node>` additionally **cascades** to every edge
/// type whose endpoint is that node (see `execute_delete_node`), forking
/// those edge tables during execution — so they are included here, derived
/// the same way the executor derives them (`from_type`/`to_type` match).
/// Unknown types are skipped (the execute path surfaces the error).
/// Sorted + deduped for one-shot `acquire_many`.
fn touched_table_keys(&self, ir: &omnigraph_compiler::ir::MutationIR) -> Vec<String> {
use omnigraph_compiler::ir::MutationOpIR;
let catalog = self.catalog();
let mut keys: Vec<String> = Vec::new();
for op in &ir.ops {
let type_name = match op {
MutationOpIR::Insert { type_name, .. }
| MutationOpIR::Update { type_name, .. }
| MutationOpIR::Delete { type_name, .. } => type_name,
};
if catalog.node_types.contains_key(type_name) {
keys.push(format!("node:{type_name}"));
// A node delete cascades to every edge touching this node type,
// forking those edge tables. Include them so the up-front
// acquisition covers the cascade (mirrors execute_delete_node).
if matches!(op, MutationOpIR::Delete { .. }) {
for (edge_name, edge_type) in &catalog.edge_types {
if edge_type.from_type == *type_name || edge_type.to_type == *type_name {
keys.push(format!("edge:{edge_name}"));
}
}
}
} else if catalog.edge_types.contains_key(type_name) {
keys.push(format!("edge:{type_name}"));
}
}
keys.sort();
keys.dedup();
keys
}
async fn execute_named_mutation(
&self,
ir: &omnigraph_compiler::ir::MutationIR,
params: &ParamMap,
branch: Option<&str>,
staging: &mut MutationStaging,
) -> Result<MutationResult> {
let mut total = MutationResult::default();
for op in &ir.ops {
let result = match op {

View file

@ -463,12 +463,28 @@ impl StagedMutation {
/// unreferenced (cleaned by `cleanup_old_versions`'s age sweep)
/// rather than being committed and creating a Lance-HEAD-ahead
/// residual.
/// `held_guards`: when the caller already holds the per-`(table_key,
/// branch)` write queues for every touched table (the fork path acquires
/// them up front, before the fork, and holds them through the manifest
/// publish), it passes `(acquired_keys, guards)` here so `commit_all`
/// reuses them instead of re-acquiring — the queue is a non-re-entrant
/// `tokio::Mutex`, so re-acquiring a held key would self-deadlock.
/// `None` (the steady-state path) means `commit_all` acquires them
/// itself. `acquired_keys` must cover every key `commit_all` would
/// acquire (debug-asserted below) — the guards from `acquire_many` don't
/// carry their keys, so the caller hands the key set alongside them. The
/// fork path guarantees coverage by keying every touched table uniformly
/// by the resolved target branch.
pub(crate) async fn commit_all(
self,
db: &crate::db::Omnigraph,
branch: Option<&str>,
sidecar_kind: SidecarKind,
actor_id: Option<&str>,
held_guards: Option<(
Vec<(String, Option<String>)>,
Vec<tokio::sync::OwnedMutexGuard<()>>,
)>,
) -> Result<(
Vec<SubTableUpdate>,
HashMap<String, u64>,
@ -483,21 +499,18 @@ impl StagedMutation {
op_kinds,
} = self;
// Acquire per-(table_key, branch) queues for every touched
// table — both staged and inline-committed. Sorted by
// `acquire_many` internally so all multi-table writers
// (mutation, branch_merge, schema_apply, future MR-870
// recovery) agree on acquisition order — prevents lock-order
// inversion deadlock.
// Per-(table_key, branch) queues for every touched table — both
// staged and inline-committed. Sorted by `acquire_many` internally
// so all multi-table writers (mutation, branch_merge, schema_apply,
// the fork path, recovery) agree on acquisition order — prevents
// lock-order inversion deadlock.
//
// For inline-committed tables (delete-only mutations), Lance
// HEAD has already advanced inside `delete_where` before
// `commit_all` runs. Holding the queue here doesn't prevent
// that interleaving (commit 6 will move queue acquisition into
// `delete_where`'s call site); it does prevent another writer
// from interleaving between our delete and our publish, which
// would otherwise leave a Lance-HEAD-ahead residual the
// delete-only sidecar (added below) would have to recover.
// For inline-committed tables (delete-only mutations), Lance HEAD
// has already advanced inside `delete_where` before `commit_all`
// runs. Holding the queue here prevents another writer from
// interleaving between our delete and our publish, which would
// otherwise leave a Lance-HEAD-ahead residual the delete-only
// sidecar (added below) would have to recover.
let mut queue_keys: Vec<(String, Option<String>)> =
Vec::with_capacity(staged.len() + inline_committed.len());
for entry in &staged {
@ -512,7 +525,30 @@ impl StagedMutation {
})?;
queue_keys.push((table_key.clone(), path.table_branch.clone()));
}
let guards = db.write_queue().acquire_many(&queue_keys).await;
// Reuse the caller's guards (fork path) when handed in, else acquire
// our own. When reusing, every key we would acquire MUST already be
// covered — re-acquiring a held non-re-entrant key would deadlock, and
// a key we'd need but DON'T hold would commit unserialized. This is a
// load-bearing safety invariant, so it is checked in ALL builds (not a
// debug_assert) and fails the write loudly+safely rather than silently
// proceeding unguarded if a future execution path ever touches a table
// outside the caller's pre-computed set.
let guards = match held_guards {
Some((acquired_keys, guards)) => {
let held: std::collections::HashSet<&(String, Option<String>)> =
acquired_keys.iter().collect();
if let Some(missing) = queue_keys.iter().find(|k| !held.contains(k)) {
return Err(OmniError::manifest_internal(format!(
"commit_all: pre-held write-queue guards do not cover touched table \
'{}' on branch {:?} the caller's up-front acquisition set diverged \
from the staged/inline set (a touched-table-set bug)",
missing.0, missing.1
)));
}
guards
}
None => db.write_queue().acquire_many(&queue_keys).await,
};
// Re-capture manifest pins under the queue (PR 2 / MR-686).
//

View file

@ -418,6 +418,45 @@ async fn load_jsonl_reader<R: BufRead>(
LoadMode::Overwrite => crate::db::MutationOpKind::SchemaRewrite,
};
// Up-front fork-queue acquisition. The first write to a table on a
// non-main branch forks it (create_branch), which advances Lance state
// before the manifest publish; the reclaim of any manifest-unreferenced
// leftover (`reclaim_orphaned_fork_and_refork`) must not race a concurrent
// in-process fork. So when this load will fork at least one touched table,
// acquire the per-(table, branch) write queues for ALL touched tables up
// front (one sorted `acquire_many`, keyed uniformly by the target branch
// so it covers what `commit_all` recomputes) and hold them through the
// publish. Main-branch loads never fork; branch loads where every touched
// table is already forked skip this and let `commit_all` acquire at commit.
let fork_queue_guards: Option<(
Vec<(String, Option<String>)>,
Vec<tokio::sync::OwnedMutexGuard<()>>,
)> = if let Some(active) = branch {
let touched: Vec<(String, Option<String>)> = node_rows
.keys()
.map(|t| (format!("node:{t}"), Some(active.to_string())))
.chain(
edge_rows
.keys()
.map(|e| (format!("edge:{e}"), Some(active.to_string()))),
)
.collect();
let needs_fork = touched.iter().any(|(table_key, _)| {
snapshot
.entry(table_key)
.map(|e| e.table_branch.as_deref() != Some(active))
.unwrap_or(false)
});
if needs_fork {
let guards = db.write_queue().acquire_many(&touched).await;
Some((touched, guards))
} else {
None
}
} else {
None
};
// Phase 2a: build and validate every node batch up front. Cheap and
// synchronous — surfaces validation errors before any S3 traffic.
let mut prepared_nodes: Vec<(String, String, RecordBatch, usize)> =
@ -551,7 +590,13 @@ async fn load_jsonl_reader<R: BufRead>(
// across the manifest publish below — see exec/mutation.rs for
// the rationale (interleaving prevention).
let (updates, expected_versions, sidecar_handle, _queue_guards) = staged
.commit_all(db, branch, crate::db::manifest::SidecarKind::Load, actor_id)
.commit_all(
db,
branch,
crate::db::manifest::SidecarKind::Load,
actor_id,
fork_queue_guards,
)
.await?;
// Same finalize → publisher residual as mutations: per-table
// staged commits have advanced Lance HEAD, but the manifest

View file

@ -184,6 +184,26 @@ pub(crate) fn staged_handles_as_writes(handles: &[StagedHandle]) -> Vec<StagedWr
handles.iter().map(|h| h.inner.clone()).collect()
}
/// Outcome of a per-table branch fork (`fork_branch_from_state`).
///
/// `RefAlreadyExists` means a Lance branch ref for the target already exists
/// on the dataset, so `create_branch` could not create it cleanly. By the
/// fork caller's contract — the caller re-checks the live manifest under the
/// held per-`(table, branch)` write queue and only forks when the manifest
/// does *not* place the table on the branch — such a ref is a
/// manifest-unreferenced fork (the residue of an interrupted prior fork, or a
/// delete+recreate), which the caller reclaims and re-forks. The fork
/// operation does not editorialize ("incomplete prior delete"); it returns
/// this typed signal and lets the db layer decide.
// `pub` (not `pub(crate)`) to match the visibility of the sealed
// `TableStorage::fork_branch_from_state` that returns it (and the already-`pub`
// `SnapshotHandle`); avoids a private-interfaces warning. The trait is sealed,
// so this widening does not let external code construct or branch on it.
pub enum ForkOutcome<D> {
Created(D),
RefAlreadyExists,
}
// ─── TableStorage trait ────────────────────────────────────────────────────
/// Engine-internal trait covering every Lance dataset operation an
@ -231,7 +251,7 @@ pub trait TableStorage: sealed::Sealed + Send + Sync + Debug {
table_key: &str,
source_version: u64,
target_branch: &str,
) -> Result<SnapshotHandle>;
) -> Result<ForkOutcome<SnapshotHandle>>;
async fn delete_branch(&self, dataset_uri: &str, branch: &str) -> Result<()>;
@ -497,17 +517,22 @@ impl TableStorage for TableStore {
table_key: &str,
source_version: u64,
target_branch: &str,
) -> Result<SnapshotHandle> {
TableStore::fork_branch_from_state(
self,
dataset_uri,
source_branch,
table_key,
source_version,
target_branch,
) -> Result<ForkOutcome<SnapshotHandle>> {
Ok(
match TableStore::fork_branch_from_state(
self,
dataset_uri,
source_branch,
table_key,
source_version,
target_branch,
)
.await?
{
ForkOutcome::Created(ds) => ForkOutcome::Created(SnapshotHandle::new(ds)),
ForkOutcome::RefAlreadyExists => ForkOutcome::RefAlreadyExists,
},
)
.await
.map(SnapshotHandle::new)
}
async fn delete_branch(&self, dataset_uri: &str, branch: &str) -> Result<()> {

View file

@ -26,6 +26,7 @@ use std::sync::Arc;
use crate::db::manifest::{TableVersionMetadata, open_table_head_for_write};
use crate::db::{Snapshot, SubTableEntry};
use crate::error::{OmniError, Result};
use crate::storage_layer::ForkOutcome;
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct TableState {
@ -285,7 +286,7 @@ impl TableStore {
table_key: &str,
source_version: u64,
target_branch: &str,
) -> Result<Dataset> {
) -> Result<ForkOutcome<Dataset>> {
let mut source_ds = self
.open_dataset_head(dataset_uri, source_branch)
.await?
@ -294,31 +295,49 @@ impl TableStore {
.map_err(|e| OmniError::Lance(e.to_string()))?;
self.ensure_expected_version(&source_ds, table_key, source_version)?;
if source_ds
if let Err(create_err) = source_ds
.create_branch(target_branch, source_version, None)
.await
.is_err()
{
// The target branch ref already exists. The caller
// (`open_owned_dataset_for_branch_write`) re-reads the live manifest
// before forking and returns a retryable error when a concurrent
// writer legitimately holds the fork, so reaching here means the
// manifest does NOT reference this fork: it is an orphan from an
// incomplete prior `branch_delete`. Surface the actionable cleanup
// error rather than guessing from Lance branch versions.
return Err(OmniError::manifest_conflict(format!(
"branch '{}' has orphaned table state for '{}' from an incomplete \
prior delete; run `omnigraph cleanup` to reclaim it before reusing \
this branch name",
target_branch, table_key
)));
// Disambiguate the failure: only a genuinely pre-existing ref is a
// reclaim candidate. Mapping EVERY create_branch failure to
// `RefAlreadyExists` would route a transient I/O / version / Lance
// internal error into the destructive reclaim path. So check whether
// the ref actually exists; if not, the failure is real — propagate
// it (preserving error fidelity) rather than force-deleting.
//
// `list_branches` reads `_refs/branches/` from the store, so it sees
// a fully-formed manifest-unreferenced fork (our common case — a
// create_branch that completed but whose manifest publish did not).
// It does NOT see a phase-1-only Lance "zombie" (tree dir written,
// no BranchContents) — but neither does `cleanup`'s reconciler, also
// list_branches-based. A zombie only forms if create_branch is
// interrupted *between its two internal phases* (a far narrower
// window than the manifest-publish gap), and it surfaces here as the
// propagated create error requiring manual reclaim. We deliberately
// do NOT force-delete on a not-found-ref failure: it is
// indistinguishable from a transient error on a fresh create, and
// force-deleting there is the destructive overreach this guard
// removes. The caller holds the per-(table, branch) write queue, so
// no in-process writer races this fork; a cross-process create
// between our check and now is the documented one-winner-CAS gap and
// propagates as a retryable error.
let ref_exists = source_ds
.list_branches()
.await
.map(|b| b.contains_key(target_branch))
.unwrap_or(false);
if ref_exists {
return Ok(ForkOutcome::RefAlreadyExists);
}
return Err(OmniError::Lance(create_err.to_string()));
}
let ds = self
.open_dataset_head(dataset_uri, Some(target_branch))
.await?;
self.ensure_expected_version(&ds, table_key, source_version)?;
Ok(ds)
Ok(ForkOutcome::Created(ds))
}
pub async fn scan_batches(&self, ds: &Dataset) -> Result<Vec<RecordBatch>> {

View file

@ -5,7 +5,9 @@ mod helpers;
use fail::FailScenario;
use futures::FutureExt;
use omnigraph::db::Omnigraph;
use omnigraph::error::{ManifestErrorKind, OmniError};
use omnigraph::failpoints::ScopedFailPoint;
use omnigraph::loader::LoadMode;
use helpers::recovery::{
FollowUpMutation, RecoveryExpectation, TableExpectation, assert_post_recovery_invariants,
@ -127,12 +129,12 @@ async fn branch_delete_partial_failure_converges_via_cleanup() {
}
// Reusing a branch name whose delete left an orphaned fork (before `cleanup`
// reconciles it) must fail with a clear, actionable error pointing at
// `cleanup`, not the opaque `ExpectedVersionMismatch` that leaks from the fork
// path. The recreate itself succeeds; the first write to the previously-forked
// table is where the stale orphan collides.
// reconciles it) must SELF-HEAL on the next write — the write reclaims the
// manifest-unreferenced fork and re-forks, rather than wedging with "incomplete
// prior delete; run cleanup". (This test was the inverse before the fork-as-
// idempotent-reconcile fix; its flip is the signal the bug class is closed.)
#[tokio::test]
async fn recreate_over_orphaned_fork_before_cleanup_is_actionable() {
async fn recreate_over_orphaned_fork_self_heals_without_cleanup() {
let _scenario = FailScenario::setup();
let dir = tempfile::tempdir().unwrap();
let uri = dir.path().to_str().unwrap().to_string();
@ -158,10 +160,10 @@ async fn recreate_over_orphaned_fork_before_cleanup_is_actionable() {
}
// Recreate the name and write to the previously-forked table WITHOUT a
// cleanup in between.
// cleanup in between. The write must self-heal the stale orphan fork.
main.branch_create("feature").await.unwrap();
let mut feature2 = Omnigraph::open(&uri).await.unwrap();
let err = helpers::mutate_branch(
helpers::mutate_branch(
&mut feature2,
"feature",
MUTATION_QUERIES,
@ -169,20 +171,83 @@ async fn recreate_over_orphaned_fork_before_cleanup_is_actionable() {
&mixed_params(&[("$name", "Frank")], &[("$age", 41)]),
)
.await
.expect_err("write should collide with the stale orphaned fork");
.expect("recreate-over-orphan write must self-heal, not require cleanup");
let msg = err.to_string();
assert!(
msg.contains("cleanup")
&& (msg.contains("orphan") || msg.contains("incomplete prior delete")),
"expected an actionable orphaned-fork error pointing at cleanup, got: {msg}"
);
assert!(
!msg.contains("expected manifest table version"),
"should not surface the opaque ExpectedVersionMismatch, got: {msg}"
// The recreated branch forks FRESH from main: the deleted branch's Eve is
// gone and only the new Frank is added on top of main's seed. A count of
// main + 2 would mean Eve resurrected from the stale fork (the bug).
let main_people = helpers::count_rows(&main, "node:Person").await;
let feature_people = helpers::count_rows_branch(&feature2, "feature", "node:Person").await;
assert_eq!(
feature_people,
main_people + 1,
"self-healed feature must fork fresh from main (+Frank only); \
main={main_people}, feature={feature_people} (main+2 Eve resurrected)"
);
}
// The write-path orphan reclaim shares the same fresh-authority classifier as
// cleanup. If that classifier is Indeterminate (transient read on a live
// branch), the write must return a clear retryable authority-read conflict and
// leave the ref in place. It must not squeeze the ambiguity through
// ExpectedVersionMismatch with expected == actual, which lies about the cause.
#[tokio::test]
async fn recreate_over_orphaned_fork_reports_indeterminate_authority_read() {
let _scenario = FailScenario::setup();
let dir = tempfile::tempdir().unwrap();
let uri = dir.path().to_str().unwrap().to_string();
let db = helpers::init_and_load(&dir).await;
db.branch_create("feature").await.unwrap();
let person_uri = node_table_uri(&uri, "Person");
{
let mut ds = lance::Dataset::open(&person_uri).await.unwrap();
let base = ds.version().version;
ds.create_branch("feature", base, None).await.unwrap();
}
let row = r#"{"type":"Person","data":{"name":"Grace","age":37}}"#;
{
let _fp = ScopedFailPoint::new("classify.fresh_read", "return");
let err = db
.load_as("feature", None, row, LoadMode::Merge, None)
.await
.expect_err("indeterminate authority read must fail retryably");
match &err {
OmniError::Manifest(manifest) => {
assert_eq!(manifest.kind, ManifestErrorKind::Conflict);
assert!(
manifest.details.is_none(),
"indeterminate authority read is not an expected-version mismatch: {manifest:?}"
);
}
other => panic!("expected manifest conflict, got {other:?}"),
}
let message = err.to_string();
assert!(
message.contains("could not verify")
&& message.contains("fresh manifest authority was unavailable")
&& message.contains("refresh and retry"),
"error should name the unavailable authority read, got: {message}"
);
assert!(
!message.contains("expected manifest table version"),
"indeterminate authority must not be reported as a version mismatch: {message}"
);
let ds = lance::Dataset::open(&person_uri).await.unwrap();
assert!(
ds.list_branches().await.unwrap().contains_key("feature"),
"ambiguous orphan status must leave the fork for a later retry"
);
}
db.load_as("feature", None, row, LoadMode::Merge, None)
.await
.expect("when fresh authority is available, the orphan is reclaimed and write converges");
}
// cleanup is the guaranteed convergence backstop, so one table's transient
// failure must not abort the whole sweep. Inject a one-shot version-GC failure
// for a single table and assert: cleanup still succeeds, the failure is
@ -330,6 +395,68 @@ async fn cleanup_reclaims_orphaned_commit_graph_branch() {
}
}
// `classify_fork_ref` returns `Indeterminate` when the fresh-authority read
// fails on a LIVE branch — and a destructive caller must SKIP, never delete, on
// that ambiguity. Here the reconciler has a genuine origin-2 orphan candidate
// (a manifest-unreferenced Person fork on the live `feature` branch), but the
// `classify.fresh_read` failpoint makes the fresh re-check fail: cleanup must
// leave the ref in place (cannot confirm it is unreferenced), then reclaim it on
// the next run once the read succeeds. This pins the Indeterminate arm and the
// don't-destroy-on-ambiguity rule end-to-end through cleanup.
#[tokio::test]
async fn reconcile_skips_fork_when_fresh_recheck_is_unavailable_then_converges() {
let _scenario = FailScenario::setup();
let dir = tempfile::tempdir().unwrap();
let uri = dir.path().to_str().unwrap().to_string();
let mut db = helpers::init_and_load(&dir).await;
db.branch_create("feature").await.unwrap();
// Forge a manifest-unreferenced Person fork on the live `feature` branch —
// a genuine orphan the reconciler would normally reclaim.
let person_uri = node_table_uri(&uri, "Person");
{
let mut ds = lance::Dataset::open(&person_uri).await.unwrap();
let base = ds.version().version;
ds.create_branch("feature", base, None).await.unwrap();
assert!(
ds.list_branches().await.unwrap().contains_key("feature"),
"precondition: forged orphan fork present"
);
}
// With the fresh re-check failing, the fork's status is Indeterminate (the
// branch is live but unreadable) → cleanup must SKIP it, not delete.
{
let _fp = ScopedFailPoint::new("classify.fresh_read", "return");
db.cleanup(omnigraph::db::CleanupPolicyOptions {
keep_versions: Some(1),
older_than: None,
})
.await
.unwrap();
let ds = lance::Dataset::open(&person_uri).await.unwrap();
assert!(
ds.list_branches().await.unwrap().contains_key("feature"),
"reconcile must NOT delete a fork whose fresh re-check is inconclusive"
);
}
// Read succeeds now → cleanup confirms the orphan and reclaims it (converges).
db.cleanup(omnigraph::db::CleanupPolicyOptions {
keep_versions: Some(1),
older_than: None,
})
.await
.unwrap();
{
let ds = lance::Dataset::open(&person_uri).await.unwrap();
assert!(
!ds.list_branches().await.unwrap().contains_key("feature"),
"next cleanup (fresh read available) must reclaim the confirmed orphan"
);
}
}
// A branch_delete whose best-effort commit-graph reclaim fails leaves a
// commit-graph "zombie" branch. Recreating that name must heal the zombie and
// succeed (branch_create force-deletes a stale commit-graph ref since the

View file

@ -54,6 +54,19 @@ pub async fn init_and_load(dir: &tempfile::TempDir) -> Omnigraph {
db
}
/// On-disk Lance dataset URI for a node type, mirroring the engine's
/// `nodes/{fnv1a(type)}` layout. Used by tests that reach the raw Lance
/// dataset to forge or inspect branch state. (Local copies exist in
/// `failpoints.rs` / `maintenance.rs`; this is the shared one for new tests.)
pub fn node_table_uri(root: &str, type_name: &str) -> String {
let mut hash: u64 = 0xcbf2_9ce4_8422_2325;
for &b in type_name.as_bytes() {
hash ^= b as u64;
hash = hash.wrapping_mul(0x100_0000_01b3);
}
format!("{}/nodes/{hash:016x}", root.trim_end_matches('/'))
}
/// Read all rows from a sub-table by table_key.
pub async fn read_table(db: &Omnigraph, table_key: &str) -> Vec<RecordBatch> {
let snap = snapshot_main(db).await.unwrap();

View file

@ -844,6 +844,76 @@ async fn cleanup_reconciles_orphaned_branch_forks() {
.unwrap();
}
// cleanup must reclaim a manifest-unreferenced fork even when the BRANCH is
// still live (origin 2: an interrupted first-write fork), while KEEPING a table
// that is legitimately forked on that same live branch. Before the per-table
// authority broadening, the reconciler keyed only on the branch name and so
// never reclaimed a fork on a live branch — the wedge the handoff hit.
#[tokio::test]
async fn cleanup_reconciles_live_branch_orphan_fork_but_keeps_legitimate_fork() {
let dir = tempfile::tempdir().unwrap();
let uri = dir.path().to_str().unwrap().to_string();
let mut db = init_and_load(&dir).await;
db.branch_create("feature").await.unwrap();
// Legitimately fork Company onto the live `feature` branch (a real write).
db.load_as(
"feature",
None,
r#"{"type":"Company","data":{"name":"Acme"}}"#,
LoadMode::Merge,
None,
)
.await
.unwrap();
// Forge a manifest-unreferenced Person fork on the SAME live branch: the
// manifest's `feature` snapshot still places Person on main (Person was
// never written on feature), so this ref is an origin-2 orphan.
let person_uri = node_table_uri(&uri, "Person");
{
let mut ds = Dataset::open(&person_uri).await.unwrap();
let base = ds.version().version;
ds.create_branch("feature", base, None).await.unwrap();
assert!(
ds.list_branches().await.unwrap().contains_key("feature"),
"precondition: forged orphan Person fork present on the live branch"
);
}
let company_uri = node_table_uri(&uri, "Company");
let main_people = count_rows(&db, "node:Person").await;
let main_companies = count_rows(&db, "node:Company").await;
db.cleanup(CleanupPolicyOptions {
keep_versions: Some(1),
older_than: None,
})
.await
.unwrap();
// Origin-2 orphan reclaimed...
{
let ds = Dataset::open(&person_uri).await.unwrap();
assert!(
!ds.list_branches().await.unwrap().contains_key("feature"),
"cleanup must reclaim the manifest-unreferenced Person fork on the live branch"
);
}
// ...but the legitimate Company fork on the same live branch is kept.
{
let ds = Dataset::open(&company_uri).await.unwrap();
assert!(
ds.list_branches().await.unwrap().contains_key("feature"),
"cleanup must NOT reclaim a legitimately-forked table on a live branch"
);
}
// main is untouched.
assert_eq!(count_rows(&db, "node:Person").await, main_people);
assert_eq!(count_rows(&db, "node:Company").await, main_companies);
}
// Regression (iss-848): a table with rows but NULL vectors (the load-before-
// embed window) must not abort index building. The vector (IVF) index cannot
// train on 0 vectors, so `create_vector_index` errors with "KMeans cannot
@ -876,9 +946,9 @@ async fn index_build_tolerates_null_vector_rows() {
// Must not abort: the untrainable vector column is deferred, the sibling
// BTREE on `n` still builds.
db.ensure_indices().await.expect(
"ensure_indices must not abort when a vector column has no trainable vectors yet",
);
db.ensure_indices()
.await
.expect("ensure_indices must not abort when a vector column has no trainable vectors yet");
}
// iss-848: `optimize` converges declared-but-unbuilt indexes. After an @index is

View file

@ -1540,3 +1540,109 @@ async fn second_sequential_update_on_same_row_succeeds() {
"Alice's age must reflect the second update"
);
}
// An interrupted first-write fork (create_branch succeeded, the manifest
// publish did not) leaves a fully-formed Lance branch ref on the table that
// the manifest never references — a "manifest-unreferenced fork". The branch
// itself stays a valid manifest branch, so `cleanup`'s reconciler (keyed on
// the manifest branch list) never reclaims it. Today the next write to that
// table on that branch re-enters the fork path, `create_branch` collides, and
// the engine wedges with "incomplete prior delete; run `omnigraph cleanup`".
//
// We forge that exact residue (a live `feature` branch + a directly-created
// `feature` ref on the Person table the manifest doesn't reference) and assert
// the next write — via both `load` and `mutate` — self-heals by reclaiming the
// orphan fork and re-forking, rather than wedging. No process death / timing
// needed: the forge is the post-crash state.
#[tokio::test]
async fn first_write_self_heals_manifest_unreferenced_fork_on_live_branch() {
let dir = tempfile::tempdir().unwrap();
let uri = dir.path().to_str().unwrap().to_string();
let mut db = init_and_load(&dir).await;
db.branch_create("feature").await.unwrap();
// Forge the manifest-unreferenced fork directly at the Lance layer.
let person_uri = node_table_uri(&uri, "Person");
{
let mut ds = lance::Dataset::open(&person_uri).await.unwrap();
let base = ds.version().version;
ds.create_branch("feature", base, None).await.unwrap();
assert!(
ds.list_branches().await.unwrap().contains_key("feature"),
"precondition: forged orphan fork present on Person"
);
}
// load → must self-heal, not wedge with "incomplete prior delete".
let row = r#"{"type":"Person","data":{"name":"Zoe","age":30}}"#;
db.load_as("feature", None, row, LoadMode::Merge, None)
.await
.expect("load onto a manifest-unreferenced fork must self-heal, not wedge");
// mutate → same path, must also self-heal.
mutate_branch(
&mut db,
"feature",
MUTATION_QUERIES,
"insert_person",
&mixed_params(&[("$name", "Yan")], &[("$age", 41)]),
)
.await
.expect("mutate onto a manifest-unreferenced fork must self-heal");
// The healed branch holds the new rows; main is untouched (still no Zoe/Yan).
let feature_people = count_rows_branch(&db, "feature", "node:Person").await;
let main_people = count_rows(&db, "node:Person").await;
assert!(
feature_people >= main_people + 2,
"feature must contain the two new rows on top of the inherited set \
(feature={feature_people}, main={main_people})"
);
}
// A node delete cascades to every edge table touching that node, forking those
// edge tables during execution. The up-front fork-queue acquisition must cover
// those cascade-forked edges, not just the node table named in the IR — else
// commit_all's held-guard coverage check fails the write (and, before the
// coverage check was promoted out of debug-only, edge commits would slip
// through unserialized). This drives the new code via a DELETE (the only
// cascading op), on a branch, as the FIRST write (so it actually forks).
#[tokio::test]
async fn branch_cascade_delete_forks_node_and_edges_under_held_queues() {
let dir = tempfile::tempdir().unwrap();
let mut db = init_and_load(&dir).await;
db.branch_create("feature").await.unwrap();
// Baseline inherited from main (Alice has 2 Knows + 1 WorksAt edge).
let main_people = count_rows(&db, "node:Person").await;
let main_knows = count_rows(&db, "edge:Knows").await;
// First write to `feature` is `delete Person Alice`, whose cascade forks
// node:Person AND edge:Knows + edge:WorksAt. Pre-fix the up-front set held
// only node:Person, so commit_all's coverage check rejected the write.
mutate_branch(
&mut db,
"feature",
MUTATION_QUERIES,
"remove_person",
&mixed_params(&[("$name", "Alice")], &[]),
)
.await
.expect("branch cascade-delete must hold queues for cascade-forked edge tables");
// Alice and her edges are gone on feature; main is untouched.
assert_eq!(
count_rows_branch(&db, "feature", "node:Person").await,
main_people - 1,
"feature should have Alice removed from the inherited set"
);
assert!(
count_rows_branch(&db, "feature", "edge:Knows").await < main_knows,
"feature should have Alice's cascade-deleted Knows edges removed"
);
assert_eq!(
count_rows(&db, "node:Person").await,
main_people,
"main must be untouched by the branch delete"
);
}

View file

@ -197,6 +197,22 @@ them explicit.
one-winner-CAS territory; closing this fully needs a cross-process
serialization primitive (e.g. lease-based use of the schema-apply lock
branch) — design it before promoting multi-process write topologies.
- **Fork reclaim is in-process-safe only:** the first write to a table on a
branch forks it (a Lance `create_branch` that advances state before the
manifest publish). An interrupted fork (crash, or a cancelled request
future) leaves a manifest-unreferenced branch ref. The next write self-heals
it — `reclaim_orphaned_fork_and_refork` (`force_delete_branch` + re-fork)
— but reclaim is only safe because the writer holds the per-`(table,
branch)` write queue from before the fork through the publish AND re-checks
the live manifest under it, so no *in-process* writer can be mid-fork. A
reclaim cannot serialize against a foreign-*process* in-flight fork: it may
force-delete a peer's just-created ref, which makes that peer's commit fail
and retry — the same one-winner-CAS exposure as above, not corruption. The
reclaim never fires unless in-process-queue + manifest authority both prove
the ref is manifest-unreferenced. `cleanup`'s per-table reconciler
(`reconcile_orphaned_branches`) is the guaranteed backstop for any fork the
write path never revisits. Both degrade to a no-op if Lance ships an atomic
multi-dataset branch op.
- **Local `write_text_if_match` is not a cross-process CAS:** object-store
backends use a true conditional put (ETag If-Match; the in-memory test
backend too), but upstream `object_store` leaves `PutMode::Update`

View file

@ -19,8 +19,14 @@ publisher's row-level CAS on `__manifest` is the single fence.
`__run__*` branch on an upgraded graph is swept off `__manifest` by the
v2→v3 internal-schema migration on first read-write open. (The inert
`_graph_runs.lance` bytes remain until a `delete_prefix` primitive lands.)
- Cancelled mutation futures leave **no graph-level state** — only orphaned
Lance fragments, which the existing `omnigraph cleanup` pipe reclaims.
- Cancelled mutation futures leave **no graph-visible state** — the manifest
is never advanced. They can leave two kinds of unreferenced residue, both
self-healing: orphaned Lance fragments (reclaimed by `omnigraph cleanup`),
and — on the *first* write to a table on a branch, which forks it before the
publish — a manifest-unreferenced branch ref. The next write to that table
reclaims the stale fork and re-forks (`reclaim_orphaned_fork_and_refork`),
and `cleanup`'s per-table reconciler is the guaranteed backstop; see the
fork-reclaim note in [invariants.md](invariants.md).
## Read-your-writes within a multi-statement mutation