tests: multi-branch sequential merges compositional flow

Adds `composite_flow_multi_branch_sequential_merges` covering the
agent-workflow pattern that single-merge tests in `branching.rs`
cannot reach: two feature branches diverging from main with main
writes interleaved between every diverge point, sequential merges
into main, time-travel through the resulting merge DAG, and reopen
consistency over a multi-merge history.

The script (18 numbered steps with assertions per step):
  init+load → mutate main → branch feat-a → mutate main → mutate
  feat-a → branch feat-b → mutate feat-b → mutate feat-a (with
  edge) → merge feat-a → mutate main → merge feat-b → time-travel
  to pre-merge-a + pre-merge-b → reopen + verify.

Catches eight compositional gap categories that only surface with
≥2 merges and main mutations between them: base/LCA recomputation
across two merges, manifest-pin propagation through merge commits,
time-travel through merge DAG without state bleed-through, branch-
DAG consistency, sibling-branch isolation under writes elsewhere,
post-merge main-write integration, multi-merge reopen replay, and
clean-flow recovery-sidecar absence.

`composite_flow.rs` was added to `docs/testing.md` so the before-
every-task checklist points agents at the file before duplicating
coverage.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
Ragnor Comerford 2026-05-05 19:34:04 +02:00
parent 58a3ff0e48
commit 9fc6526ec0
No known key found for this signature in database
2 changed files with 323 additions and 2 deletions

View file

@ -16,8 +16,8 @@ use omnigraph::loader::{LoadMode, load_jsonl};
use omnigraph_compiler::ir::ParamMap;
use helpers::{
MUTATION_QUERIES, count_rows, mixed_params, mutate_branch, mutate_main, query_branch,
query_main, snapshot_main, version_branch,
MUTATION_QUERIES, count_rows, count_rows_branch, mixed_params, mutate_branch, mutate_main,
query_branch, query_main, snapshot_main, version_branch, version_main,
};
const TEST_SCHEMA: &str = include_str!("fixtures/test.pg");
@ -369,3 +369,323 @@ async fn composite_flow_init_load_branch_merge_time_travel_optimize_cleanup() {
.unwrap();
assert!(!final_total.batches().is_empty());
}
/// Multi-branch sequential merges with main writes interleaved between
/// every diverge point. Catches compositional regressions that single-
/// merge tests can't see:
///
/// - **Base/LCA recomputation across two merges**: feat-b's base must be
/// the main version *at feat-b's branch creation*, not main's
/// post-feat-a-merge HEAD. A regression that uses main HEAD as the
/// merge base would re-classify Eve / Grace as unknown source-only
/// rows and re-apply them.
/// - **Manifest pin propagation through merge commits**: after merge
/// feat-a → main, main's table_branch entries for Person and Knows
/// must reflect the rewrite-on-active path; the second merge needs
/// them to compute its diff correctly.
/// - **Time-travel through merge DAG**: snapshot_at_version at three
/// distinct points (pre-feat-a-merge, post-feat-a-merge-pre-helen,
/// pre-feat-b-merge) must each return the right historical state
/// without bleed-through from later commits.
/// - **Reopen consistency over a multi-merge history**: dropping the
/// handle and reopening must replay the full merge DAG cleanly with
/// no recovery sweep activity (steady state).
///
/// All other compositional concerns (single merge mechanics, conflict
/// detection, time-travel mechanics) are covered by `branching.rs` and
/// `point_in_time.rs`. This test only exercises *composition*.
#[tokio::test]
async fn composite_flow_multi_branch_sequential_merges() {
let dir = tempfile::tempdir().unwrap();
let uri = dir.path().to_str().unwrap();
// ─────────────────────────────────────────────────────────────────
// Step 1: init + load baseline (4 Person, 2 Company, 3 Knows, 2 WorksAt
// edges from test.jsonl).
// ─────────────────────────────────────────────────────────────────
let mut db = Omnigraph::init(uri, TEST_SCHEMA).await.unwrap();
load_jsonl(&mut db, TEST_DATA, LoadMode::Append).await.unwrap();
assert_eq!(count_rows(&db, "node:Person").await, 4);
assert_eq!(count_rows(&db, "edge:Knows").await, 3);
// ─────────────────────────────────────────────────────────────────
// Step 2: mutate main — insert "Alice2" before any branching. Main
// diverges from the load baseline by exactly one row.
// ─────────────────────────────────────────────────────────────────
mutate_main(
&mut db,
MUTATION_QUERIES,
"insert_person",
&mixed_params(&[("$name", "Alice2")], &[("$age", 31)]),
)
.await
.expect("insert Alice2 on main");
assert_eq!(count_rows(&db, "node:Person").await, 5);
// ─────────────────────────────────────────────────────────────────
// Step 3: branch_create feat-a from main. feat-a inherits main's
// 5-Person state.
// ─────────────────────────────────────────────────────────────────
db.branch_create("feat-a").await.unwrap();
assert_eq!(count_rows_branch(&db, "feat-a", "node:Person").await, 5);
// ─────────────────────────────────────────────────────────────────
// Step 4: mutate main — insert "Bob2" AFTER feat-a was created. main
// and feat-a now diverge: main has Bob2, feat-a does not.
// ─────────────────────────────────────────────────────────────────
mutate_main(
&mut db,
MUTATION_QUERIES,
"insert_person",
&mixed_params(&[("$name", "Bob2")], &[("$age", 26)]),
)
.await
.expect("insert Bob2 on main");
assert_eq!(count_rows(&db, "node:Person").await, 6);
assert_eq!(
count_rows_branch(&db, "feat-a", "node:Person").await,
5,
"feat-a must not see main's post-branch-create writes"
);
// ─────────────────────────────────────────────────────────────────
// Step 5: mutate feat-a — insert "Eve". feat-a now also has 6 rows,
// but the *sixth* is Eve, not Bob2.
// ─────────────────────────────────────────────────────────────────
mutate_branch(
&mut db,
"feat-a",
MUTATION_QUERIES,
"insert_person",
&mixed_params(&[("$name", "Eve")], &[("$age", 22)]),
)
.await
.expect("insert Eve on feat-a");
assert_eq!(count_rows_branch(&db, "feat-a", "node:Person").await, 6);
assert_eq!(
count_rows(&db, "node:Person").await,
6,
"main must not see feat-a's writes"
);
// ─────────────────────────────────────────────────────────────────
// Step 6: branch_create feat-b from main. feat-b's base is main's
// current state (post-Bob2): 6 Persons including Bob2 but NOT Eve.
// The two branches now share neither base nor head with each other.
// ─────────────────────────────────────────────────────────────────
db.branch_create("feat-b").await.unwrap();
assert_eq!(count_rows_branch(&db, "feat-b", "node:Person").await, 6);
// ─────────────────────────────────────────────────────────────────
// Step 7: mutate feat-b — insert "Frank".
// ─────────────────────────────────────────────────────────────────
mutate_branch(
&mut db,
"feat-b",
MUTATION_QUERIES,
"insert_person",
&mixed_params(&[("$name", "Frank")], &[("$age", 33)]),
)
.await
.expect("insert Frank on feat-b");
assert_eq!(count_rows_branch(&db, "feat-b", "node:Person").await, 7);
// ─────────────────────────────────────────────────────────────────
// Step 8: mutate feat-a again — insert "Grace" + Knows(Grace → Eve).
// feat-a now has 7 Persons and 4 Knows edges.
// ─────────────────────────────────────────────────────────────────
mutate_branch(
&mut db,
"feat-a",
MUTATION_QUERIES,
"insert_person_and_friend",
&mixed_params(
&[("$name", "Grace"), ("$friend", "Eve")],
&[("$age", 28)],
),
)
.await
.expect("insert Grace + Knows(Grace → Eve) on feat-a");
assert_eq!(count_rows_branch(&db, "feat-a", "node:Person").await, 7);
assert_eq!(count_rows_branch(&db, "feat-a", "edge:Knows").await, 4);
assert_eq!(
count_rows(&db, "edge:Knows").await,
3,
"main's Knows must be untouched by feat-a's edge insert"
);
// ─────────────────────────────────────────────────────────────────
// Step 9: capture pre-merge-feat-a state. main version + main snapshot
// version (these may diverge slightly in branch_merge plumbing — both
// are useful for time-travel later).
// ─────────────────────────────────────────────────────────────────
let pre_merge_a_version = version_main(&db).await.unwrap();
let pre_merge_a_persons = count_rows(&db, "node:Person").await;
assert_eq!(pre_merge_a_persons, 6);
// ─────────────────────────────────────────────────────────────────
// Step 10: merge feat-a → main. main gains Eve, Grace, and the
// Knows(Grace → Eve) edge. main's manifest version advances.
// ─────────────────────────────────────────────────────────────────
db.branch_merge("feat-a", "main").await.unwrap();
let post_merge_a_version = version_main(&db).await.unwrap();
assert!(
post_merge_a_version > pre_merge_a_version,
"merge feat-a → main must advance main's manifest version"
);
assert_eq!(count_rows(&db, "node:Person").await, 8);
assert_eq!(count_rows(&db, "edge:Knows").await, 4);
// ─────────────────────────────────────────────────────────────────
// Step 11: mutate main AFTER the first merge — insert "Helen". This
// makes feat-b's eventual merge a non-trivial one: feat-b's base
// (created in step 6) does not include Eve / Grace / Helen, but
// main now has all three on top of Bob2.
// ─────────────────────────────────────────────────────────────────
mutate_main(
&mut db,
MUTATION_QUERIES,
"insert_person",
&mixed_params(&[("$name", "Helen")], &[("$age", 44)]),
)
.await
.expect("insert Helen on main post-merge");
assert_eq!(count_rows(&db, "node:Person").await, 9);
// ─────────────────────────────────────────────────────────────────
// Step 12: capture pre-merge-feat-b state. Used for time-travel
// assertions in step 14.
// ─────────────────────────────────────────────────────────────────
let pre_merge_b_version = version_main(&db).await.unwrap();
assert!(
pre_merge_b_version > post_merge_a_version,
"Helen insert must advance main's version past the merge"
);
// ─────────────────────────────────────────────────────────────────
// Step 13: merge feat-b → main. The diff base for this merge is
// feat-b's branch-creation point (step 6), NOT main's current head.
// A regression that uses main HEAD as the base would attempt to
// re-apply Eve/Grace/Helen as source-only rows or surface conflicts.
// ─────────────────────────────────────────────────────────────────
db.branch_merge("feat-b", "main").await.unwrap();
let post_merge_b_version = version_main(&db).await.unwrap();
assert!(
post_merge_b_version > pre_merge_b_version,
"merge feat-b → main must advance main's manifest version"
);
assert_eq!(
count_rows(&db, "node:Person").await,
10,
"main must contain all 10 Persons after both merges land"
);
// ─────────────────────────────────────────────────────────────────
// Step 14: time-travel to pre-merge-a-version. Reads must return
// main's pre-feat-a-merge state: 6 Persons, no Eve / Grace / Frank /
// Helen. Catches snapshot leakage from later commits.
// ─────────────────────────────────────────────────────────────────
let pre_a_snap = db.snapshot_at_version(pre_merge_a_version).await.unwrap();
let pre_a_persons = pre_a_snap
.open("node:Person")
.await
.unwrap()
.count_rows(None)
.await
.unwrap();
assert_eq!(
pre_a_persons, 6,
"time-travel to pre-merge-a must show exactly 6 Persons"
);
let pre_a_knows = pre_a_snap
.open("edge:Knows")
.await
.unwrap()
.count_rows(None)
.await
.unwrap();
assert_eq!(
pre_a_knows, 3,
"time-travel to pre-merge-a must show exactly 3 Knows edges (no Grace → Eve)"
);
// ─────────────────────────────────────────────────────────────────
// Step 15: time-travel to pre-merge-b-version. Reads must show
// post-feat-a-merge state (Eve, Grace, Helen present) but NOT Frank.
// ─────────────────────────────────────────────────────────────────
let pre_b_snap = db.snapshot_at_version(pre_merge_b_version).await.unwrap();
let pre_b_persons = pre_b_snap
.open("node:Person")
.await
.unwrap()
.count_rows(None)
.await
.unwrap();
assert_eq!(
pre_b_persons, 9,
"time-travel to pre-merge-b must show 9 Persons (post-feat-a-merge + Helen, pre-feat-b-merge)"
);
// ─────────────────────────────────────────────────────────────────
// Step 16: query feat-b at its current head — feat-b is unchanged
// by main's merges; it still shows its own 7-row state.
// ─────────────────────────────────────────────────────────────────
assert_eq!(
count_rows_branch(&db, "feat-b", "node:Person").await,
7,
"feat-b's own snapshot must be unaffected by main's merge of feat-a"
);
// ─────────────────────────────────────────────────────────────────
// Step 17: a feature-side query exercises the read path on a branch
// whose base predates a completed merge (feat-b's base is pre-feat-a).
// ─────────────────────────────────────────────────────────────────
let frank_on_feat_b = query_branch(
&mut db,
"feat-b",
TEST_QUERIES,
"get_person",
&mixed_params(&[("$name", "Frank")], &[]),
)
.await
.unwrap();
assert!(
!frank_on_feat_b.batches().is_empty(),
"feat-b must still see its own Frank insert"
);
// ─────────────────────────────────────────────────────────────────
// Step 18: drop + reopen. Steady state — no recovery sidecars on
// disk, manifest replays cleanly, all branches and tables visible.
// ─────────────────────────────────────────────────────────────────
drop(db);
let db = Omnigraph::open(uri).await.unwrap();
assert_eq!(
count_rows(&db, "node:Person").await,
10,
"main Person count must persist across reopen"
);
assert_eq!(
count_rows(&db, "edge:Knows").await,
4,
"main Knows count must persist across reopen"
);
let branches = db.branch_list().await.unwrap();
assert!(
branches.iter().any(|b| b == "feat-a") && branches.iter().any(|b| b == "feat-b"),
"both feature branches must persist across reopen; got {:?}",
branches
);
// No recovery sidecars left behind by a clean flow.
let recovery_dir = std::path::Path::new(uri).join("__recovery");
let leftover_sidecars = if recovery_dir.exists() {
std::fs::read_dir(&recovery_dir).unwrap().count()
} else {
0
};
assert_eq!(
leftover_sidecars, 0,
"clean compositional flow must not leave recovery sidecars on disk"
);
}

View file

@ -34,6 +34,7 @@ The engine's `tests/` is the principal coverage surface; most graph-shaped behav
| `lance_version_columns.rs` | Per-row `_row_last_updated_at_version` behavior |
| `failpoints.rs` | Failure-injection coverage (gated on `failpoints` feature). Includes the four per-writer Phase B → recovery integration tests (`recovery_rolls_forward_after_finalize_publisher_failure`, `schema_apply_phase_b_failure_recovered_on_next_open`, `branch_merge_phase_b_failure_recovered_on_next_open`, `ensure_indices_phase_b_failure_recovered_on_next_open`). |
| `recovery.rs` | Open-time recovery sweep — sidecar I/O, classifier dispatch (NoMovement / RolledPastExpected / UnexpectedAtP1 / UnexpectedMultistep / InvariantViolation), all-or-nothing decision, roll-forward via `ManifestBatchPublisher::publish`, roll-back via `Dataset::restore`, audit row in `_graph_commit_recoveries.lance`, `OpenMode::ReadOnly` skip path |
| `composite_flow.rs` | Compositional/narrative end-to-end stories — multi-step flows that compose mechanics covered by other test files. Catches integration regressions where individual operations all pass their unit tests but their composition breaks (sequential merges, post-merge main writes, time-travel through merge DAG, reopen consistency over multi-merge histories). |
## Fixtures