Merge pull request #43 from ModernRelay/fix/mr-674-ephemeral-run-branches

Delete __run__ branches on every terminal state (MR-674)
This commit is contained in:
Andrew Altshuler 2026-04-21 14:35:49 +03:00 committed by GitHub
commit 102ccc05f7
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
2 changed files with 134 additions and 54 deletions

View file

@ -505,9 +505,7 @@ impl Omnigraph {
}
for run in self.list_runs().await? {
if run.target_branch == branch
&& matches!(run.status, RunStatus::Running | RunStatus::Failed)
{
if run.target_branch == branch && matches!(run.status, RunStatus::Running) {
return Err(OmniError::manifest_conflict(format!(
"cannot delete branch '{}' while run '{}' targeting it is {}",
branch,
@ -591,12 +589,21 @@ impl Omnigraph {
.into_iter()
.filter(|run| {
run.target_branch == branch
&& matches!(run.status, RunStatus::Published | RunStatus::Aborted)
&& matches!(
run.status,
RunStatus::Published | RunStatus::Aborted | RunStatus::Failed
)
})
.map(|run| run.run_branch)
.collect::<Vec<_>>();
let live_branches: HashSet<String> =
self.coordinator.all_branches().await?.into_iter().collect();
for run_branch in terminal_run_branches {
if !live_branches.contains(&run_branch) {
continue;
}
match self.delete_branch_storage_only(&run_branch).await {
Ok(()) => {}
Err(OmniError::Manifest(err)) if err.kind == ManifestErrorKind::NotFound => {}
@ -776,9 +783,7 @@ impl Omnigraph {
let run = self.get_run(run_id).await?;
match run.status {
RunStatus::Running | RunStatus::Failed => {
let updated = run.with_status(RunStatus::Aborted, None)?;
self.coordinator.append_run_record(&updated).await?;
Ok(updated)
self.terminate_run(&run, RunStatus::Aborted, None).await
}
RunStatus::Published => Err(OmniError::manifest_conflict(format!(
"run '{}' is already published",
@ -795,11 +800,7 @@ impl Omnigraph {
self.ensure_schema_state_valid().await?;
let run = self.get_run(run_id).await?;
match run.status {
RunStatus::Running => {
let updated = run.with_status(RunStatus::Failed, None)?;
self.coordinator.append_run_record(&updated).await?;
Ok(updated)
}
RunStatus::Running => self.terminate_run(&run, RunStatus::Failed, None).await,
RunStatus::Failed => Ok(run),
RunStatus::Published => Err(OmniError::manifest_conflict(format!(
"run '{}' is already published",
@ -812,6 +813,22 @@ impl Omnigraph {
}
}
/// Append a terminal-state run record and delete the `__run__` branch.
/// The status record is authoritative; the branch is scaffolding. Delete
/// errors are swallowed — a later `branch_delete` of the target will
/// retry via `cleanup_terminal_run_branches_for_target`.
async fn terminate_run(
&mut self,
run: &RunRecord,
status: RunStatus,
published_snapshot_id: Option<String>,
) -> Result<RunRecord> {
let updated = run.with_status(status, published_snapshot_id)?;
self.coordinator.append_run_record(&updated).await?;
let _ = self.delete_branch_storage_only(&updated.run_branch).await;
Ok(updated)
}
pub async fn publish_run(&mut self, run_id: &RunId) -> Result<SnapshotId> {
self.publish_run_as(run_id, None).await
}
@ -869,11 +886,12 @@ impl Omnigraph {
self.audit_actor_id = previous_actor;
publish_result?;
let published_snapshot_id = self.resolve_snapshot(&run.target_branch).await?;
let updated = run.with_status(
self.terminate_run(
&run,
RunStatus::Published,
Some(published_snapshot_id.as_str().to_string()),
)?;
self.coordinator.append_run_record(&updated).await?;
)
.await?;
Ok(published_snapshot_id)
}
@ -1723,19 +1741,17 @@ edge WorksAt: Person -> Company
}
#[tokio::test]
async fn test_apply_schema_succeeds_after_load_creates_published_run_branch() {
// Regression for MR-670: schema apply used to fail after any load or
// change because published __run__ branches count as "non-main" in
// the blocking-branch check, and there is no CLI path to clean them
// up (branch_delete rejects internal refs; run abort rejects
// Published runs). Published run branches are intentionally retained
// for post-publish inspection — schema apply now filters them out
// instead of requiring their deletion.
async fn test_apply_schema_succeeds_after_load() {
// MR-670 + MR-674: schema apply used to be blocked by leftover
// __run__ branches. MR-670 added a defense-in-depth filter that
// skips internal system branches. MR-674 made run branches
// ephemeral on every terminal state, so in practice no __run__
// branch survives publish — but the filter still guards the
// invariant.
let dir = tempfile::tempdir().unwrap();
let uri = dir.path().to_str().unwrap();
let mut db = Omnigraph::init(uri, TEST_SCHEMA).await.unwrap();
// A load goes through a __run__ branch which remains after publish.
crate::loader::load_jsonl(
&mut db,
r#"{"type": "Person", "data": {"name": "Alice", "age": 30}}"#,
@ -1744,17 +1760,13 @@ edge WorksAt: Person -> Company
.await
.unwrap();
// Confirm at the coordinator level that a published run branch did
// get created and persists after publish.
let all_branches = db.coordinator.all_branches().await.unwrap();
assert!(
all_branches.iter().any(|b| is_internal_run_branch(b)),
"expected at least one internal run branch after load, got: {:?}",
!all_branches.iter().any(|b| is_internal_run_branch(b)),
"MR-674: run branch should be deleted after publish, got: {:?}",
all_branches
);
// Schema apply should succeed — the filter skips internal system
// branches, including __run__ ones.
let desired = TEST_SCHEMA.replace(
" age: I32?\n}",
" age: I32?\n nickname: String?\n}",

View file

@ -8,7 +8,7 @@ use lance::Dataset;
use omnigraph::db::commit_graph::CommitGraph;
use omnigraph::db::{Omnigraph, ReadTarget, RunStatus};
use omnigraph::error::OmniError;
use omnigraph::error::{ManifestErrorKind, OmniError};
use omnigraph::loader::{LoadMode, load_jsonl};
use helpers::*;
@ -170,7 +170,7 @@ async fn publish_run_merges_internal_branch_into_target_and_marks_record() {
}
#[tokio::test]
async fn abort_run_keeps_target_unchanged_and_preserves_hidden_branch_for_inspection() {
async fn abort_run_leaves_target_unchanged_and_deletes_run_branch() {
let dir = tempfile::tempdir().unwrap();
let mut db = init_and_load(&dir).await;
let run = db.begin_run("main", Some("abort-test")).await.unwrap();
@ -197,7 +197,7 @@ async fn abort_run_keeps_target_unchanged_and_preserves_hidden_branch_for_inspec
.unwrap();
assert_eq!(main_qr.num_rows(), 0);
let run_qr = db
let err = db
.query(
ReadTarget::branch(run.run_branch.as_str()),
TEST_QUERIES,
@ -205,8 +205,13 @@ async fn abort_run_keeps_target_unchanged_and_preserves_hidden_branch_for_inspec
&params(&[("$name", "Eve")]),
)
.await
.unwrap();
assert_eq!(run_qr.num_rows(), 1);
.unwrap_err();
assert!(
matches!(err, OmniError::Manifest(ref e) if e.kind == ManifestErrorKind::NotFound)
|| matches!(err, OmniError::Lance(_)),
"run branch should be gone after abort, got: {}",
err
);
}
#[tokio::test]
@ -292,21 +297,22 @@ async fn public_load_preserves_staged_edge_ids_on_publish() {
let uri = dir.path().to_str().unwrap();
let mut db = Omnigraph::init(uri, TEST_SCHEMA).await.unwrap();
load_jsonl(&mut db, TEST_DATA, LoadMode::Overwrite)
let run = db.begin_run("main", Some("preserve-ids-load")).await.unwrap();
db.load(&run.run_branch, TEST_DATA, LoadMode::Overwrite)
.await
.unwrap();
let runs = latest_runs(uri).await;
let run_branch = runs[0].run_branch.clone();
let mut main_ids = collect_column_strings(&read_table(&db, "edge:Knows").await, "id");
let mut run_ids = collect_column_strings(
&read_table_branch(&db, run_branch.as_str(), "edge:Knows").await,
let mut staged_ids = collect_column_strings(
&read_table_branch(&db, run.run_branch.as_str(), "edge:Knows").await,
"id",
);
staged_ids.sort();
db.publish_run(&run.run_id).await.unwrap();
let mut main_ids = collect_column_strings(&read_table(&db, "edge:Knows").await, "id");
main_ids.sort();
run_ids.sort();
assert_eq!(main_ids, run_ids);
assert_eq!(main_ids, staged_ids);
}
#[tokio::test]
@ -381,11 +387,14 @@ async fn public_mutation_uses_hidden_transactional_run_and_publishes_it() {
#[tokio::test]
async fn public_mutation_preserves_staged_edge_ids_on_publish() {
let dir = tempfile::tempdir().unwrap();
let uri = dir.path().to_str().unwrap();
let mut db = init_and_load(&dir).await;
let run = db
.begin_run("main", Some("preserve-ids-mutation"))
.await
.unwrap();
db.mutate(
"main",
run.run_branch.as_str(),
MUTATION_QUERIES,
"add_friend",
&params(&[("$from", "Alice"), ("$to", "Diana")]),
@ -393,17 +402,17 @@ async fn public_mutation_preserves_staged_edge_ids_on_publish() {
.await
.unwrap();
let runs = latest_runs(uri).await;
let latest = runs.last().unwrap();
let mut main_ids = collect_column_strings(&read_table(&db, "edge:Knows").await, "id");
let mut run_ids = collect_column_strings(
&read_table_branch(&db, latest.run_branch.as_str(), "edge:Knows").await,
let mut staged_ids = collect_column_strings(
&read_table_branch(&db, run.run_branch.as_str(), "edge:Knows").await,
"id",
);
staged_ids.sort();
db.publish_run(&run.run_id).await.unwrap();
let mut main_ids = collect_column_strings(&read_table(&db, "edge:Knows").await, "id");
main_ids.sort();
run_ids.sort();
assert_eq!(main_ids, run_ids);
assert_eq!(main_ids, staged_ids);
}
#[tokio::test]
@ -531,3 +540,62 @@ async fn public_mutation_records_actor_on_run_and_published_commit() {
.unwrap();
assert_eq!(head.actor_id.as_deref(), Some("act-andrew"));
}
#[tokio::test]
async fn run_branches_do_not_accumulate_across_repeated_loads() {
// MR-674: run branches are transactional scaffolding. Every terminal
// state (Published, Aborted, Failed) deletes the branch. Verifying the
// invariant end-to-end: after 10 publishes and one abort, only main
// should remain.
let dir = tempfile::tempdir().unwrap();
let uri = dir.path().to_str().unwrap();
let mut db = Omnigraph::init(uri, TEST_SCHEMA).await.unwrap();
for i in 0..10 {
let payload = format!(
r#"{{"type":"Person","data":{{"name":"p{}","age":{}}}}}"#,
i, i
);
load_jsonl(&mut db, &payload, LoadMode::Append)
.await
.unwrap();
}
let aborted_run = db.begin_run("main", Some("abort-me")).await.unwrap();
db.abort_run(&aborted_run.run_id).await.unwrap();
assert_eq!(db.branch_list().await.unwrap(), vec!["main".to_string()]);
let all_branches = Omnigraph::open(uri)
.await
.unwrap()
.branch_list()
.await
.unwrap();
assert_eq!(all_branches, vec!["main".to_string()]);
}
#[tokio::test]
async fn failed_load_deletes_run_branch() {
let dir = tempfile::tempdir().unwrap();
let uri = dir.path().to_str().unwrap();
let mut db = Omnigraph::init(uri, TEST_SCHEMA).await.unwrap();
let bad = r#"{"type":"Person","data":{"name":"Alice","age":30}}
{"edge":"Knows","from":"Alice","to":"Missing"}"#;
let _ = load_jsonl(&mut db, bad, LoadMode::Overwrite).await;
let runs = latest_runs(uri).await;
assert_eq!(runs.len(), 1);
assert_eq!(runs[0].status, "failed");
let err = db
.snapshot_of(ReadTarget::branch(runs[0].run_branch.as_str()))
.await
.unwrap_err();
assert!(
matches!(err, OmniError::Manifest(ref e) if e.kind == ManifestErrorKind::NotFound)
|| matches!(err, OmniError::Lance(_)),
"failed run's branch should be gone, got: {}",
err
);
}