From 708e170dc5d491ddd2ed0480506bed42191a18ac Mon Sep 17 00:00:00 2001 From: Devin AI <158243242+devin-ai-integration[bot]@users.noreply.github.com> Date: Sat, 9 May 2026 20:16:12 +0000 Subject: [PATCH] engine: branch-merge revalidates target snapshot under queue --- crates/omnigraph-server/tests/server.rs | 158 +++++++++++++++--------- crates/omnigraph/src/exec/merge.rs | 24 ++++ 2 files changed, 125 insertions(+), 57 deletions(-) diff --git a/crates/omnigraph-server/tests/server.rs b/crates/omnigraph-server/tests/server.rs index 77e8cd6..f97927c 100644 --- a/crates/omnigraph-server/tests/server.rs +++ b/crates/omnigraph-server/tests/server.rs @@ -2427,6 +2427,12 @@ mod matrix { use std::time::Duration; use tokio::sync::Barrier; + #[derive(Debug)] + pub(super) struct OpStatus { + pub status: StatusCode, + pub body: Vec, + } + pub(super) struct Harness { pub _temp: tempfile::TempDir, pub app: Router, @@ -2523,12 +2529,12 @@ mod matrix { } /// Run two ops concurrently with barrier alignment + 15s deadlock - /// timeout. Returns `(status_a, status_b)`. Panics on timeout. + /// timeout. Returns `(op_a, op_b)`. Panics on timeout. pub async fn run_pair( &self, - op_a: impl FnOnce(Router, Arc) -> tokio::task::JoinHandle, - op_b: impl FnOnce(Router, Arc) -> tokio::task::JoinHandle, - ) -> (StatusCode, StatusCode) { + op_a: impl FnOnce(Router, Arc) -> tokio::task::JoinHandle, + op_b: impl FnOnce(Router, Arc) -> tokio::task::JoinHandle, + ) -> (OpStatus, OpStatus) { let barrier = Arc::new(Barrier::new(2)); let h_a = op_a(self.app.clone(), Arc::clone(&barrier)); let h_b = op_b(self.app.clone(), Arc::clone(&barrier)); @@ -2684,12 +2690,12 @@ mod matrix { } // Helpers that build the closures for `run_pair`. Each takes a - // Router + Barrier and returns a JoinHandle yielding the status. + // Router + Barrier and returns a JoinHandle yielding the status/body. pub(super) fn op_merge( source: String, target: String, - ) -> impl FnOnce(Router, Arc) -> tokio::task::JoinHandle { + ) -> impl FnOnce(Router, Arc) -> tokio::task::JoinHandle { move |app: Router, barrier: Arc| { tokio::spawn(async move { barrier.wait().await; @@ -2698,17 +2704,23 @@ mod matrix { target: Some(target), }) .unwrap(); - app.oneshot( + let response = app + .oneshot( Request::builder() .uri("/branches/merge") .method(Method::POST) .header("content-type", "application/json") .body(Body::from(body)) .unwrap(), - ) - .await - .unwrap() - .status() + ) + .await + .unwrap(); + let status = response.status(); + let body = to_bytes(response.into_body(), usize::MAX).await.unwrap(); + OpStatus { + status, + body: body.to_vec(), + } }) } } @@ -2717,7 +2729,7 @@ mod matrix { branch: String, name: String, age: i32, - ) -> impl FnOnce(Router, Arc) -> tokio::task::JoinHandle { + ) -> impl FnOnce(Router, Arc) -> tokio::task::JoinHandle { move |app: Router, barrier: Arc| { tokio::spawn(async move { barrier.wait().await; @@ -2728,17 +2740,23 @@ mod matrix { branch: Some(branch), }) .unwrap(); - app.oneshot( + let response = app + .oneshot( Request::builder() .uri("/change") .method(Method::POST) .header("content-type", "application/json") .body(Body::from(body)) .unwrap(), - ) - .await - .unwrap() - .status() + ) + .await + .unwrap(); + let status = response.status(); + let body = to_bytes(response.into_body(), usize::MAX).await.unwrap(); + OpStatus { + status, + body: body.to_vec(), + } }) } } @@ -2746,7 +2764,7 @@ mod matrix { pub(super) fn op_branch_create( from: String, name: String, - ) -> impl FnOnce(Router, Arc) -> tokio::task::JoinHandle { + ) -> impl FnOnce(Router, Arc) -> tokio::task::JoinHandle { move |app: Router, barrier: Arc| { tokio::spawn(async move { barrier.wait().await; @@ -2755,37 +2773,49 @@ mod matrix { name, }) .unwrap(); - app.oneshot( + let response = app + .oneshot( Request::builder() .uri("/branches") .method(Method::POST) .header("content-type", "application/json") .body(Body::from(body)) .unwrap(), - ) - .await - .unwrap() - .status() + ) + .await + .unwrap(); + let status = response.status(); + let body = to_bytes(response.into_body(), usize::MAX).await.unwrap(); + OpStatus { + status, + body: body.to_vec(), + } }) } } pub(super) fn op_branch_delete( name: String, - ) -> impl FnOnce(Router, Arc) -> tokio::task::JoinHandle { + ) -> impl FnOnce(Router, Arc) -> tokio::task::JoinHandle { move |app: Router, barrier: Arc| { tokio::spawn(async move { barrier.wait().await; - app.oneshot( + let response = app + .oneshot( Request::builder() .uri(format!("/branches/{}", name)) .method(Method::DELETE) .body(Body::empty()) .unwrap(), - ) - .await - .unwrap() - .status() + ) + .await + .unwrap(); + let status = response.status(); + let body = to_bytes(response.into_body(), usize::MAX).await.unwrap(); + OpStatus { + status, + body: body.to_vec(), + } }) } } @@ -2820,8 +2850,8 @@ async fn concurrent_branch_ops_morphological_matrix() { ), ) .await; - assert_eq!(sa, StatusCode::OK, "[{}] merge a", cell); - assert_eq!(sb, StatusCode::OK, "[{}] merge b", cell); + assert_eq!(sa.status, StatusCode::OK, "[{}] merge a", cell); + assert_eq!(sb.status, StatusCode::OK, "[{}] merge b", cell); h.assert_persons("target-a-cella", cell, &["EveA-cella"], &["FrankB-cella"]) .await; h.assert_persons("target-b-cella", cell, &["FrankB-cella"], &["EveA-cella"]) @@ -2846,8 +2876,8 @@ async fn concurrent_branch_ops_morphological_matrix() { matrix::op_merge("src-y-cellb".to_string(), "main".to_string()), ) .await; - assert_eq!(sa, StatusCode::OK, "[{}] merge x", cell); - assert_eq!(sb, StatusCode::OK, "[{}] merge y", cell); + assert_eq!(sa.status, StatusCode::OK, "[{}] merge x", cell); + assert_eq!(sb.status, StatusCode::OK, "[{}] merge y", cell); h.assert_persons("main", cell, &["Xavier-cellb", "Yvonne-cellb"], &[]) .await; h.assert_post_op_sentinel(cell, "sentinel-cellb").await; @@ -2876,8 +2906,8 @@ async fn concurrent_branch_ops_morphological_matrix() { ), ) .await; - assert_eq!(sa, StatusCode::OK, "[{}] merge into tgt-1", cell); - assert_eq!(sb, StatusCode::OK, "[{}] merge into tgt-2", cell); + assert_eq!(sa.status, StatusCode::OK, "[{}] merge into tgt-1", cell); + assert_eq!(sb.status, StatusCode::OK, "[{}] merge into tgt-2", cell); h.assert_persons("tgt-1-cellc", cell, &["Sharon-cellc"], &[]) .await; h.assert_persons("tgt-2-cellc", cell, &["Sharon-cellc"], &[]) @@ -2885,10 +2915,9 @@ async fn concurrent_branch_ops_morphological_matrix() { h.assert_post_op_sentinel(cell, "sentinel-cellc").await; } - // Cell d: Merge × Change, both touching main. Per-(table, branch) - // queue inside commit_all serializes them; both succeed; main - // contains both the merged source's contribution and the inserted - // sentinel. + // Cell d: Merge × Change, both touching main. C2 permits both + // succeed, or exactly one clean 409 if the merge detects target + // movement after planning but before acquiring the queue. { let cell = "d:merge×change:into-target"; let h = matrix::Harness::new().await; @@ -2901,10 +2930,25 @@ async fn concurrent_branch_ops_morphological_matrix() { matrix::op_change_insert("main".to_string(), "FrankD-celld".to_string(), 33), ) .await; - assert_eq!(sa, StatusCode::OK, "[{}] merge", cell); - assert_eq!(sb, StatusCode::OK, "[{}] change", cell); - h.assert_persons("main", cell, &["EveD-celld", "FrankD-celld"], &[]) - .await; + assert_eq!(sb.status, StatusCode::OK, "[{}] change", cell); + assert!( + sa.status == StatusCode::OK || sa.status == StatusCode::CONFLICT, + "[{}] merge must be 200 or clean 409, got {}", + cell, + sa.status + ); + if sa.status == StatusCode::OK { + h.assert_persons("main", cell, &["EveD-celld", "FrankD-celld"], &[]) + .await; + } else { + let error: ErrorOutput = serde_json::from_slice(&sa.body).unwrap(); + let conflict = error + .manifest_conflict + .expect("merge 409 must include manifest_conflict"); + assert_eq!(conflict.table_key, "node:Person", "[{}] conflict table", cell); + h.assert_persons("main", cell, &["FrankD-celld"], &["EveD-celld"]) + .await; + } h.assert_post_op_sentinel(cell, "sentinel-celld").await; } @@ -2924,8 +2968,8 @@ async fn concurrent_branch_ops_morphological_matrix() { matrix::op_branch_create("main".to_string(), "fork-celle".to_string()), ) .await; - assert_eq!(sa, StatusCode::OK, "[{}] merge", cell); - assert_eq!(sb, StatusCode::OK, "[{}] branch_create_from", cell); + assert_eq!(sa.status, StatusCode::OK, "[{}] merge", cell); + assert_eq!(sb.status, StatusCode::OK, "[{}] branch_create_from", cell); // Main definitely has Eve. h.assert_persons("main", cell, &["Eve-celle"], &[]).await; // fork-celle was forked off main at SOME version; main's current @@ -2964,8 +3008,8 @@ async fn concurrent_branch_ops_morphological_matrix() { ), ) .await; - assert_eq!(sa, StatusCode::OK, "[{}] gamma create", cell); - assert_eq!(sb, StatusCode::OK, "[{}] delta create", cell); + assert_eq!(sa.status, StatusCode::OK, "[{}] gamma create", cell); + assert_eq!(sb.status, StatusCode::OK, "[{}] delta create", cell); // gamma forks off alpha → must contain Eve. h.assert_persons("gamma-cellf", cell, &["Eve-cellf"], &[]).await; // delta forks off beta → must NOT contain Eve. @@ -2987,8 +3031,8 @@ async fn concurrent_branch_ops_morphological_matrix() { matrix::op_branch_delete("doomed-cellg".to_string()), ) .await; - assert_eq!(sa, StatusCode::OK, "[{}] create newborn", cell); - assert_eq!(sb, StatusCode::OK, "[{}] delete doomed", cell); + assert_eq!(sa.status, StatusCode::OK, "[{}] create newborn", cell); + assert_eq!(sb.status, StatusCode::OK, "[{}] delete doomed", cell); // newborn-cellg exists with main's content. h.assert_persons("newborn-cellg", cell, &["Alice"], &[]).await; h.assert_post_op_sentinel(cell, "sentinel-cellg").await; @@ -3008,8 +3052,8 @@ async fn concurrent_branch_ops_morphological_matrix() { matrix::op_branch_delete("doomed2-cellh".to_string()), ) .await; - assert_eq!(sa, StatusCode::OK, "[{}] delete 1", cell); - assert_eq!(sb, StatusCode::OK, "[{}] delete 2", cell); + assert_eq!(sa.status, StatusCode::OK, "[{}] delete 1", cell); + assert_eq!(sb.status, StatusCode::OK, "[{}] delete 2", cell); // Verify both gone via /branches list (snapshot would still work // for a deleted branch via parent fallback in some paths, so we // use the explicit list). @@ -3062,8 +3106,8 @@ async fn concurrent_branch_ops_morphological_matrix() { matrix::op_change_insert("main".to_string(), "Pat-celli".to_string(), 44), ) .await; - assert_eq!(sa, StatusCode::OK, "[{}] delete", cell); - assert_eq!(sb, StatusCode::OK, "[{}] change", cell); + assert_eq!(sa.status, StatusCode::OK, "[{}] delete", cell); + assert_eq!(sb.status, StatusCode::OK, "[{}] change", cell); h.assert_persons("main", cell, &["Pat-celli"], &[]).await; h.assert_post_op_sentinel(cell, "sentinel-celli").await; } @@ -3081,8 +3125,8 @@ async fn concurrent_branch_ops_morphological_matrix() { matrix::op_change_insert("main".to_string(), "Quincy-cellj".to_string(), 55), ) .await; - assert_eq!(sa, StatusCode::OK, "[{}] branch_create", cell); - assert_eq!(sb, StatusCode::OK, "[{}] change", cell); + assert_eq!(sa.status, StatusCode::OK, "[{}] branch_create", cell); + assert_eq!(sb.status, StatusCode::OK, "[{}] change", cell); h.assert_persons("main", cell, &["Quincy-cellj"], &[]).await; // twin-cellj has either pre-change view (no Quincy) or // post-change view (with Quincy); either is valid. @@ -3105,8 +3149,8 @@ async fn concurrent_branch_ops_morphological_matrix() { matrix::op_change_insert("main".to_string(), "Steve-cellk".to_string(), 37), ) .await; - assert_eq!(sa, StatusCode::OK, "[{}] merge", cell); - assert_eq!(sb, StatusCode::OK, "[{}] change", cell); + assert_eq!(sa.status, StatusCode::OK, "[{}] merge", cell); + assert_eq!(sb.status, StatusCode::OK, "[{}] change", cell); h.assert_persons("main", cell, &["Rita-cellk", "Steve-cellk"], &[]) .await; diff --git a/crates/omnigraph/src/exec/merge.rs b/crates/omnigraph/src/exec/merge.rs index e81fb0b..e911ad0 100644 --- a/crates/omnigraph/src/exec/merge.rs +++ b/crates/omnigraph/src/exec/merge.rs @@ -1233,6 +1233,30 @@ impl Omnigraph { .collect(); let _merge_queue_guards = self.write_queue().acquire_many(&merge_queue_keys).await; + let post_queue_snapshot = self.snapshot().await; + for table_key in &ordered_table_keys { + let Some(candidate) = candidates.get(table_key) else { + continue; + }; + if !matches!( + candidate, + CandidateTableState::RewriteMerged(_) | CandidateTableState::AdoptSourceState + ) { + continue; + } + let expected = target_snapshot.entry(table_key).map(|e| e.table_version); + let current = post_queue_snapshot + .entry(table_key) + .map(|e| e.table_version); + if expected != current { + return Err(OmniError::manifest_expected_version_mismatch( + table_key.clone(), + expected.unwrap_or(0), + current.unwrap_or(0), + )); + } + } + let recovery_pins: Vec = ordered_table_keys .iter() .filter_map(|table_key| {