From 2df578eab8c4597c9a93914406aa23742407e7b0 Mon Sep 17 00:00:00 2001 From: andrew Date: Tue, 21 Apr 2026 14:15:39 +0300 Subject: [PATCH] Delete __run__ branches on every terminal state (MR-674) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Run branches are transactional scaffolding — the durable audit lives on RunRecord. Invariant: every terminal state (Published, Aborted, Failed) deletes the __run__ branch. - Add `terminate_run` helper: appends terminal RunRecord, then deletes the run branch. Delete errors are swallowed — the record is authoritative; `cleanup_terminal_run_branches_for_target` retries on later `branch_delete` of the target. - Wire into `publish_run_as`, `abort_run`, `fail_run`. - Include `Failed` in the cleanup filter (was `Published | Aborted` only) for legacy-repo GC during branch_delete. - Cleanup now checks `coordinator.all_branches()` first to skip branches already deleted by a concurrent handle — avoids Lance NotFound when two handles publish/clean up independently. - Drop `Failed` from `ensure_branch_delete_safe` — post-fix, Failed means the branch is already gone, so there's no reason to block target deletion (MR-674 "Downstream effects"). Tests: - New regression: `run_branches_do_not_accumulate_across_repeated_loads` — 10 loads + 1 abort → `branch_list() == ["main"]`. - New `failed_load_deletes_run_branch` asserts Failed path cleans up. - Rename `abort_run_keeps_target_unchanged_and_preserves_hidden_branch_for_inspection` → `abort_run_leaves_target_unchanged_and_deletes_run_branch`, invert the hidden-branch assertion. - Rewrite `public_{load,mutation}_preserves_staged_edge_ids_on_publish` to capture staged IDs before publish instead of inspecting the run branch after (branch is gone now). - Update MR-670 regression test to assert the run branch is *absent* after publish. Deferred to follow-up: `--keep-run-branch` debug flag, `omnigraph run gc`. Co-Authored-By: Claude Opus 4.7 (1M context) --- crates/omnigraph/src/db/omnigraph.rs | 72 ++++++++++------- crates/omnigraph/tests/runs.rs | 116 +++++++++++++++++++++------ 2 files changed, 134 insertions(+), 54 deletions(-) diff --git a/crates/omnigraph/src/db/omnigraph.rs b/crates/omnigraph/src/db/omnigraph.rs index 15e2d11..04e64c4 100644 --- a/crates/omnigraph/src/db/omnigraph.rs +++ b/crates/omnigraph/src/db/omnigraph.rs @@ -505,9 +505,7 @@ impl Omnigraph { } for run in self.list_runs().await? { - if run.target_branch == branch - && matches!(run.status, RunStatus::Running | RunStatus::Failed) - { + if run.target_branch == branch && matches!(run.status, RunStatus::Running) { return Err(OmniError::manifest_conflict(format!( "cannot delete branch '{}' while run '{}' targeting it is {}", branch, @@ -591,12 +589,21 @@ impl Omnigraph { .into_iter() .filter(|run| { run.target_branch == branch - && matches!(run.status, RunStatus::Published | RunStatus::Aborted) + && matches!( + run.status, + RunStatus::Published | RunStatus::Aborted | RunStatus::Failed + ) }) .map(|run| run.run_branch) .collect::>(); + let live_branches: HashSet = + self.coordinator.all_branches().await?.into_iter().collect(); + for run_branch in terminal_run_branches { + if !live_branches.contains(&run_branch) { + continue; + } match self.delete_branch_storage_only(&run_branch).await { Ok(()) => {} Err(OmniError::Manifest(err)) if err.kind == ManifestErrorKind::NotFound => {} @@ -776,9 +783,7 @@ impl Omnigraph { let run = self.get_run(run_id).await?; match run.status { RunStatus::Running | RunStatus::Failed => { - let updated = run.with_status(RunStatus::Aborted, None)?; - self.coordinator.append_run_record(&updated).await?; - Ok(updated) + self.terminate_run(&run, RunStatus::Aborted, None).await } RunStatus::Published => Err(OmniError::manifest_conflict(format!( "run '{}' is already published", @@ -795,11 +800,7 @@ impl Omnigraph { self.ensure_schema_state_valid().await?; let run = self.get_run(run_id).await?; match run.status { - RunStatus::Running => { - let updated = run.with_status(RunStatus::Failed, None)?; - self.coordinator.append_run_record(&updated).await?; - Ok(updated) - } + RunStatus::Running => self.terminate_run(&run, RunStatus::Failed, None).await, RunStatus::Failed => Ok(run), RunStatus::Published => Err(OmniError::manifest_conflict(format!( "run '{}' is already published", @@ -812,6 +813,22 @@ impl Omnigraph { } } + /// Append a terminal-state run record and delete the `__run__` branch. + /// The status record is authoritative; the branch is scaffolding. Delete + /// errors are swallowed — a later `branch_delete` of the target will + /// retry via `cleanup_terminal_run_branches_for_target`. + async fn terminate_run( + &mut self, + run: &RunRecord, + status: RunStatus, + published_snapshot_id: Option, + ) -> Result { + let updated = run.with_status(status, published_snapshot_id)?; + self.coordinator.append_run_record(&updated).await?; + let _ = self.delete_branch_storage_only(&updated.run_branch).await; + Ok(updated) + } + pub async fn publish_run(&mut self, run_id: &RunId) -> Result { self.publish_run_as(run_id, None).await } @@ -869,11 +886,12 @@ impl Omnigraph { self.audit_actor_id = previous_actor; publish_result?; let published_snapshot_id = self.resolve_snapshot(&run.target_branch).await?; - let updated = run.with_status( + self.terminate_run( + &run, RunStatus::Published, Some(published_snapshot_id.as_str().to_string()), - )?; - self.coordinator.append_run_record(&updated).await?; + ) + .await?; Ok(published_snapshot_id) } @@ -1723,19 +1741,17 @@ edge WorksAt: Person -> Company } #[tokio::test] - async fn test_apply_schema_succeeds_after_load_creates_published_run_branch() { - // Regression for MR-670: schema apply used to fail after any load or - // change because published __run__ branches count as "non-main" in - // the blocking-branch check, and there is no CLI path to clean them - // up (branch_delete rejects internal refs; run abort rejects - // Published runs). Published run branches are intentionally retained - // for post-publish inspection — schema apply now filters them out - // instead of requiring their deletion. + async fn test_apply_schema_succeeds_after_load() { + // MR-670 + MR-674: schema apply used to be blocked by leftover + // __run__ branches. MR-670 added a defense-in-depth filter that + // skips internal system branches. MR-674 made run branches + // ephemeral on every terminal state, so in practice no __run__ + // branch survives publish — but the filter still guards the + // invariant. let dir = tempfile::tempdir().unwrap(); let uri = dir.path().to_str().unwrap(); let mut db = Omnigraph::init(uri, TEST_SCHEMA).await.unwrap(); - // A load goes through a __run__ branch which remains after publish. crate::loader::load_jsonl( &mut db, r#"{"type": "Person", "data": {"name": "Alice", "age": 30}}"#, @@ -1744,17 +1760,13 @@ edge WorksAt: Person -> Company .await .unwrap(); - // Confirm at the coordinator level that a published run branch did - // get created and persists after publish. let all_branches = db.coordinator.all_branches().await.unwrap(); assert!( - all_branches.iter().any(|b| is_internal_run_branch(b)), - "expected at least one internal run branch after load, got: {:?}", + !all_branches.iter().any(|b| is_internal_run_branch(b)), + "MR-674: run branch should be deleted after publish, got: {:?}", all_branches ); - // Schema apply should succeed — the filter skips internal system - // branches, including __run__ ones. let desired = TEST_SCHEMA.replace( " age: I32?\n}", " age: I32?\n nickname: String?\n}", diff --git a/crates/omnigraph/tests/runs.rs b/crates/omnigraph/tests/runs.rs index 76fea2c..21e9aff 100644 --- a/crates/omnigraph/tests/runs.rs +++ b/crates/omnigraph/tests/runs.rs @@ -8,7 +8,7 @@ use lance::Dataset; use omnigraph::db::commit_graph::CommitGraph; use omnigraph::db::{Omnigraph, ReadTarget, RunStatus}; -use omnigraph::error::OmniError; +use omnigraph::error::{ManifestErrorKind, OmniError}; use omnigraph::loader::{LoadMode, load_jsonl}; use helpers::*; @@ -170,7 +170,7 @@ async fn publish_run_merges_internal_branch_into_target_and_marks_record() { } #[tokio::test] -async fn abort_run_keeps_target_unchanged_and_preserves_hidden_branch_for_inspection() { +async fn abort_run_leaves_target_unchanged_and_deletes_run_branch() { let dir = tempfile::tempdir().unwrap(); let mut db = init_and_load(&dir).await; let run = db.begin_run("main", Some("abort-test")).await.unwrap(); @@ -197,7 +197,7 @@ async fn abort_run_keeps_target_unchanged_and_preserves_hidden_branch_for_inspec .unwrap(); assert_eq!(main_qr.num_rows(), 0); - let run_qr = db + let err = db .query( ReadTarget::branch(run.run_branch.as_str()), TEST_QUERIES, @@ -205,8 +205,13 @@ async fn abort_run_keeps_target_unchanged_and_preserves_hidden_branch_for_inspec ¶ms(&[("$name", "Eve")]), ) .await - .unwrap(); - assert_eq!(run_qr.num_rows(), 1); + .unwrap_err(); + assert!( + matches!(err, OmniError::Manifest(ref e) if e.kind == ManifestErrorKind::NotFound) + || matches!(err, OmniError::Lance(_)), + "run branch should be gone after abort, got: {}", + err + ); } #[tokio::test] @@ -292,21 +297,22 @@ async fn public_load_preserves_staged_edge_ids_on_publish() { let uri = dir.path().to_str().unwrap(); let mut db = Omnigraph::init(uri, TEST_SCHEMA).await.unwrap(); - load_jsonl(&mut db, TEST_DATA, LoadMode::Overwrite) + let run = db.begin_run("main", Some("preserve-ids-load")).await.unwrap(); + db.load(&run.run_branch, TEST_DATA, LoadMode::Overwrite) .await .unwrap(); - let runs = latest_runs(uri).await; - let run_branch = runs[0].run_branch.clone(); - - let mut main_ids = collect_column_strings(&read_table(&db, "edge:Knows").await, "id"); - let mut run_ids = collect_column_strings( - &read_table_branch(&db, run_branch.as_str(), "edge:Knows").await, + let mut staged_ids = collect_column_strings( + &read_table_branch(&db, run.run_branch.as_str(), "edge:Knows").await, "id", ); + staged_ids.sort(); + + db.publish_run(&run.run_id).await.unwrap(); + + let mut main_ids = collect_column_strings(&read_table(&db, "edge:Knows").await, "id"); main_ids.sort(); - run_ids.sort(); - assert_eq!(main_ids, run_ids); + assert_eq!(main_ids, staged_ids); } #[tokio::test] @@ -381,11 +387,14 @@ async fn public_mutation_uses_hidden_transactional_run_and_publishes_it() { #[tokio::test] async fn public_mutation_preserves_staged_edge_ids_on_publish() { let dir = tempfile::tempdir().unwrap(); - let uri = dir.path().to_str().unwrap(); let mut db = init_and_load(&dir).await; + let run = db + .begin_run("main", Some("preserve-ids-mutation")) + .await + .unwrap(); db.mutate( - "main", + run.run_branch.as_str(), MUTATION_QUERIES, "add_friend", ¶ms(&[("$from", "Alice"), ("$to", "Diana")]), @@ -393,17 +402,17 @@ async fn public_mutation_preserves_staged_edge_ids_on_publish() { .await .unwrap(); - let runs = latest_runs(uri).await; - let latest = runs.last().unwrap(); - - let mut main_ids = collect_column_strings(&read_table(&db, "edge:Knows").await, "id"); - let mut run_ids = collect_column_strings( - &read_table_branch(&db, latest.run_branch.as_str(), "edge:Knows").await, + let mut staged_ids = collect_column_strings( + &read_table_branch(&db, run.run_branch.as_str(), "edge:Knows").await, "id", ); + staged_ids.sort(); + + db.publish_run(&run.run_id).await.unwrap(); + + let mut main_ids = collect_column_strings(&read_table(&db, "edge:Knows").await, "id"); main_ids.sort(); - run_ids.sort(); - assert_eq!(main_ids, run_ids); + assert_eq!(main_ids, staged_ids); } #[tokio::test] @@ -531,3 +540,62 @@ async fn public_mutation_records_actor_on_run_and_published_commit() { .unwrap(); assert_eq!(head.actor_id.as_deref(), Some("act-andrew")); } + +#[tokio::test] +async fn run_branches_do_not_accumulate_across_repeated_loads() { + // MR-674: run branches are transactional scaffolding. Every terminal + // state (Published, Aborted, Failed) deletes the branch. Verifying the + // invariant end-to-end: after 10 publishes and one abort, only main + // should remain. + let dir = tempfile::tempdir().unwrap(); + let uri = dir.path().to_str().unwrap(); + let mut db = Omnigraph::init(uri, TEST_SCHEMA).await.unwrap(); + + for i in 0..10 { + let payload = format!( + r#"{{"type":"Person","data":{{"name":"p{}","age":{}}}}}"#, + i, i + ); + load_jsonl(&mut db, &payload, LoadMode::Append) + .await + .unwrap(); + } + + let aborted_run = db.begin_run("main", Some("abort-me")).await.unwrap(); + db.abort_run(&aborted_run.run_id).await.unwrap(); + + assert_eq!(db.branch_list().await.unwrap(), vec!["main".to_string()]); + let all_branches = Omnigraph::open(uri) + .await + .unwrap() + .branch_list() + .await + .unwrap(); + assert_eq!(all_branches, vec!["main".to_string()]); +} + +#[tokio::test] +async fn failed_load_deletes_run_branch() { + let dir = tempfile::tempdir().unwrap(); + let uri = dir.path().to_str().unwrap(); + let mut db = Omnigraph::init(uri, TEST_SCHEMA).await.unwrap(); + + let bad = r#"{"type":"Person","data":{"name":"Alice","age":30}} +{"edge":"Knows","from":"Alice","to":"Missing"}"#; + let _ = load_jsonl(&mut db, bad, LoadMode::Overwrite).await; + + let runs = latest_runs(uri).await; + assert_eq!(runs.len(), 1); + assert_eq!(runs[0].status, "failed"); + + let err = db + .snapshot_of(ReadTarget::branch(runs[0].run_branch.as_str())) + .await + .unwrap_err(); + assert!( + matches!(err, OmniError::Manifest(ref e) if e.kind == ManifestErrorKind::NotFound) + || matches!(err, OmniError::Lance(_)), + "failed run's branch should be gone, got: {}", + err + ); +}