MR-847: pin Lance restore semantics empirically (Phase 1)

Two new tests in tests/staged_writes.rs that the recovery sweep design
depends on:

- lance_restore_appends_one_commit_with_checked_out_content — verifies
  Dataset::restore() (no-args; restores currently-checked-out version)
  produces HEAD+1, not HEAD+2 as the v1 design assumed. Source confirmed
  at lance-4.0.0/src/dataset.rs:1106; this test prevents a future lance
  bump from silently breaking the recovery rollback math.

- lance_restore_loses_to_concurrent_append_via_orphaning — pins the
  concurrency hazard motivating MR-847's open-time-only invocation
  strategy: check_restore_txn (lance-4.0.0/src/io/commit/conflict_
  resolver.rs:986) returns Ok against Append/Update/Delete/CreateIndex/
  Merge/etc., so a Restore commits successfully even when a concurrent
  legitimate writer just landed an Append — silently orphaning the
  Append's data from the active timeline. MR-847 sidesteps via running
  recovery only at Omnigraph::open (before any other writers race);
  MR-856 (continuous-recovery reconciler) must guard via per-(table,
  branch) queue acquisition once MR-686 lands.

These two tests together pin the foundation for MR-847's correctness
claims and document the load-bearing constraint MR-856 will inherit.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
Ragnor Comerford 2026-05-02 21:33:39 +02:00
parent 8726ffe0a3
commit 1e70028293
No known key found for this signature in database

View file

@ -758,3 +758,190 @@ async fn create_vector_index_advances_head_inline_documents_residual() {
);
assert!(store.has_vector_index(&ds, "embedding").await.unwrap());
}
/// Empirical pin of `Dataset::restore` semantics for MR-847.
///
/// MR-847's recovery sweep depends on the `restore` invariant: from
/// HEAD = `h`, calling `Dataset::checkout_version(p).await?` then
/// `Dataset::restore().await?` produces a NEW commit at HEAD = `h + 1`
/// (NOT `h + 2` as the v1 design draft assumed) with content == content
/// at version `p`.
///
/// The Lance source confirms this — `restore()` (no args) takes the
/// currently-checked-out version's content and applies it via
/// `apply_commit` against the latest manifest, advancing HEAD by one.
/// See lance-4.0.0 `src/dataset.rs:1106` and the transaction-spec
/// example at https://lance.org/format/table/transaction/.
///
/// If the lance bump (4.0.0 → 4.x) ever changes this delta or the call
/// signature, the recovery sweep's rollback path breaks; this test
/// surfaces the regression at compile/test time rather than under
/// production drift recovery.
#[tokio::test]
async fn lance_restore_appends_one_commit_with_checked_out_content() {
let dir = tempfile::tempdir().unwrap();
let uri = format!("{}/people.lance", dir.path().to_str().unwrap());
let store = TableStore::new(dir.path().to_str().unwrap());
// Build version history: v1 = {alice}, v2 = {alice, bob}, v3 = {alice, bob, carol}.
let mut ds = TableStore::write_dataset(&uri, person_batch(&[("alice", Some(30))]))
.await
.unwrap();
assert_eq!(ds.version().version, 1);
store
.append_batch(&uri, &mut ds, person_batch(&[("bob", Some(25))]))
.await
.unwrap();
assert_eq!(ds.version().version, 2);
store
.append_batch(&uri, &mut ds, person_batch(&[("carol", Some(40))]))
.await
.unwrap();
assert_eq!(ds.version().version, 3);
let head_before = ds.version().version;
// Recovery's rollback shape: open + checkout(p) + restore().
let head_ds = Dataset::open(&uri).await.unwrap();
let mut to_restore = head_ds.checkout_version(1).await.unwrap();
assert_eq!(to_restore.manifest.version, 1);
to_restore.restore().await.unwrap();
// Verify against a fresh open — the previous handle's view doesn't
// tell us what other openers see.
let post = Dataset::open(&uri).await.unwrap();
assert_eq!(
post.version().version,
head_before + 1,
"Dataset::restore must append exactly one commit (HEAD + 1). If \
this assertion fires, lance changed restore semantics re-read \
lance src/dataset.rs::restore and update the MR-847 design AND \
the recovery sweep's rollback path before proceeding."
);
// Content equality: the restored HEAD must match version 1 (just alice).
let scanner = post.scan();
let batches: Vec<RecordBatch> = scanner
.try_into_stream()
.await
.unwrap()
.try_collect()
.await
.unwrap();
let ids = collect_ids(&batches);
assert_eq!(
ids,
vec!["alice".to_string()],
"post-restore content must equal version 1's content; got {:?}",
ids,
);
}
/// Empirical pin of the `Dataset::restore` concurrency hazard that motivates
/// MR-847's open-time-only invocation strategy and MR-856's queue-acquisition
/// requirement.
///
/// `Dataset::restore`'s `check_restore_txn` (lance-4.0.0
/// `src/io/commit/conflict_resolver.rs:986`) returns `Ok(())` against
/// almost every other op (Append, Update, Delete, CreateIndex, Merge, …),
/// so a Restore commits successfully even with concurrent commits in
/// flight. The symmetric checks (lines 318, 473, 634, 787, 853, 947, 978,
/// 1018, 1059, 1115, 1187, 1280) classify Restore as incompatible from
/// the *other* op's POV — but the *other* op already committed before the
/// Restore arrived, so it sees no conflict. Net: the Restore appends a
/// rewind commit AFTER the legitimate concurrent Append, silently
/// orphaning that Append's data from the active timeline.
///
/// MR-847 sidesteps this by running recovery only at `Omnigraph::open`
/// (before any other writers can race). MR-856's continuous-recovery
/// reconciler must acquire per-(table_key, branch) queues for sidecar
/// tables before invoking restore — otherwise this hazard becomes
/// reachable during in-flight tenant traffic.
///
/// This test is the load-bearing constraint MR-856 must honor.
#[tokio::test]
async fn lance_restore_loses_to_concurrent_append_via_orphaning() {
let dir = tempfile::tempdir().unwrap();
let uri = format!("{}/people.lance", dir.path().to_str().unwrap());
let store = TableStore::new(dir.path().to_str().unwrap());
// v1: seed with alice.
let _ = TableStore::write_dataset(&uri, person_batch(&[("alice", Some(30))]))
.await
.unwrap();
// Recovery handle: opened at the latest, then checked out at v1 (the
// pin we'd "rollback" to in a real recovery scenario). This handle
// has NOT yet called restore.
let recovery_open = Dataset::open(&uri).await.unwrap();
let mut recovery_handle = recovery_open.checkout_version(1).await.unwrap();
// Concurrent legitimate writer: appends bob, advancing HEAD to v2.
// This simulates MR-686's per-table-queue model where another tenant
// wrote between recovery's open and recovery's restore call.
let mut writer_handle = Dataset::open(&uri).await.unwrap();
store
.append_batch(&uri, &mut writer_handle, person_batch(&[("bob", Some(25))]))
.await
.unwrap();
assert_eq!(writer_handle.version().version, 2);
// Recovery now restores. Because restore's `check_restore_txn` returns
// Ok against Append, this commits at v3 with content == v1 (just alice).
recovery_handle.restore().await.unwrap();
// Re-open and inspect: HEAD is v3, content is just alice. Bob is gone
// from the active timeline.
let post = Dataset::open(&uri).await.unwrap();
assert_eq!(
post.version().version,
3,
"Restore commits at HEAD+1 even when a concurrent commit landed \
between recovery's open and recovery's restore call. If this \
assertion fails, lance changed restore-vs-append conflict \
semantics re-read check_restore_txn and update MR-847's \
concurrency analysis."
);
let scanner = post.scan();
let batches: Vec<RecordBatch> = scanner
.try_into_stream()
.await
.unwrap()
.try_collect()
.await
.unwrap();
let ids = collect_ids(&batches);
assert_eq!(
ids,
vec!["alice".to_string()],
"Concurrent Append's row 'bob' was silently orphaned by the \
Restore. Active-timeline contents == v1's contents. This is the \
hazard MR-847 sidesteps via open-time-only invocation, and MR-856 \
must guard against via per-(table, branch) queue acquisition. \
Got: {:?}",
ids,
);
// Sanity: bob's commit IS still readable via explicit checkout_version(2).
// The data isn't gone from disk — it's just unreachable from HEAD until
// cleanup_old_versions reclaims the orphan.
let v2 = Dataset::open(&uri)
.await
.unwrap()
.checkout_version(2)
.await
.unwrap();
let v2_batches: Vec<RecordBatch> = v2
.scan()
.try_into_stream()
.await
.unwrap()
.try_collect()
.await
.unwrap();
let v2_ids = collect_ids(&v2_batches);
assert_eq!(v2_ids, vec!["alice".to_string(), "bob".to_string()]);
}