diff --git a/crates/omnigraph/tests/composite_flow.rs b/crates/omnigraph/tests/composite_flow.rs index e7a70d9..a87187f 100644 --- a/crates/omnigraph/tests/composite_flow.rs +++ b/crates/omnigraph/tests/composite_flow.rs @@ -11,15 +11,41 @@ mod helpers; +use arrow_array::{Array, Int64Array}; use omnigraph::db::{Omnigraph, ReadTarget}; use omnigraph::loader::{LoadMode, load_jsonl}; use omnigraph_compiler::ir::ParamMap; +use omnigraph_compiler::result::QueryResult; use helpers::{ MUTATION_QUERIES, count_rows, count_rows_branch, mixed_params, mutate_branch, mutate_main, query_branch, query_main, snapshot_main, version_branch, version_main, }; +/// Extract the `total` value from a `total_people` query result and +/// assert it equals `expected`. The query returns one row with one +/// `Int64` column named `total`; asserting only `num_rows() == 1` +/// would not catch a regression that returns a stale or wrong count. +fn assert_total(result: &QueryResult, expected: i64, context: &str) { + let batch = result.concat_batches().unwrap(); + assert_eq!( + batch.num_rows(), + 1, + "total_people must return exactly one summary row ({context})" + ); + let total_col = batch + .column_by_name("total") + .unwrap_or_else(|| panic!("missing `total` column ({context})")) + .as_any() + .downcast_ref::() + .unwrap_or_else(|| panic!("`total` column is not Int64 ({context})")); + assert_eq!( + total_col.value(0), + expected, + "total_people count mismatch ({context})" + ); +} + const TEST_SCHEMA: &str = include_str!("fixtures/test.pg"); const TEST_DATA: &str = include_str!("fixtures/test.jsonl"); const TEST_QUERIES: &str = include_str!("fixtures/test.gq"); @@ -672,11 +698,7 @@ async fn composite_flow_multi_branch_sequential_merges() { ) .await .unwrap(); - assert_eq!( - total_post_merges.num_rows(), - 1, - "total_people aggregation must return exactly one summary row" - ); + assert_total(&total_post_merges, 10, "post both merges, main must total 10 Persons"); // ───────────────────────────────────────────────────────────────── // Step 14: time-travel to pre-merge-a-version. Reads must return @@ -714,7 +736,9 @@ async fn composite_flow_multi_branch_sequential_merges() { ); // `.gq` query against the captured SnapshotId — the planner must // resolve `total_people` against the historical Person snapshot, - // not main's current head. + // not main's current head. Asserts the actual count value (not just + // row count) so a planner regression that resolves to current state + // would surface here as a count mismatch (10 instead of 6). let pre_a_total_via_query = db .query( ReadTarget::Snapshot(pre_merge_a_snap_id.clone()), @@ -724,10 +748,10 @@ async fn composite_flow_multi_branch_sequential_merges() { ) .await .unwrap(); - assert_eq!( - pre_a_total_via_query.num_rows(), - 1, - "time-travel total_people via query engine returns exactly one summary row" + assert_total( + &pre_a_total_via_query, + 6, + "time-travel to pre-merge-a must report 6 Persons via the query engine", ); // Edge-traversal time-travel: Grace and her Knows(Grace → Eve) edge // do not exist at pre_merge_a, so `friends_of(Grace)` must return 0 @@ -874,10 +898,10 @@ async fn composite_flow_multi_branch_sequential_merges() { ) .await .unwrap(); - assert_eq!( - post_reopen_total.num_rows(), - 1, - "total_people aggregation must work via the query engine after reopen" + assert_total( + &post_reopen_total, + 10, + "post-reopen total_people must still report 10 Persons", ); // Edge-traversal post-reopen: Grace's Knows(Grace → Eve) survived // both the merge and the reopen as a queryable graph edge.