diff --git a/crates/omnigraph-cli/src/main.rs b/crates/omnigraph-cli/src/main.rs index b7e3041..d98c302 100644 --- a/crates/omnigraph-cli/src/main.rs +++ b/crates/omnigraph-cli/src/main.rs @@ -2699,20 +2699,33 @@ async fn main() -> Result<()> { "table_key": s.table_key, "bytes_removed": s.bytes_removed, "old_versions_removed": s.old_versions_removed, + "error": s.error, })).collect::>(), }); print_json(&value)?; } else { let total_bytes: u64 = stats.iter().map(|s| s.bytes_removed).sum(); let total_versions: u64 = stats.iter().map(|s| s.old_versions_removed).sum(); + let failed: Vec<&str> = stats + .iter() + .filter(|s| s.error.is_some()) + .map(|s| s.table_key.as_str()) + .collect(); println!( "cleanup {} ({}) — removed {} versions ({} bytes) across {} tables", uri, policy_desc, total_versions, total_bytes, - stats.len() + stats.len() - failed.len() ); + if !failed.is_empty() { + println!( + " {} table(s) failed and will be retried on the next cleanup: {}", + failed.len(), + failed.join(", ") + ); + } } } Command::Graphs { command } => match command { diff --git a/crates/omnigraph/src/db/commit_graph.rs b/crates/omnigraph/src/db/commit_graph.rs index 565bd69..9531a64 100644 --- a/crates/omnigraph/src/db/commit_graph.rs +++ b/crates/omnigraph/src/db/commit_graph.rs @@ -169,6 +169,37 @@ impl CommitGraph { self.refresh().await } + /// Idempotently drop the commit-graph branch `name`, tolerating an + /// already-absent branch (see [`TableStore::force_delete_branch`] for the + /// same semantics). Used by the best-effort reclaim in `branch_delete` and + /// the `cleanup` orphan reconciler. `RefConflict` (referencing descendants) + /// is still surfaced. + pub async fn force_delete_branch(&mut self, name: &str) -> Result<()> { + let mut ds = Dataset::open(&graph_commits_uri(&self.root_uri)) + .await + .map_err(|e| OmniError::Lance(e.to_string()))?; + match ds.force_delete_branch(name).await { + Ok(()) => {} + Err(lance::Error::RefNotFound { .. 
}) | Err(lance::Error::NotFound { .. }) => {} + Err(e) => return Err(OmniError::Lance(e.to_string())), + } + self.refresh().await + } + + /// List the named branches present on the commit-graph dataset. The + /// `cleanup` reconciler diffs this against the manifest branch set to find + /// orphaned commit-graph branches to reclaim. + pub async fn list_branches(&self) -> Result> { + let ds = Dataset::open(&graph_commits_uri(&self.root_uri)) + .await + .map_err(|e| OmniError::Lance(e.to_string()))?; + let branches = ds + .list_branches() + .await + .map_err(|e| OmniError::Lance(e.to_string()))?; + Ok(branches.into_keys().collect()) + } + pub async fn append_commit( &mut self, manifest_branch: Option<&str>, @@ -345,7 +376,7 @@ impl CommitGraph { } } -fn graph_commits_uri(root_uri: &str) -> String { +pub(crate) fn graph_commits_uri(root_uri: &str) -> String { format!("{}/{}", root_uri.trim_end_matches('/'), GRAPH_COMMITS_DIR) } diff --git a/crates/omnigraph/src/db/graph_coordinator.rs b/crates/omnigraph/src/db/graph_coordinator.rs index a721036..dfe2767 100644 --- a/crates/omnigraph/src/db/graph_coordinator.rs +++ b/crates/omnigraph/src/db/graph_coordinator.rs @@ -211,14 +211,47 @@ impl GraphCoordinator { let branch = normalize_branch_name(name)? .ok_or_else(|| OmniError::manifest("cannot create branch 'main'".to_string()))?; self.ensure_commit_graph_initialized().await?; + + // Manifest authority flip first. self.manifest.create_branch(&branch).await?; - failpoints::maybe_fail("branch_create.after_manifest_branch_create")?; - if let Some(commit_graph) = &mut self.commit_graph { - commit_graph.create_branch(&branch).await?; + + // Derived commit-graph branch. If anything after the authority flip + // fails, roll back the manifest branch so the branch never half-exists + // (a manifest branch with no commit-graph branch breaks the next write). 
+ if let Err(err) = self.create_commit_graph_branch(&branch).await { + if let Err(rollback_err) = self.manifest.delete_branch(&branch).await { + tracing::warn!( + target: "omnigraph::branch_create", + branch = %branch, + error = %rollback_err, + "rollback of manifest branch failed after commit-graph create failure", + ); + } + return Err(err); } Ok(()) } + /// Create the derived commit-graph branch for `branch`, healing a zombie ref + /// left by an incomplete prior delete. The manifest branch was just created + /// fresh, so any existing commit-graph branch with this name is provably + /// orphaned and is force-dropped before recreating. + async fn create_commit_graph_branch(&mut self, branch: &str) -> Result<()> { + failpoints::maybe_fail("branch_create.after_manifest_branch_create")?; + let Some(commit_graph) = &mut self.commit_graph else { + return Ok(()); + }; + if commit_graph + .list_branches() + .await? + .iter() + .any(|existing| existing == branch) + { + commit_graph.force_delete_branch(branch).await?; + } + commit_graph.create_branch(branch).await + } + pub async fn branch_delete(&mut self, name: &str) -> Result<()> { let branch = normalize_branch_name(name)? .ok_or_else(|| OmniError::manifest("cannot delete branch 'main'".to_string()))?; @@ -229,20 +262,43 @@ impl GraphCoordinator { ))); } + // Manifest authority flip — the single atomic op that makes the branch + // cease to exist. Must succeed; everything after is derived state + // reclaimed best-effort. self.manifest.delete_branch(&branch).await?; + // Commit-graph branch is derived state. Reclaim best-effort with the + // idempotent force variant: a failure here (or a missing dataset) is + // reconciled by `cleanup` and must not fail the delete after the + // authority already flipped. 
+ if let Err(err) = self.reclaim_commit_graph_branch(&branch).await { + tracing::warn!( + target: "omnigraph::branch_delete::cleanup", + branch = %branch, + error = %err, + "best-effort commit-graph branch reclaim failed; cleanup will reconcile", + ); + } + + Ok(()) + } + + /// Best-effort, idempotent reclaim of the commit-graph branch `branch`. + /// Tolerates an absent commit-graph dataset (a graph that never committed). + async fn reclaim_commit_graph_branch(&mut self, branch: &str) -> Result<()> { + failpoints::maybe_fail("branch_delete.before_commit_graph_reclaim")?; if let Some(commit_graph) = &mut self.commit_graph { - commit_graph.delete_branch(&branch).await?; + commit_graph.force_delete_branch(branch).await } else if self .storage .exists(&graph_commits_uri(self.root_uri())) .await? { let mut commit_graph = CommitGraph::open(self.root_uri()).await?; - commit_graph.delete_branch(&branch).await?; + commit_graph.force_delete_branch(branch).await + } else { + Ok(()) } - - Ok(()) } pub async fn snapshot_at_version(&self, version: u64) -> Result { diff --git a/crates/omnigraph/src/db/omnigraph.rs b/crates/omnigraph/src/db/omnigraph.rs index 5c92ac3..eb58623 100644 --- a/crates/omnigraph/src/db/omnigraph.rs +++ b/crates/omnigraph/src/db/omnigraph.rs @@ -1058,11 +1058,14 @@ impl Omnigraph { Ok(()) } - async fn cleanup_deleted_branch_tables( - &self, - branch: &str, - owned_tables: &[(String, String)], - ) -> Result<()> { + /// Best-effort reclaim of the per-table Lance forks a just-deleted branch + /// owned. Runs AFTER the manifest authority flip, so the branch is already + /// gone and these forks are unreachable orphans. A failure here (transient + /// object-store error, the `branch_delete.before_table_cleanup` failpoint) + /// is logged and swallowed: the `cleanup` reconciler is the guaranteed + /// backstop that converges any leftover orphan. Uses `force_delete_branch` + /// so a partially-reclaimed retry is idempotent. 
+ async fn cleanup_deleted_branch_tables(&self, branch: &str, owned_tables: &[(String, String)]) { let mut seen_paths = HashSet::new(); let mut cleanup_targets = owned_tables .iter() @@ -1073,15 +1076,21 @@ impl Omnigraph { for (table_key, table_path) in cleanup_targets { let dataset_uri = self.table_store.dataset_uri(&table_path); - if let Err(err) = self.table_store.delete_branch(&dataset_uri, branch).await { - return Err(OmniError::manifest_internal(format!( - "branch '{}' was deleted but cleanup failed for {}: {}", - branch, table_key, err - ))); + let outcome = match crate::failpoints::maybe_fail("branch_delete.before_table_cleanup") + { + Ok(()) => self.table_store.force_delete_branch(&dataset_uri, branch).await, + Err(injected) => Err(injected), + }; + if let Err(err) = outcome { + tracing::warn!( + target: "omnigraph::branch_delete::cleanup", + branch = %branch, + table = %table_key, + error = %err, + "best-effort fork reclaim failed; cleanup will reconcile the orphan", + ); } } - - Ok(()) } async fn delete_branch_storage_only(&self, branch: &str) -> Result<()> { @@ -1105,9 +1114,12 @@ impl Omnigraph { .map(|entry| (entry.table_key.clone(), entry.table_path.clone())) .collect::>(); + // Authority flip (+ best-effort commit-graph reclaim) — must succeed. self.coordinator.write().await.branch_delete(branch).await?; + // Best-effort per-table fork reclaim; cleanup reconciles any leftover. self.cleanup_deleted_branch_tables(branch, &owned_tables) - .await + .await; + Ok(()) } pub(crate) fn normalize_branch_name(branch: &str) -> Result> { diff --git a/crates/omnigraph/src/db/omnigraph/optimize.rs b/crates/omnigraph/src/db/omnigraph/optimize.rs index e158dc7..c703836 100644 --- a/crates/omnigraph/src/db/omnigraph/optimize.rs +++ b/crates/omnigraph/src/db/omnigraph/optimize.rs @@ -64,12 +64,15 @@ pub struct TableOptimizeStats { pub committed: bool, } -/// Per-table outcome of `cleanup_all_tables`. +/// Per-table outcome of `cleanup_all_tables`. 
`error` is `Some` when this +/// table's version GC failed; cleanup is fault-isolated per table, so a single +/// table's failure is recorded here rather than aborting the whole sweep. #[derive(Debug, Clone)] pub struct TableCleanupStats { pub table_key: String, pub bytes_removed: u64, pub old_versions_removed: u64, + pub error: Option, } /// Run Lance `compact_files` on every node + edge table on `main`. @@ -138,6 +141,26 @@ pub async fn cleanup_all_tables( db.ensure_schema_state_valid().await?; db.ensure_schema_apply_idle("cleanup").await?; + // Reclaim orphaned branch forks (from an incomplete prior `branch_delete`) + // before version GC. Authority-derived and idempotent; the eager + // best-effort reclaim in `branch_delete` covers the common case, this is + // the guaranteed backstop. Logged for observability. + let reconciled = reconcile_orphaned_branches(db).await?; + if !reconciled.reclaimed.is_empty() { + tracing::info!( + count = reconciled.reclaimed.len(), + reclaimed = ?reconciled.reclaimed, + "cleanup reconciled orphaned branch forks" + ); + } + if !reconciled.failures.is_empty() { + tracing::warn!( + count = reconciled.failures.len(), + failures = ?reconciled.failures, + "cleanup could not reconcile some orphaned forks; will retry next cleanup" + ); + } + let before_timestamp = options.older_than.map(|d| Utc::now() - d); let keep_versions = options.keep_versions; @@ -160,36 +183,205 @@ pub async fn cleanup_all_tables( let concurrency = maint_concurrency().min(table_tasks.len()).max(1); let table_store = &db.table_store; - let results: Vec> = futures::stream::iter(table_tasks.into_iter()) + // Fault-isolated per table: a single table's GC failure is recorded on its + // stats row (`error: Some`) and logged, never aborting the healthy tables. + // cleanup is the convergence backstop, so it must do as much as it can and + // converge on re-run rather than fail wholesale (invariant 13). 
+ let results: Vec = futures::stream::iter(table_tasks.into_iter()) .map(|(table_key, full_path)| async move { - let ds = table_store - .open_dataset_head_for_write(&table_key, &full_path, None) - .await?; - let before_version = keep_versions - .map(|n| ds.version().version.saturating_sub(n as u64)) - .filter(|v| *v > 0); - let policy = CleanupPolicy { - before_timestamp, - before_version, - delete_unverified: false, - error_if_tagged_old_versions: false, - clean_referenced_branches: false, - delete_rate_limit: None, - }; - let removed: RemovalStats = lance::dataset::cleanup::cleanup_old_versions(&ds, policy) - .await - .map_err(|e| OmniError::Lance(e.to_string()))?; - Ok(TableCleanupStats { - table_key, - bytes_removed: removed.bytes_removed, - old_versions_removed: removed.old_versions, - }) + let outcome: Result = async { + crate::failpoints::maybe_fail("cleanup.table_gc")?; + let ds = table_store + .open_dataset_head_for_write(&table_key, &full_path, None) + .await?; + let before_version = keep_versions + .map(|n| ds.version().version.saturating_sub(n as u64)) + .filter(|v| *v > 0); + let policy = CleanupPolicy { + before_timestamp, + before_version, + delete_unverified: false, + error_if_tagged_old_versions: false, + clean_referenced_branches: false, + delete_rate_limit: None, + }; + lance::dataset::cleanup::cleanup_old_versions(&ds, policy) + .await + .map_err(|e| OmniError::Lance(e.to_string())) + } + .await; + match outcome { + Ok(removed) => TableCleanupStats { + table_key, + bytes_removed: removed.bytes_removed, + old_versions_removed: removed.old_versions, + error: None, + }, + Err(err) => { + tracing::warn!( + target: "omnigraph::cleanup", + table = %table_key, + error = %err, + "version GC failed for table; other tables unaffected", + ); + TableCleanupStats { + table_key, + bytes_removed: 0, + old_versions_removed: 0, + error: Some(err.to_string()), + } + } + } }) .buffer_unordered(concurrency) .collect() .await; - results.into_iter().collect() + 
Ok(results) +} + +/// Outcome of [`reconcile_orphaned_branches`]: the `(owner, branch)` pairs +/// reclaimed and the `(owner, error)` pairs that failed, where `owner` is a +/// table key (e.g. `node:Person`) or `"_graph_commits"`. Per-owner failures are +/// isolated and recorded here, not propagated — the next reconcile converges. +#[derive(Debug, Clone, Default)] +pub struct BranchReconcileStats { + pub reclaimed: Vec<(String, String)>, + pub failures: Vec<(String, String)>, +} + +/// Drop every per-table and commit-graph Lance branch that the manifest no +/// longer references. +/// +/// Orphaned forks arise when a `branch_delete` flips the manifest authority +/// (atomic) but a downstream best-effort reclaim does not complete. They are +/// unreachable through any snapshot — no manifest entry can name them — yet +/// they pin their `tree/{branch}/` storage and can block reusing the branch +/// name. This is the guaranteed convergence backstop: it is idempotent and +/// derived purely from the manifest authority, so it no-ops once everything is +/// reconciled, and it would harmlessly find nothing if a future Lance atomic +/// multi-dataset branch op prevented orphans from forming. +/// +/// The keep-set is the full (unfiltered) manifest branch list, so system +/// branches' forks are never reclaimed; `main`/default is not a named Lance +/// branch and so is never a candidate. Referencing children are dropped before +/// parents (Lance refuses to delete a referenced parent) by ordering longest +/// branch names first. +pub async fn reconcile_orphaned_branches(db: &Omnigraph) -> Result { + use std::collections::HashSet; + + let keep: HashSet = db + .coordinator + .read() + .await + .all_branches() + .await? 
+ .into_iter() + .collect(); + + let resolved = db.resolved_branch_target(None).await?; + let snapshot = resolved.snapshot; + let table_targets: Vec<(String, String)> = all_table_keys(&db.catalog()) + .into_iter() + .filter_map(|table_key| { + let entry = snapshot.entry(&table_key)?; + let full_path = format!("{}/{}", db.root_uri, entry.table_path); + Some((table_key, full_path)) + }) + .collect(); + + let mut stats = BranchReconcileStats::default(); + + // Per-table fault isolation: one table's transient failure is recorded and + // logged, never aborting the rest of the sweep. + for (table_key, full_path) in table_targets { + let listed = match db.table_store.list_branches(&full_path).await { + Ok(listed) => listed, + Err(err) => { + tracing::warn!( + target: "omnigraph::cleanup", + table = %table_key, + error = %err, + "listing branches failed during reconcile; skipping table", + ); + stats.failures.push((table_key.clone(), err.to_string())); + continue; + } + }; + for branch in orphan_branches(listed, &keep) { + let outcome = match crate::failpoints::maybe_fail("cleanup.reconcile_fork") { + Ok(()) => db.table_store.force_delete_branch(&full_path, &branch).await, + Err(injected) => Err(injected), + }; + match outcome { + Ok(()) => stats.reclaimed.push((table_key.clone(), branch)), + Err(err) => { + tracing::warn!( + target: "omnigraph::cleanup", + table = %table_key, + branch = %branch, + error = %err, + "reclaiming orphaned fork failed; will retry next cleanup", + ); + stats.failures.push((table_key.clone(), err.to_string())); + } + } + } + } + + // Commit-graph orphans (best-effort: the dataset may not exist on a graph + // that has never committed; any failure is isolated and retried next time). 
+ if let Err(err) = reconcile_commit_graph_orphans(db, &keep, &mut stats).await { + tracing::warn!( + target: "omnigraph::cleanup", + error = %err, + "commit-graph orphan reconcile failed; will retry next cleanup", + ); + stats.failures.push(("_graph_commits".to_string(), err.to_string())); + } + + Ok(stats) +} + +/// Commit-graph half of [`reconcile_orphaned_branches`], split out so its +/// errors can be isolated. Returns `Ok` when the commit-graph dataset is absent. +async fn reconcile_commit_graph_orphans( + db: &Omnigraph, + keep: &std::collections::HashSet, + stats: &mut BranchReconcileStats, +) -> Result<()> { + let commits_uri = crate::db::commit_graph::graph_commits_uri(db.root_uri()); + if !db.storage_adapter().exists(&commits_uri).await? { + return Ok(()); + } + let mut commit_graph = crate::db::commit_graph::CommitGraph::open(db.root_uri()).await?; + for branch in orphan_branches(commit_graph.list_branches().await?, keep) { + match commit_graph.force_delete_branch(&branch).await { + Ok(()) => stats.reclaimed.push(("_graph_commits".to_string(), branch)), + Err(err) => { + tracing::warn!( + target: "omnigraph::cleanup", + branch = %branch, + error = %err, + "reclaiming orphaned commit-graph branch failed; will retry next cleanup", + ); + stats.failures.push(("_graph_commits".to_string(), err.to_string())); + } + } + } + Ok(()) +} + +/// Filter `present` Lance branches down to those absent from the manifest +/// `keep` set, ordered children-before-parents (longest name first) so Lance's +/// referenced-parent `RefConflict` cannot block reclamation. 
+fn orphan_branches(present: Vec, keep: &std::collections::HashSet) -> Vec { + let mut orphans: Vec = present + .into_iter() + .filter(|branch| !keep.contains(branch)) + .collect(); + orphans.sort_by(|a, b| b.len().cmp(&a.len()).then_with(|| a.cmp(b))); + orphans } fn all_table_keys(catalog: &omnigraph_compiler::catalog::Catalog) -> Vec { diff --git a/crates/omnigraph/src/db/omnigraph/table_ops.rs b/crates/omnigraph/src/db/omnigraph/table_ops.rs index 0e89c45..3ed9c43 100644 --- a/crates/omnigraph/src/db/omnigraph/table_ops.rs +++ b/crates/omnigraph/src/db/omnigraph/table_ops.rs @@ -483,6 +483,22 @@ pub(super) async fn open_owned_dataset_for_branch_write( Ok((ds, Some(active_branch.to_string()))) } source_branch => { + crate::failpoints::maybe_fail("fork.before_classify")?; + // Authority check before forking: re-read the live manifest. If this + // table is already forked on active_branch, a concurrent first-write + // won the race and our snapshot is stale — that is a retryable + // conflict, not an orphan. (A zombie fork is never in the manifest, + // so this only fires for a live concurrent fork.) + let live = db.snapshot_for_branch(Some(active_branch)).await?; + if let Some(entry) = live.entry(table_key) { + if entry.table_branch.as_deref() == Some(active_branch) { + return Err(OmniError::manifest_expected_version_mismatch( + table_key, + entry_version, + entry.table_version, + )); + } + } fork_dataset_from_entry_state( db, table_key, diff --git a/crates/omnigraph/src/table_store.rs b/crates/omnigraph/src/table_store.rs index 46b15b0..10123b0 100644 --- a/crates/omnigraph/src/table_store.rs +++ b/crates/omnigraph/src/table_store.rs @@ -177,6 +177,45 @@ impl TableStore { .map_err(|e| OmniError::Lance(e.to_string())) } + /// List the named Lance branches present on the dataset at `dataset_uri`. + /// The `cleanup` orphan reconciler diffs this against the manifest branch + /// set to find orphaned per-table forks. 
`main`/default is not a named + /// branch and never appears here. + pub async fn list_branches(&self, dataset_uri: &str) -> Result> { + let ds = Dataset::open(dataset_uri) + .await + .map_err(|e| OmniError::Lance(e.to_string()))?; + let branches = ds + .list_branches() + .await + .map_err(|e| OmniError::Lance(e.to_string()))?; + Ok(branches.into_keys().collect()) + } + + /// Idempotently drop `branch` from the dataset at `dataset_uri`. + /// + /// Unlike [`delete_branch`](Self::delete_branch), this tolerates an + /// already-absent branch — both a missing contents ref (Lance's + /// `force_delete_branch` handles that) and a missing `tree/{branch}/` + /// directory (the local-store `NotFound` quirk pinned by + /// `lance_surface_guards::force_delete_branch_semantics`). Safe to call on a + /// possibly-orphaned or already-reclaimed fork. + /// + /// A branch that still has referencing descendants (`RefConflict`) is NOT + /// tolerated: that is a real ordering error and surfaces as `OmniError::Lance`. + /// Used by the eager best-effort reclaim in `cleanup_deleted_branch_tables` + /// and the `cleanup` orphan reconciler. + pub async fn force_delete_branch(&self, dataset_uri: &str, branch: &str) -> Result<()> { + let mut ds = Dataset::open(dataset_uri) + .await + .map_err(|e| OmniError::Lance(e.to_string()))?; + match ds.force_delete_branch(branch).await { + Ok(()) => Ok(()), + Err(lance::Error::RefNotFound { .. }) | Err(lance::Error::NotFound { .. 
}) => Ok(()), + Err(e) => Err(OmniError::Lance(e.to_string())), + } + } + pub async fn open_dataset_at_state( &self, table_path: &str, @@ -243,21 +282,24 @@ impl TableStore { .map_err(|e| OmniError::Lance(e.to_string()))?; self.ensure_expected_version(&source_ds, table_key, source_version)?; - match source_ds + if source_ds .create_branch(target_branch, source_version, None) .await + .is_err() { - Ok(_) => {} - Err(create_err) => match self - .open_dataset_head(dataset_uri, Some(target_branch)) - .await - { - Ok(ds) => { - self.ensure_expected_version(&ds, table_key, source_version)?; - return Ok(ds); - } - Err(_) => return Err(OmniError::Lance(create_err.to_string())), - }, + // The target branch ref already exists. The caller + // (`open_owned_dataset_for_branch_write`) re-reads the live manifest + // before forking and returns a retryable error when a concurrent + // writer legitimately holds the fork, so reaching here means the + // manifest does NOT reference this fork: it is an orphan from an + // incomplete prior `branch_delete`. Surface the actionable cleanup + // error rather than guessing from Lance branch versions. + return Err(OmniError::manifest_conflict(format!( + "branch '{}' has orphaned table state for '{}' from an incomplete \ + prior delete; run `omnigraph cleanup` to reclaim it before reusing \ + this branch name", + target_branch, table_key + ))); } let ds = self diff --git a/crates/omnigraph/tests/failpoints.rs b/crates/omnigraph/tests/failpoints.rs index 5ea71c5..149c63a 100644 --- a/crates/omnigraph/tests/failpoints.rs +++ b/crates/omnigraph/tests/failpoints.rs @@ -41,6 +41,452 @@ async fn branch_create_failpoint_triggers() { ); } +// Branch delete flips the manifest authority first, then reclaims the per-table +// forks best-effort. 
A failure during that reclaim (here, the +// `branch_delete.before_table_cleanup` failpoint, standing in for a transient +// object-store error) must NOT fail the call: the branch is already gone, and +// `cleanup` reconciles the stranded fork. The branch name is reusable after. +#[tokio::test] +async fn branch_delete_partial_failure_converges_via_cleanup() { + let _scenario = FailScenario::setup(); + let dir = tempfile::tempdir().unwrap(); + let uri = dir.path().to_str().unwrap().to_string(); + let mut main = helpers::init_and_load(&dir).await; + + main.branch_create("feature").await.unwrap(); + let mut feature = Omnigraph::open(&uri).await.unwrap(); + helpers::mutate_branch( + &mut feature, + "feature", + MUTATION_QUERIES, + "insert_person", + &mixed_params(&[("$name", "Eve")], &[("$age", 22)]), + ) + .await + .unwrap(); + drop(feature); + + let person_uri = node_table_uri(&uri, "Person"); + { + let ds = lance::Dataset::open(&person_uri).await.unwrap(); + assert!( + ds.list_branches().await.unwrap().contains_key("feature"), + "precondition: the owned table fork exists before delete" + ); + } + + // Inject a failure during per-table cleanup, AFTER the manifest authority + // flip. branch_delete must still succeed (best-effort reclaim). + { + let _fp = ScopedFailPoint::new("branch_delete.before_table_cleanup", "return"); + main.branch_delete("feature").await.expect( + "branch_delete is best-effort after the manifest flip: a cleanup-step \ + failure must not fail the call", + ); + } + + // Authority flipped: the branch is gone. + assert_eq!(main.branch_list().await.unwrap(), vec!["main".to_string()]); + + // The eager reclaim failed, so the orphan is stranded until cleanup. + { + let ds = lance::Dataset::open(&person_uri).await.unwrap(); + assert!( + ds.list_branches().await.unwrap().contains_key("feature"), + "failed eager reclaim should leave the orphan for cleanup to reconcile" + ); + } + + // cleanup converges: the orphan is reclaimed. 
+ main.cleanup(omnigraph::db::CleanupPolicyOptions { + keep_versions: Some(1), + older_than: None, + }) + .await + .unwrap(); + { + let ds = lance::Dataset::open(&person_uri).await.unwrap(); + assert!( + !ds.list_branches().await.unwrap().contains_key("feature"), + "cleanup should reconcile the orphaned fork away" + ); + } + + // The name is reusable after cleanup reclaims the orphan. + main.branch_create("feature").await.unwrap(); + let mut feature2 = Omnigraph::open(&uri).await.unwrap(); + helpers::mutate_branch( + &mut feature2, + "feature", + MUTATION_QUERIES, + "insert_person", + &mixed_params(&[("$name", "Frank")], &[("$age", 41)]), + ) + .await + .unwrap(); +} + +// Reusing a branch name whose delete left an orphaned fork (before `cleanup` +// reconciles it) must fail with a clear, actionable error pointing at +// `cleanup`, not the opaque `ExpectedVersionMismatch` that leaks from the fork +// path. The recreate itself succeeds; the first write to the previously-forked +// table is where the stale orphan collides. +#[tokio::test] +async fn recreate_over_orphaned_fork_before_cleanup_is_actionable() { + let _scenario = FailScenario::setup(); + let dir = tempfile::tempdir().unwrap(); + let uri = dir.path().to_str().unwrap().to_string(); + let mut main = helpers::init_and_load(&dir).await; + + main.branch_create("feature").await.unwrap(); + let mut feature = Omnigraph::open(&uri).await.unwrap(); + helpers::mutate_branch( + &mut feature, + "feature", + MUTATION_QUERIES, + "insert_person", + &mixed_params(&[("$name", "Eve")], &[("$age", 22)]), + ) + .await + .unwrap(); + drop(feature); + + // Partial delete: leaves the Person fork orphaned (cleanup not yet run). + { + let _fp = ScopedFailPoint::new("branch_delete.before_table_cleanup", "return"); + main.branch_delete("feature").await.unwrap(); + } + + // Recreate the name and write to the previously-forked table WITHOUT a + // cleanup in between. 
+ main.branch_create("feature").await.unwrap(); + let mut feature2 = Omnigraph::open(&uri).await.unwrap(); + let err = helpers::mutate_branch( + &mut feature2, + "feature", + MUTATION_QUERIES, + "insert_person", + &mixed_params(&[("$name", "Frank")], &[("$age", 41)]), + ) + .await + .expect_err("write should collide with the stale orphaned fork"); + + let msg = err.to_string(); + assert!( + msg.contains("cleanup") + && (msg.contains("orphan") || msg.contains("incomplete prior delete")), + "expected an actionable orphaned-fork error pointing at cleanup, got: {msg}" + ); + assert!( + !msg.contains("expected manifest table version"), + "should not surface the opaque ExpectedVersionMismatch, got: {msg}" + ); +} + +// cleanup is the guaranteed convergence backstop, so one table's transient +// failure must not abort the whole sweep. Inject a one-shot version-GC failure +// for a single table and assert: cleanup still succeeds, the failure is +// surfaced per-table in the returned stats, and the independent reconcile pass +// still reclaimed an orphan. +#[tokio::test] +async fn cleanup_isolates_single_table_failure() { + let _scenario = FailScenario::setup(); + let dir = tempfile::tempdir().unwrap(); + let uri = dir.path().to_str().unwrap().to_string(); + let mut db = helpers::init_and_load(&dir).await; + + // Forge an orphaned fork on the Person table (a reconcile target). + let person_uri = node_table_uri(&uri, "Person"); + { + let mut ds = lance::Dataset::open(&person_uri).await.unwrap(); + let base = ds.version().version; + ds.create_branch("ghost", base, None).await.unwrap(); + } + + // One table's version GC fails once; the sweep must isolate it. 
+ let _fp = ScopedFailPoint::new("cleanup.table_gc", "1*return"); + let stats = db + .cleanup(omnigraph::db::CleanupPolicyOptions { + keep_versions: Some(1), + older_than: None, + }) + .await + .expect("a single table's GC failure must not abort cleanup"); + + let errored = stats.iter().filter(|s| s.error.is_some()).count(); + assert_eq!( + errored, 1, + "exactly one table's GC failure should be surfaced in stats, got {errored}" + ); + assert!( + stats.len() >= 4, + "every node+edge table should still appear in the stats" + ); + + // The reconcile pass is independent of the GC failure, so the orphan is gone. + { + let ds = lance::Dataset::open(&person_uri).await.unwrap(); + assert!( + !ds.list_branches().await.unwrap().contains_key("ghost"), + "reconcile should reclaim the orphan despite the GC failure" + ); + } +} + +// Companion to the version-GC isolation test, exercising the OTHER cleanup +// loop: a force-delete failure while reconciling one orphaned fork must be +// isolated (logged, not propagated) so the sweep continues, and a later +// cleanup converges. This is the loop the Devin finding was about. +#[tokio::test] +async fn cleanup_isolates_reconcile_failure() { + let _scenario = FailScenario::setup(); + let dir = tempfile::tempdir().unwrap(); + let uri = dir.path().to_str().unwrap().to_string(); + let mut db = helpers::init_and_load(&dir).await; + + // Forge an orphaned fork the reconcile pass will try to reclaim. + let person_uri = node_table_uri(&uri, "Person"); + { + let mut ds = lance::Dataset::open(&person_uri).await.unwrap(); + let base = ds.version().version; + ds.create_branch("ghost", base, None).await.unwrap(); + } + + // Inject a one-shot failure into the reconcile force-delete. The sweep must + // not abort. 
+ { + let _fp = ScopedFailPoint::new("cleanup.reconcile_fork", "1*return"); + db.cleanup(omnigraph::db::CleanupPolicyOptions { + keep_versions: Some(1), + older_than: None, + }) + .await + .expect("a reconcile force-delete failure must not abort cleanup"); + } + // The blocked orphan is still present (the failure was isolated, not retried). + { + let ds = lance::Dataset::open(&person_uri).await.unwrap(); + assert!( + ds.list_branches().await.unwrap().contains_key("ghost"), + "the orphan whose reclaim was injected-to-fail should remain" + ); + } + // A second cleanup with no injected failure converges. + db.cleanup(omnigraph::db::CleanupPolicyOptions { + keep_versions: Some(1), + older_than: None, + }) + .await + .unwrap(); + { + let ds = lance::Dataset::open(&person_uri).await.unwrap(); + assert!( + !ds.list_branches().await.unwrap().contains_key("ghost"), + "the second cleanup should reconcile the orphan" + ); + } +} + +// The cleanup reconciler must reclaim orphaned commit-graph branches, not just +// per-table forks. A delete whose best-effort commit-graph reclaim fails leaves +// a commit-graph orphan; the next cleanup must drop it. +#[tokio::test] +async fn cleanup_reclaims_orphaned_commit_graph_branch() { + let _scenario = FailScenario::setup(); + let dir = tempfile::tempdir().unwrap(); + let uri = dir.path().to_str().unwrap().to_string(); + let mut db = helpers::init_and_load(&dir).await; + + db.branch_create("feature").await.unwrap(); + // Delete, failing the commit-graph reclaim → commit-graph "feature" orphan + // (manifest branch gone, commit-graph branch left behind). 
+ { + let _fp = ScopedFailPoint::new("branch_delete.before_commit_graph_reclaim", "return"); + db.branch_delete("feature").await.unwrap(); + } + + let commits_uri = format!("{}/_graph_commits.lance", uri.trim_end_matches('/')); + { + let ds = lance::Dataset::open(&commits_uri).await.unwrap(); + assert!( + ds.list_branches().await.unwrap().contains_key("feature"), + "precondition: the commit-graph branch should be orphaned after the failed reclaim" + ); + } + + db.cleanup(omnigraph::db::CleanupPolicyOptions { + keep_versions: Some(1), + older_than: None, + }) + .await + .unwrap(); + + { + let ds = lance::Dataset::open(&commits_uri).await.unwrap(); + assert!( + !ds.list_branches().await.unwrap().contains_key("feature"), + "cleanup should reclaim the orphaned commit-graph branch" + ); + } +} + +// A branch_delete whose best-effort commit-graph reclaim fails leaves a +// commit-graph "zombie" branch. Recreating that name must heal the zombie and +// succeed (branch_create force-deletes a stale commit-graph ref since the +// manifest branch is created fresh), instead of dying on the leftover ref. +#[tokio::test] +async fn branch_create_recreates_over_commit_graph_zombie() { + let _scenario = FailScenario::setup(); + let dir = tempfile::tempdir().unwrap(); + let db = Omnigraph::init(dir.path().to_str().unwrap(), helpers::TEST_SCHEMA) + .await + .unwrap(); + + db.branch_create("feature").await.unwrap(); + { + // Fail the best-effort commit-graph reclaim → commit-graph "feature" + // zombie survives the delete (manifest authority still flips). 
+ let _fp = ScopedFailPoint::new("branch_delete.before_commit_graph_reclaim", "return"); + db.branch_delete("feature").await.unwrap(); + } + assert_eq!(db.branch_list().await.unwrap(), vec!["main".to_string()]); + + db.branch_create("feature") + .await + .expect("branch_create should heal the zombie commit-graph branch and succeed"); + assert!( + db.branch_list() + .await + .unwrap() + .contains(&"feature".to_string()) + ); +} + +// branch_create is authority-then-derived: if the derived commit-graph branch +// cannot be created, the manifest branch (the authority) must be rolled back so +// the branch does not half-exist. The existing failpoint fires right after the +// manifest create, standing in for any post-authority failure. +#[tokio::test] +async fn branch_create_rolls_back_manifest_on_commit_graph_failure() { + let _scenario = FailScenario::setup(); + let dir = tempfile::tempdir().unwrap(); + let db = Omnigraph::init(dir.path().to_str().unwrap(), helpers::TEST_SCHEMA) + .await + .unwrap(); + + let err = { + let _fp = ScopedFailPoint::new("branch_create.after_manifest_branch_create", "return"); + db.branch_create("feature").await.unwrap_err() + }; + assert!( + !db.branch_list() + .await + .unwrap() + .contains(&"feature".to_string()), + "branch_create must roll back the manifest branch when the derived \ + commit-graph branch fails, got error: {err}" + ); +} + +// A fork collision must be classified by the manifest authority, not by Lance +// branch versions. When a concurrent first-write legitimately wins the fork +// race, the loser sees a version mismatch — but that is a stale snapshot, not +// an orphan, so it must be a retryable "refresh and retry", never a misleading +// "run cleanup". +// +// Ordering is made deterministic (no sleeps) via a callback at the fork point: +// `compare_exchange` lets only the FIRST arrival (writer A) record readiness and +// block until released; later arrivals (writer B) fall through. 
The test waits +// on the readiness flag, lets B win and commit the fork, then releases A. +static FORK_A_AT_POINT: std::sync::atomic::AtomicBool = std::sync::atomic::AtomicBool::new(false); +static FORK_RELEASE_A: std::sync::atomic::AtomicBool = std::sync::atomic::AtomicBool::new(false); + +#[tokio::test(flavor = "multi_thread")] +async fn fork_collision_with_live_concurrent_fork_is_retryable() { + use std::sync::atomic::Ordering::SeqCst; + + let _scenario = FailScenario::setup(); + FORK_A_AT_POINT.store(false, SeqCst); + FORK_RELEASE_A.store(false, SeqCst); + + let dir = tempfile::tempdir().unwrap(); + let uri = dir.path().to_str().unwrap().to_string(); + let main = helpers::init_and_load(&dir).await; + main.branch_create("feature").await.unwrap(); + + // First arrival (A) records readiness and blocks until released; the rest + // (B) fall through immediately. Bounded spin so a mistake can't hang forever. + fail::cfg_callback("fork.before_classify", || { + if FORK_A_AT_POINT + .compare_exchange(false, true, SeqCst, SeqCst) + .is_ok() + { + for _ in 0..2000 { + if FORK_RELEASE_A.load(SeqCst) { + break; + } + std::thread::sleep(std::time::Duration::from_millis(5)); + } + } + }) + .unwrap(); + + let uri_a = uri.clone(); + let writer_a = tokio::spawn(async move { + let mut a = Omnigraph::open(&uri_a).await.unwrap(); + helpers::mutate_branch( + &mut a, + "feature", + MUTATION_QUERIES, + "insert_person", + &mixed_params(&[("$name", "Eve")], &[("$age", 22)]), + ) + .await + }); + + // Wait (bounded) until A is parked at the fork point. + for _ in 0..600 { + if FORK_A_AT_POINT.load(SeqCst) { + break; + } + tokio::time::sleep(std::time::Duration::from_millis(5)).await; + } + assert!( + FORK_A_AT_POINT.load(SeqCst), + "writer A never reached the fork point" + ); + + // B wins the fork and commits it. 
+ let mut b = Omnigraph::open(&uri).await.unwrap(); + helpers::mutate_branch( + &mut b, + "feature", + MUTATION_QUERIES, + "insert_person", + &mixed_params(&[("$name", "Frank")], &[("$age", 41)]), + ) + .await + .unwrap(); + + // Release A; it resumes, re-reads the manifest, and sees the fork is live. + FORK_RELEASE_A.store(true, SeqCst); + let err = writer_a + .await + .unwrap() + .expect_err("A's stale-snapshot fork should be a retryable conflict"); + fail::remove("fork.before_classify"); + + let msg = err.to_string(); + assert!( + !msg.contains("cleanup"), + "a live concurrent fork must not be misclassified as an orphan, got: {msg}" + ); + assert!( + msg.contains("refresh and retry") || msg.contains("expected manifest table version"), + "expected a retryable stale-view error, got: {msg}" + ); +} + #[tokio::test(flavor = "multi_thread")] async fn graph_publish_failpoint_triggers_before_commit_append() { let _scenario = FailScenario::setup(); diff --git a/crates/omnigraph/tests/lance_surface_guards.rs b/crates/omnigraph/tests/lance_surface_guards.rs index b65a808..ed1f22e 100644 --- a/crates/omnigraph/tests/lance_surface_guards.rs +++ b/crates/omnigraph/tests/lance_surface_guards.rs @@ -242,3 +242,51 @@ async fn _compile_delete_result_field_shape() -> lance::Result<()> { let _num_deleted: u64 = result.num_deleted_rows; Ok(()) } + +// --- Guard 9: force_delete_branch semantics -------------------------------- +// +// The branch-delete reconciler (`db/omnigraph/optimize.rs::reconcile_orphaned_branches`) +// and the eager best-effort reclaim in `cleanup_deleted_branch_tables` call +// `force_delete_branch` to drop orphaned branch refs. The single-authority +// design relies on three facts pinned here: +// 1. plain `delete_branch` errors on a missing ref (so the design uses the +// force variant instead); +// 2. `force_delete_branch` removes an existing (forked) branch — the orphan +// case, where a `tree/{branch}/` exists; +// 3. 
`force_delete_branch` on a *fully-absent* branch (no tree dir) still +// errors on the local store, because `remove_dir_all`'s NotFound is not +// caught for Lance's native error variant. `TableStore::force_delete_branch` +// wraps this to be fully idempotent. Pin the raw quirk so a future Lance +// fix (which would let us simplify the wrapper) is noticed. + +#[tokio::test] +async fn force_delete_branch_semantics() { + let dir = tempfile::tempdir().unwrap(); + let uri = dir.path().join("guard9.lance"); + let uri = uri.to_str().unwrap(); + let mut ds = fresh_dataset(uri).await; + + // (1) Plain delete of a never-created branch errors (RefNotFound). + assert!( + ds.delete_branch("nope").await.is_err(), + "Dataset::delete_branch on a missing ref should error; if this is now \ + Ok, the reconciler could drop the force variant." + ); + + // (2) force_delete_branch removes an existing (forked) branch. + let base = ds.version().version; + ds.create_branch("feature", base, None).await.unwrap(); + ds.force_delete_branch("feature").await.unwrap(); + assert!( + !ds.list_branches().await.unwrap().contains_key("feature"), + "force_delete_branch should remove an existing branch ref" + ); + + // (3) Quirk: force_delete on a fully-absent branch errors on the local + // store (worked around by TableStore::force_delete_branch). + assert!( + ds.force_delete_branch("never").await.is_err(), + "force_delete_branch on a fully-absent branch no longer errors — \ + TableStore::force_delete_branch's NotFound tolerance can be simplified." 
+ ); +} diff --git a/crates/omnigraph/tests/maintenance.rs b/crates/omnigraph/tests/maintenance.rs index 3c6ab30..722bdc4 100644 --- a/crates/omnigraph/tests/maintenance.rs +++ b/crates/omnigraph/tests/maintenance.rs @@ -7,11 +7,24 @@ mod helpers; use std::time::Duration; +use lance::Dataset; use omnigraph::db::{CleanupPolicyOptions, Omnigraph}; use omnigraph::loader::{LoadMode, load_jsonl}; use helpers::{TEST_DATA, TEST_SCHEMA, count_rows, init_and_load}; +/// Filesystem URI of a node sub-table, mirroring the engine's layout +/// (FNV-1a of the type name under `nodes/`). Matches the helper in +/// `failpoints.rs`; used to inspect/forge Lance branches directly in tests. +fn node_table_uri(root: &str, type_name: &str) -> String { + let mut hash: u64 = 0xcbf2_9ce4_8422_2325; + for &b in type_name.as_bytes() { + hash ^= b as u64; + hash = hash.wrapping_mul(0x100_0000_01b3); + } + format!("{}/nodes/{hash:016x}", root.trim_end_matches('/')) +} + #[tokio::test] async fn optimize_on_empty_graph_returns_stats_per_table_with_no_changes() { let dir = tempfile::tempdir().unwrap(); @@ -158,3 +171,59 @@ async fn cleanup_then_optimize_preserves_rows_and_table_remains_writable() { .unwrap(); assert_eq!(count_rows(&db, "node:Person").await, people_before); } + +#[tokio::test] +async fn cleanup_reconciles_orphaned_branch_forks() { + // An incomplete prior `branch_delete` can leave a per-table Lance branch + // that the manifest no longer references (a "zombie" fork). It is + // unreachable through any snapshot but pins its `tree/{branch}/` storage. + // `cleanup` must reconcile it away: drop every Lance branch absent from the + // manifest authority, without touching `main`. 
+ let dir = tempfile::tempdir().unwrap(); + let uri = dir.path().to_str().unwrap().to_string(); + let mut db = init_and_load(&dir).await; + + let people_before = count_rows(&db, "node:Person").await; + assert!(people_before > 0, "fixture should seed Person rows"); + + // Forge an orphaned fork the manifest never knew about. + let person_uri = node_table_uri(&uri, "Person"); + { + let mut ds = Dataset::open(&person_uri).await.unwrap(); + let base = ds.version().version; + ds.create_branch("ghost", base, None).await.unwrap(); + assert!( + ds.list_branches().await.unwrap().contains_key("ghost"), + "precondition: orphaned fork staged" + ); + } + + db.cleanup(CleanupPolicyOptions { + keep_versions: Some(1), + older_than: None, + }) + .await + .unwrap(); + + // Orphan reclaimed; main untouched. + { + let ds = Dataset::open(&person_uri).await.unwrap(); + assert!( + !ds.list_branches().await.unwrap().contains_key("ghost"), + "cleanup should reconcile the orphaned 'ghost' fork away" + ); + } + assert_eq!( + count_rows(&db, "node:Person").await, + people_before, + "cleanup must not disturb main while reconciling orphans" + ); + + // Idempotent: a second cleanup with the orphan already gone is a no-op. 
+ db.cleanup(CleanupPolicyOptions { + keep_versions: Some(1), + older_than: None, + }) + .await + .unwrap(); +} diff --git a/docs/dev/invariants.md b/docs/dev/invariants.md index 70477d4..0cf295c 100644 --- a/docs/dev/invariants.md +++ b/docs/dev/invariants.md @@ -99,6 +99,7 @@ Use it this way: | Multi-table commit | Manifest CAS plus recovery sidecars; not a single Lance primitive | [writes.md](writes.md), [architecture.md](architecture.md) | | Constructive mutations | In-memory `MutationStaging`, one end-of-query table commit per touched table, then one manifest publish | [writes.md](writes.md), [execution.md](execution.md) | | Deletes | Inline-commit residual; delete-only queries allowed, mixed insert/update/delete rejected by D2 | [query-language.md](../user/query-language.md), [writes.md](writes.md) | +| Branch delete | Manifest is the single authority, flipped atomically first; per-table forks + commit-graph branch are derived state, reclaimed best-effort (`force_delete_branch`) with the `cleanup` reconciler as the guaranteed backstop. 
If a reclaim failed, reusing that name before the next `cleanup` surfaces an actionable error | [branches-commits.md](../user/branches-commits.md), [maintenance.md](../user/maintenance.md) | | Schema validation | Type checks, required fields, defaults, edge endpoint checks, and edge cardinality are enforced on write paths | [schema-language.md](../user/schema-language.md), [execution.md](execution.md) | | Unique constraints | Intra-batch and write-path checks exist; full cross-version uniqueness is still a gap | [schema-language.md](../user/schema-language.md) | | Storage trait | `TableStorage` exists as the sealed staged-write surface; full call-site migration and capability/stat surfaces are incomplete | [writes.md](writes.md), [architecture.md](architecture.md) | @@ -107,6 +108,13 @@ Use it this way: | Auth | Bearer token hashing and server-side actor resolution are implemented at the HTTP boundary | [server.md](../user/server.md), [policy.md](../user/policy.md) | | Tests | Tempdir-backed Lance tests are the current substrate; there is no `MemStorage` test backend | [testing.md](testing.md) | +The branch-delete reconciler is authority-derived: it reclaims orphaned forks +today and degrades to a no-op if Lance ships an atomic multi-dataset branch +operation, so the design composes with that future rather than blocking it. This +is the same shape as invariant 7 (indexes are derived state); prefer it over a +recovery-sidecar-style approach for any new multi-dataset metadata operation, +since the sidecar would be scaffolding to remove once the substrate closes the gap. + ## Known Gaps Do not hide these behind invariant wording.
Either move them forward or keep diff --git a/docs/dev/lance.md b/docs/dev/lance.md index ef83f2c..100da6f 100644 --- a/docs/dev/lance.md +++ b/docs/dev/lance.md @@ -175,7 +175,8 @@ Migration from Lance 4.0.0 → 6.0.1 landed in this cycle (DataFusion 52 → 53, - **Lance #6658 closed** (2026-05-14) but `DeleteBuilder::execute_uncommitted` did **not** ship in v6.0.1 — binary search across the release stream shows it first appears in `v7.0.0-beta.10` (the closing commits landed on main but didn't backport to the 6.x line). Tracked as MR-A: migrate `delete_where` to staged, retire the parse-time D2 mutation rule, extend recovery sidecar coverage. **Gated on the Lance v7.x bump**, not this PR. v7.0.0-rc.1 dropped 2026-05-21. - **Lance #6666 still open** (`build_index_metadata_from_segments` public): vector-index two-phase blocked; inline `create_vector_index` residual retained. - **Lance #6877 still open** (`MergeInsertBuilder` dup-rowid): PR #109's `SourceDedupeBehavior::FirstSeen` + `check_batch_unique_by_keys` precondition stay load-bearing. +- **`Dataset::force_delete_branch`** (`branches().delete(name, force=true)`, dataset.rs:524) tolerates a missing branch-*contents* ref (vs plain `delete_branch`'s `RefNotFound`), but on the local store still errors `NotFound` if the branch `tree/` directory is fully absent (`remove_dir_all`'s NotFound is not caught for Lance's native error variant, refs.rs:526-549). Both variants still refuse a branch with referencing descendants (`RefConflict`). `TableStore::force_delete_branch` wraps this to be fully idempotent (tolerates already-absent). The single-authority branch-delete redesign uses it for orphan reclamation (eager best-effort reclaim + cleanup reconciler). Pinned by `lance_surface_guards.rs::force_delete_branch_semantics`. Branch delete is "flip the ref atomically, then `remove_dir_all(tree/{branch})`"; branch-exclusive data lives under `tree/{branch}/` so a drop reclaims it immediately without touching `main`. 
-Surface guards added: `crates/omnigraph/tests/lance_surface_guards.rs` (8 named guards; 3 runtime + 5 compile-only). Future Lance bumps re-run this file first as the smoke check. Two additional guards from the original plan deferred to follow-up (`manifest_cas_returns_row_level_contention_variant` needs full publisher-race harness; `table_version_metadata_byte_compatible_with_v4` needs `pub(crate)` reach extension). +Surface guards added: `crates/omnigraph/tests/lance_surface_guards.rs` (9 named guards; 4 runtime + 5 compile-only). Future Lance bumps re-run this file first as the smoke check. Two additional guards from the original plan deferred to follow-up (`manifest_cas_returns_row_level_contention_variant` needs full publisher-race harness; `table_version_metadata_byte_compatible_with_v4` needs `pub(crate)` reach extension). Bump this date stanza on the next alignment pass. diff --git a/docs/user/branches-commits.md b/docs/user/branches-commits.md index de6c653..c1894f9 100644 --- a/docs/user/branches-commits.md +++ b/docs/user/branches-commits.md @@ -8,10 +8,10 @@ Lance supports branching at the dataset level: a branch is a named lineage of ve OmniGraph builds *graph branches* on top by branching every sub-table coherently: -- `branch_create(name)` / `branch_create_from(target, name)` — disallowed name `main`; fails if branch exists; ensures the schema-apply lock is idle. +- `branch_create(name)` / `branch_create_from(target, name)` — disallowed name `main`; fails if branch exists; ensures the schema-apply lock is idle. Atomic and authority-first like `branch_delete`: it flips the `__manifest` branch (authority), then creates the derived commit-graph branch, force-dropping any orphaned commit-graph ref left by an incomplete prior delete (the manifest branch is fresh, so a same-named commit-graph branch is provably a zombie). If commit-graph creation fails, the manifest branch is rolled back so the name never half-exists. 
- `branch_list()` — returns public branches, **filters internal** `__run__…` and `__schema_apply_lock__` prefixes. -- `branch_delete(name)` — refuses if there are descendants or active runs on the branch; cleans up owned per-branch fragments. -- **Lazy forking**: a branch only forks a sub-table when that sub-table is first mutated on it. Pure-read branches share fragments with their source. +- `branch_delete(name)` — refuses if there are descendants or active runs on the branch. The manifest is the single authority for branch existence: deletion flips the `__manifest` branch ref first (one atomic op), after which the branch is gone from every snapshot. The owned per-table forks and the commit-graph branch are derived state, reclaimed best-effort with `force_delete_branch` after the flip. A failure during that reclaim (transient object-store error) does not fail the call or block the authority flip; the leftover forks are unreachable orphans that the [`cleanup`](maintenance.md) reconciler converges. One consequence: if a delete's best-effort reclaim fails, reusing that branch name before the next `cleanup` surfaces a clear error pointing at `cleanup` (the stale fork would otherwise collide on first write). +- **Lazy forking**: a branch only forks a sub-table when that sub-table is first mutated on it. Pure-read branches share fragments with their source. A fork collision is classified by the manifest authority, not by Lance branch versions: if the live manifest already records the fork on the active branch, a concurrent first-write won and the caller gets a retryable "refresh and retry"; if the manifest does not, a physical branch there is an orphan and the caller is pointed at `cleanup`. - `sync_branch(branch)` — re-binds the in-memory handle to the latest head of the branch. 
## L2 — Commit graph (`db/commit_graph.rs`) diff --git a/docs/user/maintenance.md b/docs/user/maintenance.md index 08ae8da..9839ea1 100644 --- a/docs/user/maintenance.md +++ b/docs/user/maintenance.md @@ -14,9 +14,15 @@ - Lance `cleanup_old_versions()` per table. - Removes manifests (and their unique fragments) older than the retention policy. - `CleanupPolicyOptions { keep_versions: Option, older_than: Option }` — at least one is required. -- Returns `[TableCleanupStats { table_key, bytes_removed, old_versions_removed }]`. +- Returns `[TableCleanupStats { table_key, bytes_removed, old_versions_removed, error }]`. +- **Fault-isolated per table.** A single table's transient failure (version GC or + orphan reclaim) is recorded on that table's stats row (`error: Some(..)`, logged + via `tracing`) and never aborts the healthy tables — cleanup is the convergence + backstop, so it does as much as it can and converges on re-run. The CLI reports + any failed tables; rerun `cleanup` to retry them. - CLI guards with `--confirm`; without it, prints a preview line. - **Recovery floor:** `--keep < 3` may garbage-collect Lance versions that the open-time recovery sweep needs as a rollback target (the sweep restores to the branch's manifest-pinned table version, which is HEAD-1 in the typical Phase B → Phase C drift case). Default `--keep 10` is safe. +- **Orphaned-branch reconciliation:** before the version GC, cleanup runs `reconcile_orphaned_branches`, which `force_delete_branch`es any per-table or commit-graph Lance branch absent from the manifest branch list. These orphans arise when a `branch_delete` flips the manifest authority but a downstream best-effort reclaim does not complete (see [branches-commits.md](branches-commits.md)). The reconciler is authority-derived and idempotent (it no-ops once nothing is orphaned), runs regardless of the `keep_versions` / `older_than` values (those gate version GC only), and never reclaims `main` or system-branch forks. 
Reclaimed forks are logged via `tracing::info`. ## Tombstones