mirror of
https://github.com/ModernRelay/omnigraph.git
synced 2026-06-09 01:35:18 +02:00
engine: branch-merge revalidates target snapshot under queue
This commit is contained in:
parent
a6d244e648
commit
708e170dc5
2 changed files with 125 additions and 57 deletions
|
|
@ -2427,6 +2427,12 @@ mod matrix {
|
|||
use std::time::Duration;
|
||||
use tokio::sync::Barrier;
|
||||
|
||||
#[derive(Debug)]
|
||||
pub(super) struct OpStatus {
|
||||
pub status: StatusCode,
|
||||
pub body: Vec<u8>,
|
||||
}
|
||||
|
||||
pub(super) struct Harness {
|
||||
pub _temp: tempfile::TempDir,
|
||||
pub app: Router,
|
||||
|
|
@ -2523,12 +2529,12 @@ mod matrix {
|
|||
}
|
||||
|
||||
/// Run two ops concurrently with barrier alignment + 15s deadlock
|
||||
/// timeout. Returns `(status_a, status_b)`. Panics on timeout.
|
||||
/// timeout. Returns `(op_a, op_b)`. Panics on timeout.
|
||||
pub async fn run_pair(
|
||||
&self,
|
||||
op_a: impl FnOnce(Router, Arc<Barrier>) -> tokio::task::JoinHandle<StatusCode>,
|
||||
op_b: impl FnOnce(Router, Arc<Barrier>) -> tokio::task::JoinHandle<StatusCode>,
|
||||
) -> (StatusCode, StatusCode) {
|
||||
op_a: impl FnOnce(Router, Arc<Barrier>) -> tokio::task::JoinHandle<OpStatus>,
|
||||
op_b: impl FnOnce(Router, Arc<Barrier>) -> tokio::task::JoinHandle<OpStatus>,
|
||||
) -> (OpStatus, OpStatus) {
|
||||
let barrier = Arc::new(Barrier::new(2));
|
||||
let h_a = op_a(self.app.clone(), Arc::clone(&barrier));
|
||||
let h_b = op_b(self.app.clone(), Arc::clone(&barrier));
|
||||
|
|
@ -2684,12 +2690,12 @@ mod matrix {
|
|||
}
|
||||
|
||||
// Helpers that build the closures for `run_pair`. Each takes a
|
||||
// Router + Barrier and returns a JoinHandle yielding the status.
|
||||
// Router + Barrier and returns a JoinHandle yielding the status/body.
|
||||
|
||||
pub(super) fn op_merge(
|
||||
source: String,
|
||||
target: String,
|
||||
) -> impl FnOnce(Router, Arc<Barrier>) -> tokio::task::JoinHandle<StatusCode> {
|
||||
) -> impl FnOnce(Router, Arc<Barrier>) -> tokio::task::JoinHandle<OpStatus> {
|
||||
move |app: Router, barrier: Arc<Barrier>| {
|
||||
tokio::spawn(async move {
|
||||
barrier.wait().await;
|
||||
|
|
@ -2698,17 +2704,23 @@ mod matrix {
|
|||
target: Some(target),
|
||||
})
|
||||
.unwrap();
|
||||
app.oneshot(
|
||||
let response = app
|
||||
.oneshot(
|
||||
Request::builder()
|
||||
.uri("/branches/merge")
|
||||
.method(Method::POST)
|
||||
.header("content-type", "application/json")
|
||||
.body(Body::from(body))
|
||||
.unwrap(),
|
||||
)
|
||||
.await
|
||||
.unwrap()
|
||||
.status()
|
||||
)
|
||||
.await
|
||||
.unwrap();
|
||||
let status = response.status();
|
||||
let body = to_bytes(response.into_body(), usize::MAX).await.unwrap();
|
||||
OpStatus {
|
||||
status,
|
||||
body: body.to_vec(),
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
|
|
@ -2717,7 +2729,7 @@ mod matrix {
|
|||
branch: String,
|
||||
name: String,
|
||||
age: i32,
|
||||
) -> impl FnOnce(Router, Arc<Barrier>) -> tokio::task::JoinHandle<StatusCode> {
|
||||
) -> impl FnOnce(Router, Arc<Barrier>) -> tokio::task::JoinHandle<OpStatus> {
|
||||
move |app: Router, barrier: Arc<Barrier>| {
|
||||
tokio::spawn(async move {
|
||||
barrier.wait().await;
|
||||
|
|
@ -2728,17 +2740,23 @@ mod matrix {
|
|||
branch: Some(branch),
|
||||
})
|
||||
.unwrap();
|
||||
app.oneshot(
|
||||
let response = app
|
||||
.oneshot(
|
||||
Request::builder()
|
||||
.uri("/change")
|
||||
.method(Method::POST)
|
||||
.header("content-type", "application/json")
|
||||
.body(Body::from(body))
|
||||
.unwrap(),
|
||||
)
|
||||
.await
|
||||
.unwrap()
|
||||
.status()
|
||||
)
|
||||
.await
|
||||
.unwrap();
|
||||
let status = response.status();
|
||||
let body = to_bytes(response.into_body(), usize::MAX).await.unwrap();
|
||||
OpStatus {
|
||||
status,
|
||||
body: body.to_vec(),
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
|
|
@ -2746,7 +2764,7 @@ mod matrix {
|
|||
pub(super) fn op_branch_create(
|
||||
from: String,
|
||||
name: String,
|
||||
) -> impl FnOnce(Router, Arc<Barrier>) -> tokio::task::JoinHandle<StatusCode> {
|
||||
) -> impl FnOnce(Router, Arc<Barrier>) -> tokio::task::JoinHandle<OpStatus> {
|
||||
move |app: Router, barrier: Arc<Barrier>| {
|
||||
tokio::spawn(async move {
|
||||
barrier.wait().await;
|
||||
|
|
@ -2755,37 +2773,49 @@ mod matrix {
|
|||
name,
|
||||
})
|
||||
.unwrap();
|
||||
app.oneshot(
|
||||
let response = app
|
||||
.oneshot(
|
||||
Request::builder()
|
||||
.uri("/branches")
|
||||
.method(Method::POST)
|
||||
.header("content-type", "application/json")
|
||||
.body(Body::from(body))
|
||||
.unwrap(),
|
||||
)
|
||||
.await
|
||||
.unwrap()
|
||||
.status()
|
||||
)
|
||||
.await
|
||||
.unwrap();
|
||||
let status = response.status();
|
||||
let body = to_bytes(response.into_body(), usize::MAX).await.unwrap();
|
||||
OpStatus {
|
||||
status,
|
||||
body: body.to_vec(),
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
pub(super) fn op_branch_delete(
|
||||
name: String,
|
||||
) -> impl FnOnce(Router, Arc<Barrier>) -> tokio::task::JoinHandle<StatusCode> {
|
||||
) -> impl FnOnce(Router, Arc<Barrier>) -> tokio::task::JoinHandle<OpStatus> {
|
||||
move |app: Router, barrier: Arc<Barrier>| {
|
||||
tokio::spawn(async move {
|
||||
barrier.wait().await;
|
||||
app.oneshot(
|
||||
let response = app
|
||||
.oneshot(
|
||||
Request::builder()
|
||||
.uri(format!("/branches/{}", name))
|
||||
.method(Method::DELETE)
|
||||
.body(Body::empty())
|
||||
.unwrap(),
|
||||
)
|
||||
.await
|
||||
.unwrap()
|
||||
.status()
|
||||
)
|
||||
.await
|
||||
.unwrap();
|
||||
let status = response.status();
|
||||
let body = to_bytes(response.into_body(), usize::MAX).await.unwrap();
|
||||
OpStatus {
|
||||
status,
|
||||
body: body.to_vec(),
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
|
|
@ -2820,8 +2850,8 @@ async fn concurrent_branch_ops_morphological_matrix() {
|
|||
),
|
||||
)
|
||||
.await;
|
||||
assert_eq!(sa, StatusCode::OK, "[{}] merge a", cell);
|
||||
assert_eq!(sb, StatusCode::OK, "[{}] merge b", cell);
|
||||
assert_eq!(sa.status, StatusCode::OK, "[{}] merge a", cell);
|
||||
assert_eq!(sb.status, StatusCode::OK, "[{}] merge b", cell);
|
||||
h.assert_persons("target-a-cella", cell, &["EveA-cella"], &["FrankB-cella"])
|
||||
.await;
|
||||
h.assert_persons("target-b-cella", cell, &["FrankB-cella"], &["EveA-cella"])
|
||||
|
|
@ -2846,8 +2876,8 @@ async fn concurrent_branch_ops_morphological_matrix() {
|
|||
matrix::op_merge("src-y-cellb".to_string(), "main".to_string()),
|
||||
)
|
||||
.await;
|
||||
assert_eq!(sa, StatusCode::OK, "[{}] merge x", cell);
|
||||
assert_eq!(sb, StatusCode::OK, "[{}] merge y", cell);
|
||||
assert_eq!(sa.status, StatusCode::OK, "[{}] merge x", cell);
|
||||
assert_eq!(sb.status, StatusCode::OK, "[{}] merge y", cell);
|
||||
h.assert_persons("main", cell, &["Xavier-cellb", "Yvonne-cellb"], &[])
|
||||
.await;
|
||||
h.assert_post_op_sentinel(cell, "sentinel-cellb").await;
|
||||
|
|
@ -2876,8 +2906,8 @@ async fn concurrent_branch_ops_morphological_matrix() {
|
|||
),
|
||||
)
|
||||
.await;
|
||||
assert_eq!(sa, StatusCode::OK, "[{}] merge into tgt-1", cell);
|
||||
assert_eq!(sb, StatusCode::OK, "[{}] merge into tgt-2", cell);
|
||||
assert_eq!(sa.status, StatusCode::OK, "[{}] merge into tgt-1", cell);
|
||||
assert_eq!(sb.status, StatusCode::OK, "[{}] merge into tgt-2", cell);
|
||||
h.assert_persons("tgt-1-cellc", cell, &["Sharon-cellc"], &[])
|
||||
.await;
|
||||
h.assert_persons("tgt-2-cellc", cell, &["Sharon-cellc"], &[])
|
||||
|
|
@ -2885,10 +2915,9 @@ async fn concurrent_branch_ops_morphological_matrix() {
|
|||
h.assert_post_op_sentinel(cell, "sentinel-cellc").await;
|
||||
}
|
||||
|
||||
// Cell d: Merge × Change, both touching main. Per-(table, branch)
|
||||
// queue inside commit_all serializes them; both succeed; main
|
||||
// contains both the merged source's contribution and the inserted
|
||||
// sentinel.
|
||||
// Cell d: Merge × Change, both touching main. C2 permits both
|
||||
// succeed, or exactly one clean 409 if the merge detects target
|
||||
// movement after planning but before acquiring the queue.
|
||||
{
|
||||
let cell = "d:merge×change:into-target";
|
||||
let h = matrix::Harness::new().await;
|
||||
|
|
@ -2901,10 +2930,25 @@ async fn concurrent_branch_ops_morphological_matrix() {
|
|||
matrix::op_change_insert("main".to_string(), "FrankD-celld".to_string(), 33),
|
||||
)
|
||||
.await;
|
||||
assert_eq!(sa, StatusCode::OK, "[{}] merge", cell);
|
||||
assert_eq!(sb, StatusCode::OK, "[{}] change", cell);
|
||||
h.assert_persons("main", cell, &["EveD-celld", "FrankD-celld"], &[])
|
||||
.await;
|
||||
assert_eq!(sb.status, StatusCode::OK, "[{}] change", cell);
|
||||
assert!(
|
||||
sa.status == StatusCode::OK || sa.status == StatusCode::CONFLICT,
|
||||
"[{}] merge must be 200 or clean 409, got {}",
|
||||
cell,
|
||||
sa.status
|
||||
);
|
||||
if sa.status == StatusCode::OK {
|
||||
h.assert_persons("main", cell, &["EveD-celld", "FrankD-celld"], &[])
|
||||
.await;
|
||||
} else {
|
||||
let error: ErrorOutput = serde_json::from_slice(&sa.body).unwrap();
|
||||
let conflict = error
|
||||
.manifest_conflict
|
||||
.expect("merge 409 must include manifest_conflict");
|
||||
assert_eq!(conflict.table_key, "node:Person", "[{}] conflict table", cell);
|
||||
h.assert_persons("main", cell, &["FrankD-celld"], &["EveD-celld"])
|
||||
.await;
|
||||
}
|
||||
h.assert_post_op_sentinel(cell, "sentinel-celld").await;
|
||||
}
|
||||
|
||||
|
|
@ -2924,8 +2968,8 @@ async fn concurrent_branch_ops_morphological_matrix() {
|
|||
matrix::op_branch_create("main".to_string(), "fork-celle".to_string()),
|
||||
)
|
||||
.await;
|
||||
assert_eq!(sa, StatusCode::OK, "[{}] merge", cell);
|
||||
assert_eq!(sb, StatusCode::OK, "[{}] branch_create_from", cell);
|
||||
assert_eq!(sa.status, StatusCode::OK, "[{}] merge", cell);
|
||||
assert_eq!(sb.status, StatusCode::OK, "[{}] branch_create_from", cell);
|
||||
// Main definitely has Eve.
|
||||
h.assert_persons("main", cell, &["Eve-celle"], &[]).await;
|
||||
// fork-celle was forked off main at SOME version; main's current
|
||||
|
|
@ -2964,8 +3008,8 @@ async fn concurrent_branch_ops_morphological_matrix() {
|
|||
),
|
||||
)
|
||||
.await;
|
||||
assert_eq!(sa, StatusCode::OK, "[{}] gamma create", cell);
|
||||
assert_eq!(sb, StatusCode::OK, "[{}] delta create", cell);
|
||||
assert_eq!(sa.status, StatusCode::OK, "[{}] gamma create", cell);
|
||||
assert_eq!(sb.status, StatusCode::OK, "[{}] delta create", cell);
|
||||
// gamma forks off alpha → must contain Eve.
|
||||
h.assert_persons("gamma-cellf", cell, &["Eve-cellf"], &[]).await;
|
||||
// delta forks off beta → must NOT contain Eve.
|
||||
|
|
@ -2987,8 +3031,8 @@ async fn concurrent_branch_ops_morphological_matrix() {
|
|||
matrix::op_branch_delete("doomed-cellg".to_string()),
|
||||
)
|
||||
.await;
|
||||
assert_eq!(sa, StatusCode::OK, "[{}] create newborn", cell);
|
||||
assert_eq!(sb, StatusCode::OK, "[{}] delete doomed", cell);
|
||||
assert_eq!(sa.status, StatusCode::OK, "[{}] create newborn", cell);
|
||||
assert_eq!(sb.status, StatusCode::OK, "[{}] delete doomed", cell);
|
||||
// newborn-cellg exists with main's content.
|
||||
h.assert_persons("newborn-cellg", cell, &["Alice"], &[]).await;
|
||||
h.assert_post_op_sentinel(cell, "sentinel-cellg").await;
|
||||
|
|
@ -3008,8 +3052,8 @@ async fn concurrent_branch_ops_morphological_matrix() {
|
|||
matrix::op_branch_delete("doomed2-cellh".to_string()),
|
||||
)
|
||||
.await;
|
||||
assert_eq!(sa, StatusCode::OK, "[{}] delete 1", cell);
|
||||
assert_eq!(sb, StatusCode::OK, "[{}] delete 2", cell);
|
||||
assert_eq!(sa.status, StatusCode::OK, "[{}] delete 1", cell);
|
||||
assert_eq!(sb.status, StatusCode::OK, "[{}] delete 2", cell);
|
||||
// Verify both gone via /branches list (snapshot would still work
|
||||
// for a deleted branch via parent fallback in some paths, so we
|
||||
// use the explicit list).
|
||||
|
|
@ -3062,8 +3106,8 @@ async fn concurrent_branch_ops_morphological_matrix() {
|
|||
matrix::op_change_insert("main".to_string(), "Pat-celli".to_string(), 44),
|
||||
)
|
||||
.await;
|
||||
assert_eq!(sa, StatusCode::OK, "[{}] delete", cell);
|
||||
assert_eq!(sb, StatusCode::OK, "[{}] change", cell);
|
||||
assert_eq!(sa.status, StatusCode::OK, "[{}] delete", cell);
|
||||
assert_eq!(sb.status, StatusCode::OK, "[{}] change", cell);
|
||||
h.assert_persons("main", cell, &["Pat-celli"], &[]).await;
|
||||
h.assert_post_op_sentinel(cell, "sentinel-celli").await;
|
||||
}
|
||||
|
|
@ -3081,8 +3125,8 @@ async fn concurrent_branch_ops_morphological_matrix() {
|
|||
matrix::op_change_insert("main".to_string(), "Quincy-cellj".to_string(), 55),
|
||||
)
|
||||
.await;
|
||||
assert_eq!(sa, StatusCode::OK, "[{}] branch_create", cell);
|
||||
assert_eq!(sb, StatusCode::OK, "[{}] change", cell);
|
||||
assert_eq!(sa.status, StatusCode::OK, "[{}] branch_create", cell);
|
||||
assert_eq!(sb.status, StatusCode::OK, "[{}] change", cell);
|
||||
h.assert_persons("main", cell, &["Quincy-cellj"], &[]).await;
|
||||
// twin-cellj has either pre-change view (no Quincy) or
|
||||
// post-change view (with Quincy); either is valid.
|
||||
|
|
@ -3105,8 +3149,8 @@ async fn concurrent_branch_ops_morphological_matrix() {
|
|||
matrix::op_change_insert("main".to_string(), "Steve-cellk".to_string(), 37),
|
||||
)
|
||||
.await;
|
||||
assert_eq!(sa, StatusCode::OK, "[{}] merge", cell);
|
||||
assert_eq!(sb, StatusCode::OK, "[{}] change", cell);
|
||||
assert_eq!(sa.status, StatusCode::OK, "[{}] merge", cell);
|
||||
assert_eq!(sb.status, StatusCode::OK, "[{}] change", cell);
|
||||
h.assert_persons("main", cell, &["Rita-cellk", "Steve-cellk"], &[])
|
||||
.await;
|
||||
|
||||
|
|
|
|||
|
|
@ -1233,6 +1233,30 @@ impl Omnigraph {
|
|||
.collect();
|
||||
let _merge_queue_guards = self.write_queue().acquire_many(&merge_queue_keys).await;
|
||||
|
||||
let post_queue_snapshot = self.snapshot().await;
|
||||
for table_key in &ordered_table_keys {
|
||||
let Some(candidate) = candidates.get(table_key) else {
|
||||
continue;
|
||||
};
|
||||
if !matches!(
|
||||
candidate,
|
||||
CandidateTableState::RewriteMerged(_) | CandidateTableState::AdoptSourceState
|
||||
) {
|
||||
continue;
|
||||
}
|
||||
let expected = target_snapshot.entry(table_key).map(|e| e.table_version);
|
||||
let current = post_queue_snapshot
|
||||
.entry(table_key)
|
||||
.map(|e| e.table_version);
|
||||
if expected != current {
|
||||
return Err(OmniError::manifest_expected_version_mismatch(
|
||||
table_key.clone(),
|
||||
expected.unwrap_or(0),
|
||||
current.unwrap_or(0),
|
||||
));
|
||||
}
|
||||
}
|
||||
|
||||
let recovery_pins: Vec<crate::db::manifest::SidecarTablePin> = ordered_table_keys
|
||||
.iter()
|
||||
.filter_map(|table_key| {
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue