MR-794 step 1: address PR #67 Codex P1 — document chained-merge contract

Codex flagged that combine_committed_with_staged can return duplicates
on chained stage_merge_inserts: each call's MergeInsertBuilder runs
against the committed view (it does not see prior staged fragments), so
two staged merges whose source rows share keys both produce
Operation::Update transactions whose new_fragments contain the shared
row. The combined scan returns it twice.

The bug is intrinsic to Lance's API: there is no public way to make
MergeInsertBuilder see uncommitted fragments. Fixing the primitive
itself requires either a Lance API extension or in-memory pre-merge
logic, neither in scope for v1.

The v1 fix is a parse-time companion (D₂′) added with the engine rewire
in MR-794 step 2+: per touched table, ops must be all stage_append OR
exactly one stage_merge_insert. Multi-table queries and append-chains
remain safe; only chained merges on a single table are rejected.

This commit:
- Documents the contract on stage_merge_insert and
  combine_committed_with_staged so callers know the invariant the
  primitive relies on.
- Adds tests/staged_writes.rs with four primitive-level tests:
  - stage_append + scan_with_staged shows committed + staged
  - stage_merge_insert dedupes superseded committed fragments
    (regression for the removed_fragment_ids fix that PR #66's
    730631c added)
  - count_rows_with_staged matches scan
  - chained stage_merge_insert with shared key documents the
    duplicate-row behavior; assertion pins it so a future change either
    preserves the contract or consciously fixes it (and updates the test)

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
Ragnor Comerford 2026-04-30 23:10:19 +02:00
parent 6dc4167291
commit 4c5fa3d8b8
No known key found for this signature in database
2 changed files with 276 additions and 0 deletions

View file

@ -601,6 +601,29 @@ impl TableStore {
/// fragments. The transaction's `Operation::Update` carries the
/// fragments-to-remove and fragments-to-add; for read-your-writes we
/// expose `new_fragments` (rows that will be visible after commit).
///
/// **Contract: do not chain `stage_merge_insert` calls on the same
/// table within one query.** Each call's `MergeInsertBuilder` runs
/// against the supplied dataset's committed view — it does not see
/// fragments produced by a previous staged merge on the same table.
/// Two chained `stage_merge_insert`s whose source rows share keys will
/// each independently produce `Operation::Update` transactions whose
/// `new_fragments` contain a row for the shared key. `scan_with_staged`
/// (and `count_rows_with_staged`) will then return both — i.e.
/// **duplicates by key**.
///
/// This is intrinsic to the underlying Lance API: there is no public
/// way to make `MergeInsertBuilder` see uncommitted fragments. The
/// engine's mutation path enforces the rule "per touched table: all
/// stage_append OR exactly one stage_merge_insert" at parse time
/// (the D₂ check landing with [MR-794](https://linear.app/modernrelay/issue/MR-794)
/// step 2+ in `exec/mutation.rs`). Multi-table queries and append-chains
/// remain safe; only chained merges on a single table are rejected.
///
/// Lift path: either a Lance API extension that lets
/// `MergeInsertBuilder` accept additional staged fragments, or an
/// in-memory pre-merge here that folds prior staged batches into the
/// input stream. See `docs/runs.md` and MR-793.
pub async fn stage_merge_insert(
&self,
ds: Dataset,
@ -877,6 +900,7 @@ impl TableStore {
/// 1. committed fragments whose IDs are NOT in any staged
/// `removed_fragment_ids` (preserves committed order),
/// 2. all staged `new_fragments` in stage order.
///
/// Lance's `Scanner` does not require any particular ordering between
/// committed and staged fragments — `with_fragments` scopes the scan to
/// exactly the supplied list. The dedup matters because merge_insert
@ -884,6 +908,17 @@ impl TableStore {
/// fragment is in `new_fragments`, the original (which it supersedes) is
/// in `committed` until manifest commit, and including both would yield
/// duplicate rows.
///
/// **Inter-stage supersession is not handled here.** Each StagedWrite's
/// `removed_fragment_ids` lists committed-manifest fragment IDs only; a
/// later staged merge cannot know about an earlier staged merge's
/// fragments (Lance's `MergeInsertBuilder` runs against the committed
/// view). If two `stage_merge_insert`s on the same table produce rows
/// with the same key, the combined view returns duplicates by key. The
/// engine's mutation path enforces "per touched table: all stage_append
/// OR exactly one stage_merge_insert" at parse time (D₂ in
/// `exec/mutation.rs`) so this primitive's caller never chains merges.
/// See `stage_merge_insert` for the full contract.
fn combine_committed_with_staged(ds: &Dataset, staged: &[StagedWrite]) -> Vec<Fragment> {
let removed: std::collections::HashSet<u64> = staged
.iter()

View file

@ -0,0 +1,241 @@
//! Primitive-level tests for `TableStore`'s staged-write API
//! (MR-794 step 1). These exercise `stage_append`, `stage_merge_insert`,
//! `scan_with_staged`, and `count_rows_with_staged` directly against a
//! Lance dataset — no Omnigraph engine involved. The engine-level rewire
//! (MR-794 step 2+) lives in `tests/runs.rs` once it lands.
//!
//! Test surface here:
//! 1. `stage_append` + `scan_with_staged` shows committed + staged data
//! without duplicates.
//! 2. `stage_merge_insert` of a row that supersedes a committed fragment
//! surfaces only the rewritten row, not both — the
//! `removed_fragment_ids` dedup landed in PR #66's `730631c`.
//! 3. **Documented contract**: chained `stage_merge_insert` calls on the
//! same dataset whose source rows share keys produce duplicate rows in
//! `scan_with_staged`. The engine's parse-time D₂ check (MR-794 step
//! 2+) prevents callers from triggering this; this test pins the
//! primitive's behavior so a future change either (a) preserves it or
//! (b) consciously fixes it (and updates this test).
use arrow_array::{Int32Array, RecordBatch, StringArray};
use arrow_schema::{DataType, Field, Schema};
use lance::dataset::{WhenMatched, WhenNotMatched};
use omnigraph::table_store::TableStore;
use std::sync::Arc;
fn person_schema() -> Arc<Schema> {
Arc::new(Schema::new(vec![
Field::new("id", DataType::Utf8, false),
Field::new("age", DataType::Int32, true),
]))
}
fn person_batch(rows: &[(&str, Option<i32>)]) -> RecordBatch {
let ids: Vec<&str> = rows.iter().map(|(id, _)| *id).collect();
let ages: Vec<Option<i32>> = rows.iter().map(|(_, age)| *age).collect();
RecordBatch::try_new(
person_schema(),
vec![
Arc::new(StringArray::from(ids)),
Arc::new(Int32Array::from(ages)),
],
)
.unwrap()
}
fn collect_ids(batches: &[RecordBatch]) -> Vec<String> {
let mut out = Vec::new();
for b in batches {
let ids = b
.column_by_name("id")
.unwrap()
.as_any()
.downcast_ref::<StringArray>()
.unwrap();
for i in 0..ids.len() {
out.push(ids.value(i).to_string());
}
}
out.sort();
out
}
#[tokio::test]
async fn stage_append_is_visible_via_scan_with_staged() {
let dir = tempfile::tempdir().unwrap();
let uri = format!("{}/people.lance", dir.path().to_str().unwrap());
let store = TableStore::new(dir.path().to_str().unwrap());
// Seed: one committed row.
let ds = TableStore::write_dataset(&uri, person_batch(&[("alice", Some(30))]))
.await
.unwrap();
// Stage a second row.
let staged = store
.stage_append(&ds, person_batch(&[("bob", Some(25))]))
.await
.unwrap();
// scan_with_staged sees both committed alice + staged bob, no duplicates.
let batches = store
.scan_with_staged(&ds, std::slice::from_ref(&staged), None, None)
.await
.unwrap();
assert_eq!(collect_ids(&batches), vec!["alice", "bob"]);
// Plain scan (no staged) still sees only committed alice — dataset HEAD
// hasn't moved.
let plain = store.scan_batches(&ds).await.unwrap();
assert_eq!(collect_ids(&plain), vec!["alice"]);
}
#[tokio::test]
async fn stage_merge_insert_dedupes_superseded_committed_fragment() {
let dir = tempfile::tempdir().unwrap();
let uri = format!("{}/people.lance", dir.path().to_str().unwrap());
let store = TableStore::new(dir.path().to_str().unwrap());
// Seed: alice age 30 in one committed fragment.
let ds = TableStore::write_dataset(&uri, person_batch(&[("alice", Some(30))]))
.await
.unwrap();
// Stage a merge_insert that rewrites alice's row. This produces an
// Operation::Update whose removed_fragment_ids excludes the committed
// fragment that contained the old alice.
let staged = store
.stage_merge_insert(
ds.clone(),
person_batch(&[("alice", Some(31))]),
vec!["id".to_string()],
WhenMatched::UpdateAll,
WhenNotMatched::InsertAll,
)
.await
.unwrap();
assert!(
!staged.removed_fragment_ids.is_empty(),
"merge_insert that rewrites a committed row must set removed_fragment_ids \
(this is the dedup invariant from PR #66 commit 730631c its absence \
was caught by Cubic/Cursor/Codex on PR #66)"
);
// scan_with_staged: alice appears exactly once, with the new age.
let batches = store
.scan_with_staged(&ds, std::slice::from_ref(&staged), None, None)
.await
.unwrap();
let ids = collect_ids(&batches);
assert_eq!(ids, vec!["alice"], "merge_insert must not surface duplicates");
// Confirm the visible row is the rewritten one.
let total: usize = batches.iter().map(|b| b.num_rows()).sum();
assert_eq!(total, 1);
let ages: Vec<i32> = batches
.iter()
.flat_map(|b| {
let col = b
.column_by_name("age")
.unwrap()
.as_any()
.downcast_ref::<Int32Array>()
.unwrap();
(0..col.len()).map(|i| col.value(i)).collect::<Vec<_>>()
})
.collect();
assert_eq!(ages, vec![31]);
}
#[tokio::test]
async fn count_rows_with_staged_matches_scan() {
let dir = tempfile::tempdir().unwrap();
let uri = format!("{}/people.lance", dir.path().to_str().unwrap());
let store = TableStore::new(dir.path().to_str().unwrap());
let ds = TableStore::write_dataset(&uri, person_batch(&[("alice", Some(30))]))
.await
.unwrap();
let staged = store
.stage_append(&ds, person_batch(&[("bob", Some(25)), ("carol", Some(40))]))
.await
.unwrap();
let count = store
.count_rows_with_staged(&ds, std::slice::from_ref(&staged), None)
.await
.unwrap();
assert_eq!(count, 3);
}
/// **Documented contract** (see `stage_merge_insert` doc): chained
/// `stage_merge_insert` calls on the same table whose source rows share
/// keys cannot dedupe across stages. Each call's `MergeInsertBuilder` runs
/// against the committed view; neither sees the other's staged fragments.
/// The combined `scan_with_staged` therefore returns the shared key
/// twice.
///
/// The engine's mutation path enforces D₂ (per touched table: all
/// stage_append OR exactly one stage_merge_insert) at parse time so this
/// scenario is unreachable through public APIs. This test pins the
/// primitive behavior — if a future change makes the primitive itself
/// dedupe across stages (e.g. via a Lance API extension or in-memory
/// pre-merge), update this assertion.
#[tokio::test]
async fn chained_stage_merge_insert_with_shared_key_documents_duplicate_behavior() {
let dir = tempfile::tempdir().unwrap();
let uri = format!("{}/people.lance", dir.path().to_str().unwrap());
let store = TableStore::new(dir.path().to_str().unwrap());
// Seed empty (an unrelated row keeps the schema unambiguous).
let ds = TableStore::write_dataset(&uri, person_batch(&[("seed", Some(0))]))
.await
.unwrap();
// Op-1: stage merge_insert of alice. Against committed view: alice
// doesn't exist, so this lands as a fresh insert into Operation::Update.new_fragments.
let staged_1 = store
.stage_merge_insert(
ds.clone(),
person_batch(&[("alice", Some(30))]),
vec!["id".to_string()],
WhenMatched::UpdateAll,
WhenNotMatched::InsertAll,
)
.await
.unwrap();
// Op-2: stage merge_insert of alice with a different age. Also runs
// against the committed view (alice doesn't exist there either), so
// Lance produces another fresh insert. Op-2 has no knowledge of
// op-1's staged fragments.
let staged_2 = store
.stage_merge_insert(
ds.clone(),
person_batch(&[("alice", Some(31))]),
vec!["id".to_string()],
WhenMatched::UpdateAll,
WhenNotMatched::InsertAll,
)
.await
.unwrap();
// scan_with_staged sees committed (seed) + op-1.new (alice age=30) +
// op-2.new (alice age=31). Alice appears twice — the documented
// contract violation that D₂ prevents at the engine layer.
let batches = store
.scan_with_staged(&ds, &[staged_1, staged_2], None, None)
.await
.unwrap();
let ids = collect_ids(&batches);
let alice_count = ids.iter().filter(|id| *id == "alice").count();
assert_eq!(
alice_count, 2,
"chained stage_merge_insert with shared key produces duplicates — \
this is the contract documented on stage_merge_insert. If you're \
here because this assertion failed: either (a) the primitive was \
improved to dedupe across stages (good update to assert == 1) \
or (b) something subtler broke (investigate before changing the \
assertion). See PR #67 Codex P1 thread + .context/mr-794-step2-design.md §3.1."
);
}