mirror of
https://github.com/ModernRelay/omnigraph.git
synced 2026-06-15 01:55:13 +02:00
* MR-854: convert engine call sites to &dyn TableStorage; demote legacy methods
Phase 1b: every db.table_store.X(...) call site converts to
db.storage().X(...), reaching the storage layer through the sealed
TableStorage trait (returns &dyn TableStorage). Opaque SnapshotHandle
and StagedHandle replace bare lance::Dataset and Transaction in the
threaded values.
Phase 9: the inherent inline-commit methods on TableStore
(append_batch, merge_insert_batch{,es}, overwrite_batch,
create_btree_index, create_inverted_index) demote from pub to
pub(crate). Their only remaining direct users are table_store.rs
itself and the bulk loader's LoadMode::{Append, Overwrite, Merge}
concurrent fast-paths in loader::write_batch_to_dataset (no
two-phase shape in Lance 4.0.0 — closes after lance#6658 and #6666).
Docs:
- invariants.md §VI.23: drop "at the writer-trait surface"
qualifier; staged primitives are now the only engine surface.
- runs.md: residual matrix shrinks to delete_where and
create_vector_index (the two upstream-blocked residuals).
- forbidden_apis.rs: replace transitional language with the
current allow-list shape (table_store.rs + loader concurrent
fast-path only).
Files touched:
- changes/mod.rs, db/omnigraph.rs (+export/optimize/schema_apply/
table_ops.rs), exec/{merge,mod,mutation,staging}.rs,
loader/mod.rs, storage_layer.rs, table_store.rs,
tests/forbidden_apis.rs, docs/{invariants,runs}.md.
Co-Authored-By: Ragnor Comerford <ragnor.comerford@gmail.com>
* MR-854: replace test-only inline-commit append callers with local Lance helpers
After demoting TableStore::append_batch from pub to pub(crate), the
integration tests in tests/recovery.rs and tests/staged_writes.rs
that previously called store.append_batch(...) directly to simulate
HEAD-ahead-of-manifest drift can no longer access the inherent
method. Replace those calls with small in-test helpers that do a raw
Dataset::append (the same body the inherent method runs).
- tests/helpers/mod.rs gains lance_append_inline (shared helper).
- tests/staged_writes.rs gets a file-local lance_append_inline_local
(staged_writes.rs does not import helpers::).
- tests/recovery.rs drops the unused TableStore import in the one
function whose store binding became unused after the conversion.
Co-Authored-By: Ragnor Comerford <ragnor.comerford@gmail.com>
* MR-854: retrigger CI for flaky Test Workspace job
Co-Authored-By: Ragnor Comerford <ragnor.comerford@gmail.com>
* MR-854: convert remaining table_store call sites in export.rs / read_blob
Two leftover `self.table_store.X` / `db.table_store.X` call sites were
missed in the initial sweep — flagged by Devin Review on PR #86. Both
now go through the trait surface:
- `entity_from_snapshot` (db/omnigraph/export.rs): switch from
`db.table_store.open_snapshot_table` + `db.table_store.scan` to
`db.storage().open_snapshot_at_table` + `db.storage().scan`.
- `read_blob` (db/omnigraph.rs): replace
`snapshot.open(table_key)` + `self.table_store.first_row_id_for_filter`
with `self.storage().open_snapshot_at_table` +
`self.storage().first_row_id_for_filter`. The follow-up
`take_blobs` call still needs an `Arc<Dataset>` (it's a Lance blob
accessor not surfaced through the trait), so we hand off via
`SnapshotHandle::into_arc()` with a comment.
After this commit, no engine code outside `table_store.rs` reaches the
inherent `TableStore` API — the docs/runs.md and docs/invariants.md
claim is now uniformly true.
Co-Authored-By: Ragnor Comerford <ragnor.comerford@gmail.com>
* MR-854: post-rebase doc fixes (Lance 6.0.1, MR-A framing, into_dataset note)
Reviewer feedback on the rebased PR:
* docs/dev/writes.md residuals matrix: drop demoted methods from the trait-surface table (now `pub(crate)`); keep only the two genuine trait-surface residuals (`delete_where`, `create_vector_index`); reframe under MR-A (Lance v7.x bump) per docs/dev/lance.md.
* tests/forbidden_apis.rs: update transitional allow-list header to (a) drop the truncate_table mislabel (truncate_table is a Lance Dataset method, not a TableStore method — overwrite_batch's internal call), (b) reframe trait-surface residuals under MR-A / Lance #6666.
* crates/omnigraph/src/storage_layer.rs::SnapshotHandle::{into_arc, into_dataset}: add single-ref invariant doc — both consume Arc via try_unwrap-or-clone; sibling SnapshotHandle clones across an await point force a deep Dataset clone.
* Replace lance-4.0.0 version refs with lance-6.0.1 in active source/test/dev-doc comments (storage_layer.rs, table_store.rs, table_ops.rs, schema_apply.rs, merge.rs, recovery.rs, staged_writes.rs, consistency.rs, docs/dev/execution.md, docs/user/query-language.md). Historical refs in docs/releases/v0.4.1.md and the canonical "Lance 4.0.0 → 6.0.1 migration" line in docs/dev/lance.md left intact.
No engine code changes.
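The single-ref invariant in the `SnapshotHandle` note can be illustrated with a minimal sketch of the try_unwrap-or-clone pattern. This is not the `storage_layer.rs` code. `Dataset` below is a hypothetical, cheap stand-in for `lance::Dataset`, and only an `into_dataset`-style method is modeled.

```rust
use std::sync::Arc;

/// Hypothetical stand-in for `lance::Dataset`; cloning it copies every row.
#[derive(Clone, Debug, PartialEq)]
pub struct Dataset {
    pub rows: Vec<u64>,
}

/// Sketch of an opaque snapshot handle over a shared dataset.
#[derive(Clone)]
pub struct SnapshotHandle(Arc<Dataset>);

impl SnapshotHandle {
    pub fn new(ds: Dataset) -> Self {
        SnapshotHandle(Arc::new(ds))
    }

    /// Consume the handle and return an owned dataset. When this handle holds
    /// the only reference, the dataset is moved out with no copy. When a sibling
    /// clone is still alive, `try_unwrap` fails and the fallback deep-clones.
    pub fn into_dataset(self) -> Dataset {
        Arc::try_unwrap(self.0).unwrap_or_else(|shared| (*shared).clone())
    }
}
```

A sibling clone held across an await point therefore turns a free move into a full copy. On Rust 1.76+ the same logic is also available as `Arc::unwrap_or_clone`.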
* MR-854: update docs/dev/invariants.md Storage trait row + gap entry
Reviewer feedback: the docs reorg landed; the invariant row now lives in
docs/dev/invariants.md with stable headings (no more numbered §VI.23).
Update two pieces to reflect MR-854 completion:
* Status table 'Storage trait' row: was 'full call-site migration ... incomplete';
now 'engine call sites all route through db.storage() (MR-854); inline-commit
inherent methods are pub(crate)-demoted; capability/stat surfaces are roadmap'.
* 'Known Gaps' 'Storage abstraction' entry: was 'older inherent TableStore call
sites and inline residuals remain'; now names the closed scope (MR-854 — call
sites migrated, methods demoted, loader fast-paths) and the remaining
trait-surface residuals under MR-A (Lance v7.x bump) and Lance #6666.
Cross-links to docs/dev/lance.md and docs/dev/writes.md so the framing stays
co-located with the canonical Lance surface tracking.
* MR-854: remove dead inline-commit methods from the storage surface
The loader concurrent fast-path (write_batch_to_dataset) is only reached
for LoadMode::Overwrite — Append/Merge route through MutationStaging — so
its Append/Merge arms were unreachable. Collapse it to overwrite-only and
drop the now-unused mode params, which removes the only callers of:
- TableStorage::append_batch + TableStorage::merge_insert_batches (trait)
- TableStore::merge_insert_batch + merge_insert_batches (inherent)
create_btree_index / create_inverted_index had zero callers anywhere
(scalar index builds use the stage_* primitives). Remove both from the
trait and the inherent impl.
Inherent append_batch stays pub(crate): overwrite_batch and recovery
tests use it. Migrate the one trait-append_batch test caller
(seed_person_row) to stage_append + commit_staged. The merge_insert
FirstSeen-workaround rationale moves from the deleted merge_insert_batch
into stage_merge_insert (now the sole merge path). No behavior change.
Also corrects the inaccurate loader residual comment (the prior text
blamed Lance #6658/#6666, which are the delete and vector-index issues,
for keeping overwrite inline; a stage_overwrite primitive already exists
and schema_apply uses it).
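The two-phase shape that `seed_person_row` now uses (`stage_append` followed by `commit_staged`) can be pictured with a toy in-memory table. `Table`, `StagedHandle`, and `CommitError` are hypothetical stand-ins, not omnigraph's or Lance's types. The sketch shows two properties: staging takes `&self` and so cannot advance the version, and the commit step checks the version the write was staged against.

```rust
/// Hypothetical error for a commit whose base version is stale.
#[derive(Debug, PartialEq)]
pub enum CommitError {
    ExpectedVersionMismatch { expected: u64, actual: u64 },
}

/// Toy versioned table (a stand-in, not a Lance dataset).
#[derive(Debug, Default)]
pub struct Table {
    pub version: u64,
    pub rows: Vec<String>,
}

/// A write that has been prepared but not committed.
pub struct StagedHandle {
    base_version: u64,
    rows: Vec<String>,
}

impl Table {
    /// Phase 1: prepare the write. Takes `&self`, so it cannot advance `version`.
    pub fn stage_append(&self, rows: Vec<String>) -> StagedHandle {
        StagedHandle {
            base_version: self.version,
            rows,
        }
    }

    /// Phase 2: the only step that advances `version`, and only if no other
    /// commit landed since the handle was staged.
    pub fn commit_staged(&mut self, staged: StagedHandle) -> Result<u64, CommitError> {
        if staged.base_version != self.version {
            return Err(CommitError::ExpectedVersionMismatch {
                expected: staged.base_version,
                actual: self.version,
            });
        }
        self.rows.extend(staged.rows);
        self.version += 1;
        Ok(self.version)
    }
}
```

In this toy, a stale handle fails instead of silently landing on top of a newer commit. That is loosely analogous to the `ExpectedVersionMismatch` failures the tests below guard against.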
* MR-854: seal db.storage() to staged-only; move residuals to InlineCommitResidual
Split the three remaining inline-commit writes (overwrite_batch,
delete_where, create_vector_index) off the TableStorage trait onto a new
sealed InlineCommitResidual trait, reachable only via the explicit
Omnigraph::storage_inline_residual() accessor. db.storage() now exposes
only staged primitives + reads, so engine code cannot couple a write
with a Lance HEAD advance through the default surface — MR-793 acceptance
§1 ("no public method commits as a side effect of writing") now holds by
construction, not by review + naming.
Call sites moved to storage_inline_residual(): loader overwrite
fast-path, the three mutation delete_where paths, the branch-merge
delete, and the vector-index build. Impl bodies are unchanged (same
delegation to the pub(crate) inherent methods); this is a pure surface
reshape with no behavior change.
The residual trait holds two genuinely upstream-blocked methods
(delete_where -> Lance #6658/v7.x, create_vector_index -> Lance #6666)
plus overwrite_batch, kept for the loader's cross-table bulk-overwrite
concurrency until its staged migration lands (tracked follow-up).
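Below is a minimal sketch of how a sealed trait plus two accessors can make the staged-only default surface hold by construction. Only the names `TableStorage`, `InlineCommitResidual`, `storage()`, and `storage_inline_residual()` come from this message. The module layout, the `Db` type, and the toy method set (`count`, `stage_append`, `delete_where` over a `RefCell<Vec<String>>`) are hypothetical simplifications.

```rust
pub mod storage {
    use std::cell::RefCell;

    mod private {
        // Supertrait that only this module can name, so nothing outside
        // `storage` can implement the public traits below.
        pub trait Sealed {}
    }

    /// Default surface: reads plus staged primitives; nothing here mutates the table.
    pub trait TableStorage: private::Sealed {
        fn count(&self) -> usize;
        /// Returns the would-be row set without touching the stored rows.
        fn stage_append(&self, row: &str) -> Vec<String>;
    }

    /// Residual surface: methods that write and "commit" in a single call.
    pub trait InlineCommitResidual: private::Sealed {
        /// Deletes matching rows immediately and returns how many were removed.
        fn delete_where(&self, row: &str) -> usize;
    }

    pub struct TableStore {
        rows: RefCell<Vec<String>>,
    }

    impl TableStore {
        pub fn new(rows: &[&str]) -> Self {
            TableStore {
                rows: RefCell::new(rows.iter().map(|r| r.to_string()).collect()),
            }
        }
    }

    impl private::Sealed for TableStore {}

    impl TableStorage for TableStore {
        fn count(&self) -> usize {
            self.rows.borrow().len()
        }

        fn stage_append(&self, row: &str) -> Vec<String> {
            let mut staged = self.rows.borrow().clone();
            staged.push(row.to_string());
            staged
        }
    }

    impl InlineCommitResidual for TableStore {
        fn delete_where(&self, row: &str) -> usize {
            let mut rows = self.rows.borrow_mut();
            let before = rows.len();
            rows.retain(|r| r.as_str() != row);
            before - rows.len()
        }
    }
}

use storage::{InlineCommitResidual, TableStorage};

/// Hypothetical engine handle owning the store.
pub struct Db {
    store: storage::TableStore,
}

impl Db {
    pub fn new(rows: &[&str]) -> Self {
        Db { store: storage::TableStore::new(rows) }
    }

    /// Default accessor: staged primitives and reads only.
    pub fn storage(&self) -> &dyn TableStorage {
        &self.store
    }

    /// Explicit opt-in for the residual inline-commit writes.
    pub fn storage_inline_residual(&self) -> &dyn InlineCommitResidual {
        &self.store
    }
}
```

Because `Sealed` lives in a private module, code outside `storage` can call these traits but cannot implement them. Nothing reachable through `storage()` mutates the table. The inline delete is reachable only by naming `storage_inline_residual()`, which keeps such call sites easy to grep for.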
* MR-854 docs: describe the staged-only seal; fix stale Lance index URLs
- writes.md / invariants.md / AGENTS.md: the inline-commit residuals now
live on InlineCommitResidual behind db.storage_inline_residual(), so
acceptance §1 holds by construction rather than 'option (b)' per-method
enumeration. Drop the inaccurate 'until Lance exposes
Operation::Overwrite { fragments }' claim (that op exists; stage_overwrite
already builds it) and reframe overwrite_batch as a removable legacy
residual gated on the loader's bulk-overwrite concurrency.
- forbidden_apis.rs: rewrite the allow-list doc for the split surface.
- lance.md: the index spec pages moved from /format/table/index/ to
/format/index/ in Lance 6.x (the old paths 404). Fix all 13 URLs.
* MR-854: fix stale lance-4.0.0 comment refs flagged in review
Addresses greptile (exec/merge.rs) and aaltshuler's stale-version blocker:
update lance-4.0.0 -> 6.0.1 in the comment/doc refs within this PR's
footprint (exec/merge.rs, exec/mutation.rs, docs/dev/writes.md). Also
corrects exec/merge.rs to cite lance#6666 (not #6658) for
build_index_metadata_from_segments — that is the vector-index segment-commit
API; #6658 is the two-phase delete. (Pre-existing 4.0.0 refs in untouched
files like architecture.md/storage.md are main's incomplete migration
cleanup, left out of scope.)
* fix(storage): stage loader overwrites
* fix(storage): stage empty schema rewrites
---------
Co-authored-by: Devin AI <158243242+devin-ai-integration[bot]@users.noreply.github.com>
Co-authored-by: Ragnor Comerford <ragnor.comerford@gmail.com>
Co-authored-by: Ragnor Comerford <hello@ragnor.co>
2432 lines
88 KiB
Rust
#![cfg(feature = "failpoints")]

mod helpers;

use fail::FailScenario;
use futures::FutureExt;
use omnigraph::db::Omnigraph;
use omnigraph::failpoints::ScopedFailPoint;

use helpers::recovery::{
    FollowUpMutation, RecoveryExpectation, TableExpectation, assert_post_recovery_invariants,
    branch_head_commit_id, single_sidecar_operation_id,
};
use helpers::{MUTATION_QUERIES, mixed_params, mutate_main, version_main};

const SCHEMA_V1: &str = "node Person { name: String @key }\n";
const SCHEMA_V2_ADDED_TYPE: &str =
    "node Person { name: String @key }\nnode Company { name: String @key }\n";

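// Recomputes a node table's dataset URI, `{root}/nodes/{hash}`, where `hash` is
// the 64-bit FNV-1a hash of the type name (offset basis 0xcbf29ce484222325,
// prime 0x100000001b3) formatted as 16 hex digits. The tests open the Lance
// dataset at this path directly, so it must stay in sync with the engine's
// table layout.
fn node_table_uri(root: &str, type_name: &str) -> String {
    let mut hash: u64 = 0xcbf2_9ce4_8422_2325;
    for &b in type_name.as_bytes() {
        hash ^= b as u64;
        hash = hash.wrapping_mul(0x100_0000_01b3);
    }
    format!("{}/nodes/{hash:016x}", root.trim_end_matches('/'))
}
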
#[tokio::test]
async fn branch_create_failpoint_triggers() {
    let _scenario = FailScenario::setup();
    let dir = tempfile::tempdir().unwrap();
    let uri = dir.path().to_str().unwrap();
    let db = Omnigraph::init(uri, helpers::TEST_SCHEMA).await.unwrap();
    let _failpoint = ScopedFailPoint::new("branch_create.after_manifest_branch_create", "return");

    let err = db.branch_create("feature").await.unwrap_err();
    assert!(
        err.to_string()
            .contains("injected failpoint triggered: branch_create.after_manifest_branch_create")
    );
}

// Branch delete flips the manifest authority first, then reclaims the per-table
// forks best-effort. A failure during that reclaim (here, the
// `branch_delete.before_table_cleanup` failpoint, standing in for a transient
// object-store error) must NOT fail the call: the branch is already gone, and
// `cleanup` reconciles the stranded fork. The branch name is reusable after.
#[tokio::test]
async fn branch_delete_partial_failure_converges_via_cleanup() {
    let _scenario = FailScenario::setup();
    let dir = tempfile::tempdir().unwrap();
    let uri = dir.path().to_str().unwrap().to_string();
    let mut main = helpers::init_and_load(&dir).await;

    main.branch_create("feature").await.unwrap();
    let mut feature = Omnigraph::open(&uri).await.unwrap();
    helpers::mutate_branch(
        &mut feature,
        "feature",
        MUTATION_QUERIES,
        "insert_person",
        &mixed_params(&[("$name", "Eve")], &[("$age", 22)]),
    )
    .await
    .unwrap();
    drop(feature);

    let person_uri = node_table_uri(&uri, "Person");
    {
        let ds = lance::Dataset::open(&person_uri).await.unwrap();
        assert!(
            ds.list_branches().await.unwrap().contains_key("feature"),
            "precondition: the owned table fork exists before delete"
        );
    }

    // Inject a failure during per-table cleanup, AFTER the manifest authority
    // flip. branch_delete must still succeed (best-effort reclaim).
    {
        let _fp = ScopedFailPoint::new("branch_delete.before_table_cleanup", "return");
        main.branch_delete("feature").await.expect(
            "branch_delete is best-effort after the manifest flip: a cleanup-step \
             failure must not fail the call",
        );
    }

    // Authority flipped: the branch is gone.
    assert_eq!(main.branch_list().await.unwrap(), vec!["main".to_string()]);

    // The eager reclaim failed, so the orphan is stranded until cleanup.
    {
        let ds = lance::Dataset::open(&person_uri).await.unwrap();
        assert!(
            ds.list_branches().await.unwrap().contains_key("feature"),
            "failed eager reclaim should leave the orphan for cleanup to reconcile"
        );
    }

    // cleanup converges: the orphan is reclaimed.
    main.cleanup(omnigraph::db::CleanupPolicyOptions {
        keep_versions: Some(1),
        older_than: None,
    })
    .await
    .unwrap();
    {
        let ds = lance::Dataset::open(&person_uri).await.unwrap();
        assert!(
            !ds.list_branches().await.unwrap().contains_key("feature"),
            "cleanup should reconcile the orphaned fork away"
        );
    }

    // The name is reusable after cleanup reclaims the orphan.
    main.branch_create("feature").await.unwrap();
    let mut feature2 = Omnigraph::open(&uri).await.unwrap();
    helpers::mutate_branch(
        &mut feature2,
        "feature",
        MUTATION_QUERIES,
        "insert_person",
        &mixed_params(&[("$name", "Frank")], &[("$age", 41)]),
    )
    .await
    .unwrap();
}

// Reusing a branch name whose delete left an orphaned fork (before `cleanup`
// reconciles it) must fail with a clear, actionable error pointing at
// `cleanup`, not the opaque `ExpectedVersionMismatch` that leaks from the fork
// path. The recreate itself succeeds; the first write to the previously-forked
// table is where the stale orphan collides.
#[tokio::test]
async fn recreate_over_orphaned_fork_before_cleanup_is_actionable() {
    let _scenario = FailScenario::setup();
    let dir = tempfile::tempdir().unwrap();
    let uri = dir.path().to_str().unwrap().to_string();
    let mut main = helpers::init_and_load(&dir).await;

    main.branch_create("feature").await.unwrap();
    let mut feature = Omnigraph::open(&uri).await.unwrap();
    helpers::mutate_branch(
        &mut feature,
        "feature",
        MUTATION_QUERIES,
        "insert_person",
        &mixed_params(&[("$name", "Eve")], &[("$age", 22)]),
    )
    .await
    .unwrap();
    drop(feature);

    // Partial delete: leaves the Person fork orphaned (cleanup not yet run).
    {
        let _fp = ScopedFailPoint::new("branch_delete.before_table_cleanup", "return");
        main.branch_delete("feature").await.unwrap();
    }

    // Recreate the name and write to the previously-forked table WITHOUT a
    // cleanup in between.
    main.branch_create("feature").await.unwrap();
    let mut feature2 = Omnigraph::open(&uri).await.unwrap();
    let err = helpers::mutate_branch(
        &mut feature2,
        "feature",
        MUTATION_QUERIES,
        "insert_person",
        &mixed_params(&[("$name", "Frank")], &[("$age", 41)]),
    )
    .await
    .expect_err("write should collide with the stale orphaned fork");

    let msg = err.to_string();
    assert!(
        msg.contains("cleanup")
            && (msg.contains("orphan") || msg.contains("incomplete prior delete")),
        "expected an actionable orphaned-fork error pointing at cleanup, got: {msg}"
    );
    assert!(
        !msg.contains("expected manifest table version"),
        "should not surface the opaque ExpectedVersionMismatch, got: {msg}"
    );
}

// cleanup is the guaranteed convergence backstop, so one table's transient
// failure must not abort the whole sweep. Inject a one-shot version-GC failure
// for a single table and assert: cleanup still succeeds, the failure is
// surfaced per-table in the returned stats, and the independent reconcile pass
// still reclaimed an orphan.
#[tokio::test]
async fn cleanup_isolates_single_table_failure() {
    let _scenario = FailScenario::setup();
    let dir = tempfile::tempdir().unwrap();
    let uri = dir.path().to_str().unwrap().to_string();
    let mut db = helpers::init_and_load(&dir).await;

    // Forge an orphaned fork on the Person table (a reconcile target).
    let person_uri = node_table_uri(&uri, "Person");
    {
        let mut ds = lance::Dataset::open(&person_uri).await.unwrap();
        let base = ds.version().version;
        ds.create_branch("ghost", base, None).await.unwrap();
    }

    // One table's version GC fails once; the sweep must isolate it.
    let _fp = ScopedFailPoint::new("cleanup.table_gc", "1*return");
    let stats = db
        .cleanup(omnigraph::db::CleanupPolicyOptions {
            keep_versions: Some(1),
            older_than: None,
        })
        .await
        .expect("a single table's GC failure must not abort cleanup");

    let errored = stats.iter().filter(|s| s.error.is_some()).count();
    assert_eq!(
        errored, 1,
        "exactly one table's GC failure should be surfaced in stats, got {errored}"
    );
    assert!(
        stats.len() >= 4,
        "every node+edge table should still appear in the stats"
    );

    // The reconcile pass is independent of the GC failure, so the orphan is gone.
    {
        let ds = lance::Dataset::open(&person_uri).await.unwrap();
        assert!(
            !ds.list_branches().await.unwrap().contains_key("ghost"),
            "reconcile should reclaim the orphan despite the GC failure"
        );
    }
}

// Companion to the version-GC isolation test, exercising the OTHER cleanup
// loop: a force-delete failure while reconciling one orphaned fork must be
// isolated (logged, not propagated) so the sweep continues, and a later
// cleanup converges. This is the loop the Devin finding was about.
#[tokio::test]
async fn cleanup_isolates_reconcile_failure() {
    let _scenario = FailScenario::setup();
    let dir = tempfile::tempdir().unwrap();
    let uri = dir.path().to_str().unwrap().to_string();
    let mut db = helpers::init_and_load(&dir).await;

    // Forge an orphaned fork the reconcile pass will try to reclaim.
    let person_uri = node_table_uri(&uri, "Person");
    {
        let mut ds = lance::Dataset::open(&person_uri).await.unwrap();
        let base = ds.version().version;
        ds.create_branch("ghost", base, None).await.unwrap();
    }

    // Inject a one-shot failure into the reconcile force-delete. The sweep must
    // not abort.
    {
        let _fp = ScopedFailPoint::new("cleanup.reconcile_fork", "1*return");
        db.cleanup(omnigraph::db::CleanupPolicyOptions {
            keep_versions: Some(1),
            older_than: None,
        })
        .await
        .expect("a reconcile force-delete failure must not abort cleanup");
    }
    // The blocked orphan is still present (the failure was isolated, not retried).
    {
        let ds = lance::Dataset::open(&person_uri).await.unwrap();
        assert!(
            ds.list_branches().await.unwrap().contains_key("ghost"),
            "the orphan whose reclaim was injected-to-fail should remain"
        );
    }
    // A second cleanup with no injected failure converges.
    db.cleanup(omnigraph::db::CleanupPolicyOptions {
        keep_versions: Some(1),
        older_than: None,
    })
    .await
    .unwrap();
    {
        let ds = lance::Dataset::open(&person_uri).await.unwrap();
        assert!(
            !ds.list_branches().await.unwrap().contains_key("ghost"),
            "the second cleanup should reconcile the orphan"
        );
    }
}

// The cleanup reconciler must reclaim orphaned commit-graph branches, not just
// per-table forks. A delete whose best-effort commit-graph reclaim fails leaves
// a commit-graph orphan; the next cleanup must drop it.
#[tokio::test]
async fn cleanup_reclaims_orphaned_commit_graph_branch() {
    let _scenario = FailScenario::setup();
    let dir = tempfile::tempdir().unwrap();
    let uri = dir.path().to_str().unwrap().to_string();
    let mut db = helpers::init_and_load(&dir).await;

    db.branch_create("feature").await.unwrap();
    // Delete, failing the commit-graph reclaim → commit-graph "feature" orphan
    // (manifest branch gone, commit-graph branch left behind).
    {
        let _fp = ScopedFailPoint::new("branch_delete.before_commit_graph_reclaim", "return");
        db.branch_delete("feature").await.unwrap();
    }

    let commits_uri = format!("{}/_graph_commits.lance", uri.trim_end_matches('/'));
    {
        let ds = lance::Dataset::open(&commits_uri).await.unwrap();
        assert!(
            ds.list_branches().await.unwrap().contains_key("feature"),
            "precondition: the commit-graph branch should be orphaned after the failed reclaim"
        );
    }

    db.cleanup(omnigraph::db::CleanupPolicyOptions {
        keep_versions: Some(1),
        older_than: None,
    })
    .await
    .unwrap();

    {
        let ds = lance::Dataset::open(&commits_uri).await.unwrap();
        assert!(
            !ds.list_branches().await.unwrap().contains_key("feature"),
            "cleanup should reclaim the orphaned commit-graph branch"
        );
    }
}

// A branch_delete whose best-effort commit-graph reclaim fails leaves a
// commit-graph "zombie" branch. Recreating that name must heal the zombie and
// succeed (branch_create force-deletes a stale commit-graph ref since the
// manifest branch is created fresh), instead of dying on the leftover ref.
#[tokio::test]
async fn branch_create_recreates_over_commit_graph_zombie() {
    let _scenario = FailScenario::setup();
    let dir = tempfile::tempdir().unwrap();
    let db = Omnigraph::init(dir.path().to_str().unwrap(), helpers::TEST_SCHEMA)
        .await
        .unwrap();

    db.branch_create("feature").await.unwrap();
    {
        // Fail the best-effort commit-graph reclaim → commit-graph "feature"
        // zombie survives the delete (manifest authority still flips).
        let _fp = ScopedFailPoint::new("branch_delete.before_commit_graph_reclaim", "return");
        db.branch_delete("feature").await.unwrap();
    }
    assert_eq!(db.branch_list().await.unwrap(), vec!["main".to_string()]);

    db.branch_create("feature")
        .await
        .expect("branch_create should heal the zombie commit-graph branch and succeed");
    assert!(
        db.branch_list()
            .await
            .unwrap()
            .contains(&"feature".to_string())
    );
}

// branch_create is authority-then-derived: if the derived commit-graph branch
// cannot be created, the manifest branch (the authority) must be rolled back so
// the branch does not half-exist. The existing failpoint fires right after the
// manifest create, standing in for any post-authority failure.
#[tokio::test]
async fn branch_create_rolls_back_manifest_on_commit_graph_failure() {
    let _scenario = FailScenario::setup();
    let dir = tempfile::tempdir().unwrap();
    let db = Omnigraph::init(dir.path().to_str().unwrap(), helpers::TEST_SCHEMA)
        .await
        .unwrap();

    let err = {
        let _fp = ScopedFailPoint::new("branch_create.after_manifest_branch_create", "return");
        db.branch_create("feature").await.unwrap_err()
    };
    assert!(
        !db.branch_list()
            .await
            .unwrap()
            .contains(&"feature".to_string()),
        "branch_create must roll back the manifest branch when the derived \
         commit-graph branch fails, got error: {err}"
    );
}

// A fork collision must be classified by the manifest authority, not by Lance
// branch versions. When a concurrent first-write legitimately wins the fork
// race, the loser sees a version mismatch, but that is a stale snapshot, not
// an orphan, so it must be a retryable "refresh and retry", never a misleading
// "run cleanup".
//
// Ordering is made deterministic by a callback at the fork point rather than by
// timed sleeps (the sleep loops below only poll flags, with a bound):
// `compare_exchange` lets only the FIRST arrival (writer A) record readiness and
// block until released; later arrivals (writer B) fall through. The test waits
// on the readiness flag, lets B win and commit the fork, then releases A.
static FORK_A_AT_POINT: std::sync::atomic::AtomicBool = std::sync::atomic::AtomicBool::new(false);
static FORK_RELEASE_A: std::sync::atomic::AtomicBool = std::sync::atomic::AtomicBool::new(false);

#[tokio::test(flavor = "multi_thread")]
async fn fork_collision_with_live_concurrent_fork_is_retryable() {
    use std::sync::atomic::Ordering::SeqCst;

    let _scenario = FailScenario::setup();
    FORK_A_AT_POINT.store(false, SeqCst);
    FORK_RELEASE_A.store(false, SeqCst);

    let dir = tempfile::tempdir().unwrap();
    let uri = dir.path().to_str().unwrap().to_string();
    let main = helpers::init_and_load(&dir).await;
    main.branch_create("feature").await.unwrap();

    // First arrival (A) records readiness and blocks until released; the rest
    // (B) fall through immediately. Bounded spin so a mistake can't hang forever.
    fail::cfg_callback("fork.before_classify", || {
        if FORK_A_AT_POINT
            .compare_exchange(false, true, SeqCst, SeqCst)
            .is_ok()
        {
            for _ in 0..2000 {
                if FORK_RELEASE_A.load(SeqCst) {
                    break;
                }
                std::thread::sleep(std::time::Duration::from_millis(5));
            }
        }
    })
    .unwrap();

    let uri_a = uri.clone();
    let writer_a = tokio::spawn(async move {
        let mut a = Omnigraph::open(&uri_a).await.unwrap();
        helpers::mutate_branch(
            &mut a,
            "feature",
            MUTATION_QUERIES,
            "insert_person",
            &mixed_params(&[("$name", "Eve")], &[("$age", 22)]),
        )
        .await
    });

    // Wait (bounded) until A is parked at the fork point.
    for _ in 0..600 {
        if FORK_A_AT_POINT.load(SeqCst) {
            break;
        }
        tokio::time::sleep(std::time::Duration::from_millis(5)).await;
    }
    assert!(
        FORK_A_AT_POINT.load(SeqCst),
        "writer A never reached the fork point"
    );

    // B wins the fork and commits it.
    let mut b = Omnigraph::open(&uri).await.unwrap();
    helpers::mutate_branch(
        &mut b,
        "feature",
        MUTATION_QUERIES,
        "insert_person",
        &mixed_params(&[("$name", "Frank")], &[("$age", 41)]),
    )
    .await
    .unwrap();

    // Release A; it resumes, re-reads the manifest, and sees the fork is live.
    FORK_RELEASE_A.store(true, SeqCst);
    let err = writer_a
        .await
        .unwrap()
        .expect_err("A's stale-snapshot fork should be a retryable conflict");
    fail::remove("fork.before_classify");

    let msg = err.to_string();
    assert!(
        !msg.contains("cleanup"),
        "a live concurrent fork must not be misclassified as an orphan, got: {msg}"
    );
    assert!(
        msg.contains("refresh and retry") || msg.contains("expected manifest table version"),
        "expected a retryable stale-view error, got: {msg}"
    );
}

#[tokio::test(flavor = "multi_thread")]
async fn graph_publish_failpoint_triggers_before_commit_append() {
    let _scenario = FailScenario::setup();
    let dir = tempfile::tempdir().unwrap();
    let mut db = Omnigraph::init(dir.path().to_str().unwrap(), helpers::TEST_SCHEMA)
        .await
        .unwrap();
    let _failpoint = ScopedFailPoint::new("graph_publish.before_commit_append", "return");

    let err = mutate_main(
        &mut db,
        MUTATION_QUERIES,
        "insert_person",
        &mixed_params(&[("$name", "Eve")], &[("$age", 22)]),
    )
    .await
    .unwrap_err();
    assert!(
        err.to_string()
            .contains("injected failpoint triggered: graph_publish.before_commit_append")
    );
}

// Atomic schema apply: schema apply writes staging files first, then commits
// the manifest, then renames staging → final. The tests below inject crashes at
// the two boundaries and assert that reopening the graph yields a consistent
// state.

#[tokio::test]
async fn schema_apply_pre_commit_crash_rolls_forward_via_sidecar() {
    let _scenario = FailScenario::setup();
    let dir = tempfile::tempdir().unwrap();
    let uri = dir.path().to_str().unwrap().to_string();

    {
        let db = Omnigraph::init(&uri, SCHEMA_V1).await.unwrap();
        let _failpoint = ScopedFailPoint::new("schema_apply.after_staging_write", "return");
        let err = db.apply_schema(SCHEMA_V2_ADDED_TYPE).await.unwrap_err();
        assert!(
            err.to_string()
                .contains("injected failpoint triggered: schema_apply.after_staging_write"),
            "got: {}",
            err
        );
    }

    // Reopen. With the sidecar protocol, a Phase B → Phase C crash
    // (per-table commit_staged done; manifest publish not yet) is
    // recoverable: the sidecar's `additional_registrations` carries the
    // intent to register `node:Company`, schema-state recovery promotes
    // the staging files, and the manifest-drift sweep publishes the
    // RegisterTable + Update so the manifest catches up to the schema
    // the writer already declared. This closes the corruption that
    // recoveries before this protocol left behind: an orphan dataset on
    // disk with no manifest entry.
    let db = Omnigraph::open(&uri).await.unwrap();
    assert_eq!(
        db.schema_source().as_str(),
        SCHEMA_V2_ADDED_TYPE,
        "live schema must reflect the rolled-forward apply (Company added)"
    );
    assert_no_staging_files(dir.path());
    // node:Company must be registered in the manifest (queryable);
    // pre-protocol recoveries left it as an orphan dataset on disk.
    let company_rows = helpers::count_rows(&db, "node:Company").await;
    assert_eq!(
        company_rows, 0,
        "node:Company must have a manifest entry post-recovery"
    );
}

#[tokio::test]
async fn schema_apply_recovers_post_commit_crash() {
    let _scenario = FailScenario::setup();
    let dir = tempfile::tempdir().unwrap();
    let uri = dir.path().to_str().unwrap().to_string();

    {
        let db = Omnigraph::init(&uri, SCHEMA_V1).await.unwrap();
        let _failpoint = ScopedFailPoint::new("schema_apply.after_manifest_commit", "return");
        let err = db.apply_schema(SCHEMA_V2_ADDED_TYPE).await.unwrap_err();
        assert!(
            err.to_string()
                .contains("injected failpoint triggered: schema_apply.after_manifest_commit"),
            "got: {}",
            err
        );
    }

    // Reopen: the manifest is already at the new version, so the recovery sweep
    // should complete the rename and the live schema should match v2.
    let db = Omnigraph::open(&uri).await.unwrap();
    assert_eq!(db.schema_source().as_str(), SCHEMA_V2_ADDED_TYPE);
    assert_no_staging_files(dir.path());
}

#[tokio::test]
|
|
async fn schema_apply_recovers_partial_rename() {
|
|
// Construct a partial-rename state: _schema.pg has been renamed in
|
|
// (matching v2), but _schema.ir.json.staging and __schema_state.json.staging
|
|
// were never renamed. Recovery should detect that the live source matches
|
|
// the staging state's hash and complete the remaining renames.
|
|
let _scenario = FailScenario::setup();
|
|
let dir = tempfile::tempdir().unwrap();
|
|
let uri = dir.path().to_str().unwrap().to_string();
|
|
|
|
{
|
|
let db = Omnigraph::init(&uri, SCHEMA_V1).await.unwrap();
|
|
db.apply_schema(SCHEMA_V2_ADDED_TYPE).await.unwrap();
|
|
}
|
|
|
|
// Simulate: one of the renames (the IR or state file) didn't complete by
|
|
// copying the live ir/state files back to their staging names.
|
|
std::fs::copy(
|
|
dir.path().join("_schema.ir.json"),
|
|
dir.path().join("_schema.ir.json.staging"),
|
|
)
|
|
.unwrap();
|
|
std::fs::copy(
|
|
dir.path().join("__schema_state.json"),
|
|
dir.path().join("__schema_state.json.staging"),
|
|
)
|
|
.unwrap();
|
|
|
|
// Reopen — recovery should complete the rename (overwriting final files
|
|
// with identical staging content) and remove the staging files.
|
|
let db = Omnigraph::open(&uri).await.unwrap();
|
|
assert_eq!(db.schema_source().as_str(), SCHEMA_V2_ADDED_TYPE);
|
|
assert_no_staging_files(dir.path());
|
|
}
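
/*
Sketch: the partial-rename test above depends on recovery promoting leftover
schema staging files over their final names. The function below is a minimal,
hypothetical model of that step that uses only `std::fs`. It is not omnigraph's
implementation, and `recover_schema_staging` is an illustrative name. The
`committed` flag stands in for the real decision (manifest already at the new
version, or live source matching the staged hash), which this sketch does not
reproduce.

```rust
use std::fs;
use std::io;
use std::path::Path;

/// Staging names checked by `assert_no_staging_files`; each final name is the
/// staging name with the ".staging" suffix removed.
const STAGED: [&str; 3] = [
    "_schema.pg.staging",
    "_schema.ir.json.staging",
    "__schema_state.json.staging",
];

/// Hypothetical recovery step. If the apply is known to have committed, rename
/// every leftover staging file over its final name (completing a partial
/// rename). Otherwise delete the staging files so the old schema stays live.
/// Returns the number of staging files found.
pub fn recover_schema_staging(graph: &Path, committed: bool) -> io::Result<usize> {
    let mut found = 0;
    for staged in STAGED {
        let staged_path = graph.join(staged);
        if !staged_path.exists() {
            continue;
        }
        found += 1;
        if committed {
            let final_name = staged.trim_end_matches(".staging");
            // `fs::rename` replaces an existing destination file, so re-running
            // recovery after a crash partway through the loop is harmless.
            fs::rename(&staged_path, graph.join(final_name))?;
        } else {
            fs::remove_file(&staged_path)?;
        }
    }
    Ok(found)
}
```

Each rename is atomic on POSIX filesystems, and a rename that already happened
is simply skipped on retry. A crash anywhere in the loop therefore leaves a
state that the same function can finish on the next open.
*/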

/// Prove that the recovery sweep closes the "finalize → publisher residual"
/// across one open cycle.
///
/// `MutationStaging::finalize` runs `commit_staged` per touched table
/// sequentially before the publisher commits the manifest. Lance has no
/// multi-dataset atomic commit primitive, so a failure between the
/// per-table staged commits and the manifest commit leaves Lance HEAD
/// advanced on the touched tables with no manifest update.
///
/// Closing the residual: finalize writes a sidecar at
/// `__recovery/{ulid}.json` BEFORE Phase B, the failpoint fires AFTER
/// finalize but BEFORE the publisher, the engine handle is dropped, and
/// the next `Omnigraph::open` runs the recovery sweep. The sweep
/// classifies every table in the sidecar as `RolledPastExpected` (Lance
/// HEAD == expected + 1, post_commit_pin matches), decides RollForward,
/// atomically extends every manifest pin via
/// `ManifestBatchPublisher::publish`, records an audit row, and deletes
/// the sidecar.
///
/// After this test passes:
/// - The originally-attempted insert ("Eve") is visible via a normal
///   query.
/// - The next mutation succeeds without `ExpectedVersionMismatch`.
/// - `_graph_commit_recoveries.lance` carries an audit row with
///   `recovery_kind=RolledForward` and the original sidecar's
///   `actor_id` in `recovery_for_actor`.
///
/// Continuous in-process recovery (no restart needed between failure
/// and recovery) is the goal of a future background reconciler.
#[tokio::test]
async fn recovery_rolls_forward_after_finalize_publisher_failure() {
    let _scenario = FailScenario::setup();
    let dir = tempfile::tempdir().unwrap();
    let uri = dir.path().to_str().unwrap().to_string();
    let operation_id;

    // Setup: trigger the residual.
    {
        let mut db = Omnigraph::init(&uri, helpers::TEST_SCHEMA).await.unwrap();
        let _failpoint = ScopedFailPoint::new("mutation.post_finalize_pre_publisher", "return");

        // The mutation's finalize completes (commit_staged advances Lance
        // HEAD on node:Person AND writes a `__recovery/{ulid}.json`
        // sidecar). Then the failpoint fires before the publisher's
        // manifest commit, so the manifest pin stays at the pre-write
        // version. The sidecar persists for the next-open recovery sweep.
        let err = mutate_main(
            &mut db,
            MUTATION_QUERIES,
            "insert_person",
            &mixed_params(&[("$name", "Eve")], &[("$age", 22)]),
        )
        .await
        .unwrap_err();
        assert!(
            err.to_string()
                .contains("injected failpoint triggered: mutation.post_finalize_pre_publisher"),
            "unexpected error: {err}"
        );

        // The sidecar must still exist on disk for the recovery sweep to find.
        let recovery_dir = dir.path().join("__recovery");
        let sidecars: Vec<_> = std::fs::read_dir(&recovery_dir)
            .unwrap()
            .filter_map(|e| e.ok())
            .collect();
        assert_eq!(
            sidecars.len(),
            1,
            "exactly one sidecar should persist after the finalize failure"
        );
        operation_id = single_sidecar_operation_id(dir.path());

        // Drop the failpoint scope and the engine handle.
    }

    // Recovery: reopening runs the recovery sweep. The sweep finds the
    // sidecar, classifies node:Person as RolledPastExpected, decides
    // RollForward, publishes the manifest update, records the audit
    // row, and deletes the sidecar.
    let db = Omnigraph::open(&uri).await.unwrap();

    // The originally-attempted "Eve" insert is now visible: the recovery
    // sweep extended the manifest pin to include the staged commit.
    let person_count = helpers::count_rows(&db, "node:Person").await;
    assert_eq!(
        person_count, 1,
        "exactly one person (Eve) must be visible after roll-forward"
    );
    drop(db);

    assert_post_recovery_invariants(
        dir.path(),
        &operation_id,
        RecoveryExpectation::RolledForward {
            tables: vec![TableExpectation::main("node:Person").follow_up_mutation(
                FollowUpMutation::new(
                    "main",
                    MUTATION_QUERIES,
                    "insert_person",
                    mixed_params(&[("$name", "Frank")], &[("$age", 33)]),
                ),
            )],
        },
    )
    .await
    .unwrap();

    let db = Omnigraph::open(&uri).await.unwrap();
    let person_count = helpers::count_rows(&db, "node:Person").await;
    assert_eq!(
        person_count, 2,
        "Frank's insert must land normally after recovery"
    );
}
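
/*
Sketch: the classification described in the doc comment above can be modeled
as a pure function over three version numbers. The `RolledPastExpected`,
`UnexpectedAtP1`, and `UnexpectedMultistep` states and the RollForward /
RollBack outcomes come from this file's comments. `NotCommitted`,
`DeleteSidecarOnly`, `SidecarTable`, and the rule for mixed outcomes are
assumptions made for illustration. This is not omnigraph's classifier.

```rust
/// Per-table state derived from a sidecar entry plus the live versions.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum TableState {
    /// Assumed state: Lance HEAD never moved past the manifest pin.
    NotCommitted,
    /// HEAD == expected + 1, expected == manifest pin, HEAD == post_commit_pin.
    RolledPastExpected,
    /// HEAD is one past the pin, but not the commit the sidecar describes.
    UnexpectedAtP1,
    /// Anything else (in this sketch, mainly HEAD two or more past the pin).
    UnexpectedMultistep,
}

#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum Decision {
    /// Assumed outcome: nothing moved, so only the sidecar is removed.
    DeleteSidecarOnly,
    RollForward,
    RollBack,
}

/// The two versions a sidecar records for one table.
pub struct SidecarTable {
    pub expected_version: u64,
    pub post_commit_pin: u64,
}

pub fn classify(t: &SidecarTable, manifest_pinned: u64, lance_head: u64) -> TableState {
    if lance_head == manifest_pinned {
        TableState::NotCommitted
    } else if lance_head == manifest_pinned + 1 {
        if t.expected_version == manifest_pinned && t.post_commit_pin == lance_head {
            TableState::RolledPastExpected
        } else {
            TableState::UnexpectedAtP1
        }
    } else {
        TableState::UnexpectedMultistep
    }
}

/// Roll forward only when every table is exactly the commit the sidecar
/// predicted. Any other movement is treated as rollback territory (assumed).
pub fn decide(states: &[TableState]) -> Decision {
    if states.iter().all(|s| *s == TableState::NotCommitted) {
        Decision::DeleteSidecarOnly
    } else if states.iter().all(|s| *s == TableState::RolledPastExpected) {
        Decision::RollForward
    } else {
        Decision::RollBack
    }
}
```

Take the numbers from `refresh_defers_rollback_eligible_sidecar_to_next_open`:
pin P, expected P - 1, and post_commit_pin = HEAD = P + 1. With those,
`classify` returns `UnexpectedAtP1`, which matches that test's comment.
*/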

#[tokio::test]
async fn inline_delete_conflict_writes_sidecar_before_rejecting() {
    let _scenario = FailScenario::setup();
    let dir = tempfile::tempdir().unwrap();
    let uri = dir.path().to_str().unwrap().to_string();
    let db = helpers::init_and_load(&dir).await;

    let pre_snapshot = db
        .snapshot_of(omnigraph::db::ReadTarget::branch("main"))
        .await
        .unwrap();
    let pre_person_pin = pre_snapshot.entry("node:Person").unwrap().table_version;
    let person_uri = node_table_uri(&uri, "Person");

    {
        let _pause_delete =
            ScopedFailPoint::new("mutation.delete_node_pre_primary_delete", "pause");
        let delete_params = helpers::params(&[("$name", "Alice")]);
        let delete = db.mutate("main", MUTATION_QUERIES, "remove_person", &delete_params);
        tokio::pin!(delete);

        let mut concurrent_update_succeeded = false;
        for _ in 0..50 {
            if delete.as_mut().now_or_never().is_some() {
                panic!("delete mutation completed before primary-delete failpoint was released");
            }
            let mut concurrent = Omnigraph::open_read_only(&uri).await.unwrap();
            if mutate_main(
                &mut concurrent,
                MUTATION_QUERIES,
                "set_age",
                &mixed_params(&[("$name", "Bob")], &[("$age", 26)]),
            )
            .await
            .is_ok()
            {
                concurrent_update_succeeded = true;
                break;
            }
            tokio::time::sleep(std::time::Duration::from_millis(20)).await;
        }
        assert!(
            concurrent_update_succeeded,
            "concurrent update must land while delete is paused"
        );
        fail::remove("mutation.delete_node_pre_primary_delete");

        let err = delete.await.unwrap_err();
        assert!(
            err.to_string().contains("stale view of 'node:Person'")
                || err.to_string().contains("ExpectedVersionMismatch")
                || err.to_string().contains("expected version mismatch"),
            "unexpected error: {err}",
        );
    }

    let person_head = lance::Dataset::open(&person_uri)
        .await
        .unwrap()
        .version()
        .version;
    assert!(
        person_head > pre_person_pin,
        "primary inline delete must have advanced node:Person before rejecting"
    );
    let db = Omnigraph::open(&uri).await.unwrap();
    assert_eq!(
        helpers::count_rows(&db, "node:Person").await,
        4,
        "manifest-conflicted delete must not remove net Person rows after recovery"
    );
    assert_eq!(
        helpers::count_rows(&db, "edge:Knows").await,
        3,
        "manifest-conflicted delete must not remove net Knows rows after recovery"
    );
}

#[tokio::test]
async fn recovery_rolls_forward_load_on_feature_branch() {
    use omnigraph::loader::LoadMode;

    let _scenario = FailScenario::setup();
    let dir = tempfile::tempdir().unwrap();
    let uri = dir.path().to_str().unwrap().to_string();
    let operation_id;
    let main_person_pin;
    let feature_parent_commit_id;

    {
        let db = Omnigraph::init(&uri, helpers::TEST_SCHEMA).await.unwrap();
        db.branch_create("feature").await.unwrap();
        db.mutate(
            "feature",
            MUTATION_QUERIES,
            "insert_person",
            &mixed_params(&[("$name", "BeforeLoad")], &[("$age", 40)]),
        )
        .await
        .unwrap();
        main_person_pin = db
            .snapshot_of(omnigraph::db::ReadTarget::branch("main"))
            .await
            .unwrap()
            .entry("node:Person")
            .expect("main must have Person")
            .table_version;
        feature_parent_commit_id = branch_head_commit_id(dir.path(), "feature").await.unwrap();

        let _failpoint = ScopedFailPoint::new("mutation.post_finalize_pre_publisher", "return");
        let err = db
            .load(
                "feature",
                r#"{"type":"Person","data":{"name":"FeatureLoad","age":41}}
"#,
                LoadMode::Append,
            )
            .await
            .unwrap_err();
        assert!(
            err.to_string()
                .contains("injected failpoint triggered: mutation.post_finalize_pre_publisher"),
            "unexpected error: {err}"
        );
        operation_id = single_sidecar_operation_id(dir.path());
    }

    let db = Omnigraph::open(&uri).await.unwrap();
    assert_eq!(
        helpers::count_rows_branch(&db, "feature", "node:Person").await,
        2,
        "feature branch load row must be visible after recovery"
    );
    assert_eq!(
        helpers::count_rows(&db, "node:Person").await,
        0,
        "feature branch load recovery must not publish the row to main"
    );
    drop(db);

    assert_post_recovery_invariants(
        dir.path(),
        &operation_id,
        RecoveryExpectation::RolledForward {
            tables: vec![
                TableExpectation::branch("node:Person", "feature")
                    .expected_main_manifest_pin(main_person_pin)
                    .expected_recovery_parent_commit_id(feature_parent_commit_id)
                    .follow_up_mutation(FollowUpMutation::new(
                        "feature",
                        MUTATION_QUERIES,
                        "insert_person",
                        mixed_params(&[("$name", "AfterLoad")], &[("$age", 42)]),
                    )),
            ],
        },
    )
    .await
    .unwrap();

    let db = Omnigraph::open(&uri).await.unwrap();
    assert_eq!(
        helpers::count_rows_branch(&db, "feature", "node:Person").await,
        3,
        "follow-up feature mutation must succeed after load recovery"
    );
    assert_eq!(
        helpers::count_rows(&db, "node:Person").await,
        0,
        "follow-up feature mutation must not move main"
    );
}

#[tokio::test]
async fn recovery_rolls_forward_load_overwrite() {
    use omnigraph::loader::{LoadMode, load_jsonl};

    let _scenario = FailScenario::setup();
    let dir = tempfile::tempdir().unwrap();
    let uri = dir.path().to_str().unwrap().to_string();
    let operation_id;
    let parent_commit_id;

    {
        let mut db = Omnigraph::init(&uri, helpers::TEST_SCHEMA).await.unwrap();
        load_jsonl(&mut db, helpers::TEST_DATA, LoadMode::Overwrite)
            .await
            .unwrap();
        parent_commit_id = branch_head_commit_id(dir.path(), "main").await.unwrap();

        let _failpoint = ScopedFailPoint::new("mutation.post_finalize_pre_publisher", "return");
        let err = db
            .load(
                "main",
                r#"{"type":"Person","data":{"name":"OverwriteLoad","age":41}}
"#,
                LoadMode::Overwrite,
            )
            .await
            .unwrap_err();
        assert!(
            err.to_string()
                .contains("injected failpoint triggered: mutation.post_finalize_pre_publisher"),
            "unexpected error: {err}"
        );
        operation_id = single_sidecar_operation_id(dir.path());
    }

    let db = Omnigraph::open(&uri).await.unwrap();
    assert_eq!(
        helpers::count_rows(&db, "node:Person").await,
        1,
        "overwrite row must be visible after recovery rolls the load forward"
    );
    drop(db);

    assert_post_recovery_invariants(
        dir.path(),
        &operation_id,
        RecoveryExpectation::RolledForward {
            tables: vec![
                TableExpectation::main("node:Person")
                    .expected_recovery_parent_commit_id(parent_commit_id)
                    .follow_up_mutation(FollowUpMutation::new(
                        "main",
                        MUTATION_QUERIES,
                        "insert_person",
                        mixed_params(&[("$name", "AfterOverwriteLoad")], &[("$age", 42)]),
                    )),
            ],
        },
    )
    .await
    .unwrap();

    let db = Omnigraph::open(&uri).await.unwrap();
    assert_eq!(
        helpers::count_rows(&db, "node:Person").await,
        2,
        "follow-up mutation must succeed after overwrite load recovery"
    );
}

#[tokio::test]
async fn recovery_rolls_forward_ensure_indices_on_feature_branch() {
    use lance::index::DatasetIndexExt;
    use omnigraph::loader::{LoadMode, load_jsonl};
    use omnigraph::table_store::TableStore;

    let _scenario = FailScenario::setup();
    let dir = tempfile::tempdir().unwrap();
    let uri = dir.path().to_str().unwrap().to_string();
    let operation_id;
    let feature_parent_commit_id;
    let main_person_pin;

    let mut db = Omnigraph::init(&uri, helpers::TEST_SCHEMA).await.unwrap();
    load_jsonl(
        &mut db,
        r#"{"type":"Person","data":{"name":"alice","age":30}}
"#,
        LoadMode::Append,
    )
    .await
    .unwrap();
    db.branch_create("feature").await.unwrap();
    db.mutate(
        "feature",
        MUTATION_QUERIES,
        "insert_person",
        &mixed_params(&[("$name", "BeforeEnsure")], &[("$age", 42)]),
    )
    .await
    .unwrap();

    main_person_pin = db
        .snapshot_of(omnigraph::db::ReadTarget::branch("main"))
        .await
        .unwrap()
        .entry("node:Person")
        .expect("main must have Person")
        .table_version;

    // Make the feature branch's Person table genuinely need index work
    // while keeping the manifest internally consistent. The test-only
    // publisher deliberately skips the normal index-rebuild preparation;
    // the failed writer below is still the real `ensure_indices_on`.
    let person_uri = node_table_uri(&uri, "Person");
    let store = TableStore::new(&uri);
    let mut ds = store
        .open_dataset_head(&person_uri, Some("feature"))
        .await
        .unwrap();
    ds.drop_index("id_idx").await.unwrap();
    let dropped_index_head = ds.version().version;
    db.failpoint_publish_table_head_without_index_rebuild_for_test(
        "feature",
        "node:Person",
        Some("feature"),
    )
    .await
    .unwrap();
    let feature_snapshot = db
        .snapshot_of(omnigraph::db::ReadTarget::branch("feature"))
        .await
        .unwrap();
    assert_eq!(
        feature_snapshot
            .entry("node:Person")
            .expect("feature must have Person")
            .table_version,
        dropped_index_head,
        "test setup must publish the dropped-index table head before ensure_indices runs",
    );
    feature_parent_commit_id = branch_head_commit_id(dir.path(), "feature").await.unwrap();

    {
        let _failpoint =
            ScopedFailPoint::new("ensure_indices.post_phase_b_pre_manifest_commit", "return");
        let err = db.ensure_indices_on("feature").await.unwrap_err();
        assert!(
            err.to_string().contains(
                "injected failpoint triggered: ensure_indices.post_phase_b_pre_manifest_commit"
            ),
            "unexpected error: {err}"
        );
        operation_id = single_sidecar_operation_id(dir.path());
    }
    drop(db);

    let db = Omnigraph::open(&uri).await.unwrap();
    assert_eq!(
        helpers::count_rows_branch(&db, "feature", "node:Person").await,
        2,
        "feature should see inherited alice plus recovered branch-local row"
    );
    assert_eq!(
        helpers::count_rows(&db, "node:Person").await,
        1,
        "ensure_indices branch recovery must not move main"
    );
    drop(db);

    assert_post_recovery_invariants(
        dir.path(),
        &operation_id,
        RecoveryExpectation::RolledForward {
            tables: vec![
                TableExpectation::branch("node:Person", "feature")
                    .expected_main_manifest_pin(main_person_pin)
                    .expected_recovery_parent_commit_id(feature_parent_commit_id)
                    .follow_up_mutation(FollowUpMutation::new(
                        "feature",
                        MUTATION_QUERIES,
                        "insert_person",
                        mixed_params(&[("$name", "AfterEnsure")], &[("$age", 44)]),
                    )),
            ],
        },
    )
    .await
    .unwrap();

    let db = Omnigraph::open(&uri).await.unwrap();
    assert_eq!(
        helpers::count_rows_branch(&db, "feature", "node:Person").await,
        3,
        "follow-up feature mutation must succeed after ensure_indices recovery"
    );
    assert_eq!(
        helpers::count_rows(&db, "node:Person").await,
        1,
        "follow-up feature mutation must not move main"
    );
}

/// Refresh-time recovery (Option B): the in-process `Omnigraph::refresh`
/// runs roll-forward-only recovery, closing the long-running-server
/// residual without a restart.
///
/// Setup: trigger `mutation.post_finalize_pre_publisher` once; the
/// sidecar persists. Without dropping the engine, call `db.refresh()`.
/// Post-conditions: the sidecar is gone, Eve is visible, and a subsequent
/// mutation on the same handle succeeds without a restart and without
/// `ExpectedVersionMismatch`.
#[tokio::test]
async fn refresh_runs_roll_forward_recovery_in_process() {
    let _scenario = FailScenario::setup();
    let dir = tempfile::tempdir().unwrap();
    let uri = dir.path().to_str().unwrap().to_string();

    let mut db = Omnigraph::init(&uri, helpers::TEST_SCHEMA).await.unwrap();

    // Setup: trigger the residual (sidecar persists; manifest unchanged).
    {
        let _failpoint = ScopedFailPoint::new("mutation.post_finalize_pre_publisher", "return");
        let err = mutate_main(
            &mut db,
            MUTATION_QUERIES,
            "insert_person",
            &mixed_params(&[("$name", "Eve")], &[("$age", 22)]),
        )
        .await
        .unwrap_err();
        assert!(
            err.to_string()
                .contains("injected failpoint triggered: mutation.post_finalize_pre_publisher"),
            "unexpected error: {err}"
        );
        let recovery_dir = dir.path().join("__recovery");
        assert_eq!(
            std::fs::read_dir(&recovery_dir).unwrap().count(),
            1,
            "exactly one sidecar must persist after the finalize failure"
        );
    }

    // Recovery: an explicit refresh runs roll-forward-only recovery
    // in-process, with no restart. The sweep reads the sidecar, finds the
    // Person drift, classifies it as RolledPastExpected, rolls forward via
    // the publisher CAS, and deletes the sidecar.
    db.refresh().await.expect("refresh must succeed");

    // The sidecar must be gone: refresh-time recovery rolled it forward.
    let recovery_dir = dir.path().join("__recovery");
    if recovery_dir.exists() {
        let remaining: Vec<_> = std::fs::read_dir(&recovery_dir)
            .unwrap()
            .filter_map(|e| e.ok())
            .collect();
        assert!(
            remaining.is_empty(),
            "sidecar must be deleted by refresh-time roll-forward; remaining: {:?}",
            remaining,
        );
    }

    // Eve (the originally-attempted insert) is visible without a restart.
    let person_count = helpers::count_rows(&db, "node:Person").await;
    assert_eq!(
        person_count, 1,
        "Eve must be visible after refresh-time roll-forward"
    );

    // A direct Person mutation also succeeds without ExpectedVersionMismatch.
    mutate_main(
        &mut db,
        MUTATION_QUERIES,
        "insert_person",
        &mixed_params(&[("$name", "Frank")], &[("$age", 33)]),
    )
    .await
    .expect("Person insert must succeed after refresh-time recovery");
    assert_eq!(helpers::count_rows(&db, "node:Person").await, 2);
}
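
/*
Sketch: think of the split between refresh-time and open-time recovery as a
gate applied after classification. Refresh runs in a roll-forward-only mode
and defers anything that would need `Dataset::restore`. A read-write open runs
the full sweep. `RollForwardOnly` is the mode name used in this file's
comments. `SweepMode::Full`, `Action`, `sweep_action`, and `sweep` are
hypothetical names, not omnigraph APIs.

```rust
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum Decision {
    RollForward,
    RollBack,
}

#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum SweepMode {
    /// Read-write open: no concurrent writers, so restore is allowed.
    Full,
    /// In-process refresh: never restore, because a live writer may exist.
    RollForwardOnly,
}

#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum Action {
    /// Publish the manifest update, write the audit row, delete the sidecar.
    ApplyRollForward,
    /// Restore the tables, publish, audit, delete the sidecar.
    ApplyRollBack,
    /// Leave the sidecar on disk for the next read-write open.
    Defer,
}

pub fn sweep_action(decision: Decision, mode: SweepMode) -> Action {
    match (decision, mode) {
        (Decision::RollForward, _) => Action::ApplyRollForward,
        (Decision::RollBack, SweepMode::Full) => Action::ApplyRollBack,
        (Decision::RollBack, SweepMode::RollForwardOnly) => Action::Defer,
    }
}

/// Runs one sweep over (sidecar id, decision) pairs and returns the ids whose
/// sidecars remain on disk afterwards.
pub fn sweep<'a>(sidecars: &[(&'a str, Decision)], mode: SweepMode) -> Vec<&'a str> {
    sidecars
        .iter()
        .filter(|(_, d)| sweep_action(*d, mode) == Action::Defer)
        .map(|(id, _)| *id)
        .collect()
}
```

Under this model, a deferred sidecar costs nothing at refresh time. It waits
on disk until a full sweep can restore safely.
*/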

/// Refresh-time recovery must NOT call `Dataset::restore`, because restore
/// can silently orphan a concurrent writer's commit. Sidecars that would
/// require rollback must be left on disk for the next ReadWrite open.
///
/// Setup: synthesize a sidecar that would classify as `UnexpectedAtP1`
/// (rollback territory): a strict-match Mutation kind with
/// expected_version != manifest_pinned. Trigger refresh and assert that
/// the sidecar is still on disk and Lance HEAD is unchanged (no restore
/// commit). Then drop and reopen; the full sweep handles the sidecar.
#[tokio::test]
async fn refresh_defers_rollback_eligible_sidecar_to_next_open() {
    use omnigraph::loader::{LoadMode, load_jsonl};

    let _scenario = FailScenario::setup();
    let dir = tempfile::tempdir().unwrap();
    let uri = dir.path().to_str().unwrap().to_string();

    // Bootstrap.
    let mut db = Omnigraph::init(&uri, helpers::TEST_SCHEMA).await.unwrap();
    load_jsonl(
        &mut db,
        r#"{"type":"Person","data":{"name":"alice","age":30}}
"#,
        LoadMode::Append,
    )
    .await
    .unwrap();

    // Capture Person's full URI and manifest pin.
    let snapshot = db
        .snapshot_of(omnigraph::db::ReadTarget::branch("main"))
        .await
        .unwrap();
    let entry = snapshot.entry("node:Person").unwrap();
    let person_uri = format!("{}/{}", uri.trim_end_matches('/'), entry.table_path);
    let manifest_pin = entry.table_version;

    // Drift Person's Lance HEAD ahead of the manifest pin (without
    // touching the manifest) so the classifier can reach the UnexpectedAtP1
    // / UnexpectedMultistep / RolledPastExpected paths that require
    // a real restore on rollback. The `1 = 2` predicate matches no rows, so
    // the delete leaves the data unchanged but still commits a new version.
    let mut ds = lance::Dataset::open(&person_uri).await.unwrap();
    helpers::lance_delete_inline(&mut ds, "1 = 2").await;
    let head_after_drift = ds.version().version;
    assert_eq!(head_after_drift, manifest_pin + 1);

    // Synthesize a sidecar whose expected_version does NOT match the
    // current manifest pin while post_commit_pin == lance_head. The
    // strict Mutation classifier sees lance_head == manifest_pinned + 1
    // but expected != manifest_pinned → UnexpectedAtP1, and decide → RollBack.
    //
    // expected_version must be a REAL Lance version (`restore_table_to_version`
    // calls `checkout_version` on it, and an unknown version errors). Use
    // manifest_pin - 1, which exists from the bootstrap commit chain.
    let bogus_expected = manifest_pin - 1;
    let bogus_post = head_after_drift;
    let sidecar_json = format!(
        r#"{{
    "schema_version": 1,
    "operation_id": "01H0000000000000000000RBCK",
    "started_at": "0",
    "branch": null,
    "actor_id": "act-rollback",
    "writer_kind": "Mutation",
    "tables": [
        {{
            "table_key":"node:Person",
            "table_path":"{}",
            "expected_version":{},
            "post_commit_pin":{}
        }}
    ]
}}"#,
        person_uri, bogus_expected, bogus_post,
    );
    let recovery_dir = dir.path().join("__recovery");
    std::fs::create_dir_all(&recovery_dir).unwrap();
    std::fs::write(
        recovery_dir.join("01H0000000000000000000RBCK.json"),
        &sidecar_json,
    )
    .unwrap();

    // Capture the pre-refresh Lance HEAD on Person.
    let pre_head = lance::Dataset::open(&person_uri)
        .await
        .unwrap()
        .version()
        .version;

    // Trigger refresh-time recovery directly. The sidecar is rollback-
    // eligible (UnexpectedAtP1); RollForwardOnly mode defers it,
    // leaving the sidecar on disk and Lance HEAD unchanged on Person.
    db.refresh()
        .await
        .expect("refresh must succeed (deferring rollback)");

    // The sidecar is still on disk.
    assert_eq!(
        std::fs::read_dir(&recovery_dir).unwrap().count(),
        1,
        "rollback-eligible sidecar must be deferred to next ReadWrite open",
    );

    // Lance HEAD on Person is unchanged: no restore ran.
    let post_head = lance::Dataset::open(&person_uri)
        .await
        .unwrap()
        .version()
        .version;
    assert_eq!(
        pre_head, post_head,
        "refresh-time recovery must NOT call Dataset::restore on Person; \
         pre_head={pre_head}, post_head={post_head}",
    );

    // Cross-check: drop the engine and reopen. The full sweep handles the
    // rollback (Dataset::restore is safe here because there are no
    // concurrent writers at open time).
    drop(db);
    let db = Omnigraph::open(&uri).await.unwrap();
    // After full-sweep recovery the sidecar should be processed (deleted).
    // Its table is rollback-eligible (UnexpectedAtP1), so a restore runs on
    // Person and its Lance HEAD advances.
    let remaining = if recovery_dir.exists() {
        std::fs::read_dir(&recovery_dir).unwrap().count()
    } else {
        0
    };
    assert_eq!(
        remaining, 0,
        "full sweep at next open must process the deferred sidecar",
    );
    let final_head = lance::Dataset::open(&person_uri)
        .await
        .unwrap()
        .version()
        .version;
    assert!(
        final_head > post_head,
        "full sweep must run Dataset::restore (head advances); \
         post_head={post_head}, final_head={final_head}",
    );
    // Convergence: the roll-back published the restored HEAD, so the manifest
    // pin tracks Lance HEAD afterward (no residual drift).
    let entry_version = db
        .snapshot_of(omnigraph::db::ReadTarget::branch("main"))
        .await
        .unwrap()
        .entry("node:Person")
        .unwrap()
        .table_version;
    assert_eq!(
        entry_version, final_head,
        "full-sweep roll-back must publish so manifest pin ({entry_version}) == Lance HEAD ({final_head})",
    );
}
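
/*
Sketch: why refresh must not restore. A restore is itself a new commit whose
contents copy an older version. Any commit that landed after recovery read
HEAD is therefore dropped from the new HEAD without any error. The toy version
history below is hypothetical and is not Lance. The test's
`final_head > post_head` assertion is consistent with Lance's
`Dataset::restore` also committing a new version rather than rewinding HEAD.

```rust
/// Toy append-only version history. Version numbers start at 1, and each
/// version stores the full set of rows visible at that version.
#[derive(Debug)]
pub struct VersionedTable {
    versions: Vec<Vec<String>>,
}

impl VersionedTable {
    /// Starts with version 1: an empty table.
    pub fn new() -> Self {
        Self { versions: vec![Vec::new()] }
    }

    pub fn head(&self) -> u64 {
        self.versions.len() as u64
    }

    pub fn rows_at(&self, version: u64) -> &[String] {
        &self.versions[(version - 1) as usize]
    }

    /// Append-style commit: the new version is HEAD's rows plus `row`.
    pub fn commit_insert(&mut self, row: &str) -> u64 {
        let mut rows = self.rows_at(self.head()).to_vec();
        rows.push(row.to_string());
        self.versions.push(rows);
        self.head()
    }

    /// Restore-style commit: the new version copies an older version's rows.
    /// HEAD moves forward, and whatever HEAD held before is no longer visible.
    pub fn restore(&mut self, version: u64) -> u64 {
        let rows = self.rows_at(version).to_vec();
        self.versions.push(rows);
        self.head()
    }
}
```

Suppose `eve` is the drifted commit and `bob` is a concurrent commit that
lands while a refresh-time sweep is deciding. Restoring to the manifest pin
removes `eve` as intended, but it also discards `bob`. At open time no other
writer holds the table, which is why the full sweep may restore.
*/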

/// Companion to `recovery_rolls_forward_after_finalize_publisher_failure`:
/// confirms that a finalize → publisher failure on one table leaves OTHER
/// tables untouched. Subsequent writes to non-drifted tables proceed
/// normally; the drift is contained.
#[tokio::test]
async fn finalize_publisher_residual_does_not_drift_untouched_tables() {
    let _scenario = FailScenario::setup();
    let dir = tempfile::tempdir().unwrap();
    let mut db = Omnigraph::init(dir.path().to_str().unwrap(), helpers::TEST_SCHEMA)
        .await
        .unwrap();

    {
        let _failpoint = ScopedFailPoint::new("mutation.post_finalize_pre_publisher", "return");
        let _ = mutate_main(
            &mut db,
            MUTATION_QUERIES,
            "insert_person",
            &mixed_params(&[("$name", "Eve")], &[("$age", 22)]),
        )
        .await
        .expect_err("synthetic failpoint must fire");
    }

    // node:Person drifted; node:Company did not, so try a Company write.
    use omnigraph::loader::{LoadMode, load_jsonl};
    load_jsonl(
        &mut db,
        r#"{"type": "Company", "data": {"name": "Acme"}}"#,
        LoadMode::Append,
    )
    .await
    .expect("Company write on a non-drifted table should succeed");
}
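
/*
Sketch: drift stays contained because version checks are per table. The
sketch assumes that a write built on a table's manifest pin is rejected only
when that same table's HEAD has moved past the pin, so drift on one table
cannot block another. This is hypothetical bookkeeping. `Pins`,
`drifted_tables`, and `can_write` are illustrative names, not omnigraph APIs.

```rust
use std::collections::BTreeMap;

/// Versions tracked for one table (hypothetical shape).
#[derive(Debug, Clone, Copy)]
pub struct Pins {
    /// Version the manifest publishes for this table.
    pub manifest_pin: u64,
    /// Version the table's dataset is actually at.
    pub lance_head: u64,
}

/// Tables whose HEAD no longer matches the manifest pin (the residual drift).
pub fn drifted_tables(tables: &BTreeMap<&str, Pins>) -> Vec<String> {
    tables
        .iter()
        .filter(|(_, p)| p.lance_head != p.manifest_pin)
        .map(|(name, _)| name.to_string())
        .collect()
}

/// Assumed writer gate: a write built on the manifest pin can commit only if
/// HEAD has not moved past it. Otherwise it fails with an
/// ExpectedVersionMismatch-style error until recovery reconciles the table.
pub fn can_write(p: Pins) -> bool {
    p.lance_head == p.manifest_pin
}
```

With Person drifted and Company clean, only Person is reported and only
Person writes are gated. That matches what the test observes through
`load_jsonl` on Company.
*/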

/// Acceptance test: a stage-step failure in the staged-index path
/// (`stage_create_btree_index` succeeded; `commit_staged` not yet
/// called) leaves NO Lance-HEAD drift on the existing tables.
/// Subsequent operations against those tables succeed without
/// `ExpectedVersionMismatch`.
///
/// Path: `apply_schema(v1 → v2)` adds a new node type. The
/// `added_tables` loop in `schema_apply` creates the empty dataset and
/// then calls `build_indices_on_dataset_for_catalog` →
/// `stage_and_commit_btree(..., &["id"])`. The failpoint fires
/// between `stage_create_btree_index` and `commit_staged`, so the
/// staged segments are written under `_indices/<uuid>/` but Lance HEAD
/// on the new dataset stays at v=1. The schema-apply lock
/// branch is released by `apply_schema`'s outer match. Existing
/// tables (e.g. `node:Person`) are completely untouched by the new
/// node's added_tables iteration (they're outside the failed apply
/// path entirely), and we assert that mutations against them continue
/// to work.
///
/// The orphan empty dataset from the failed apply is an acceptable
/// residual: it's unreferenced by `__manifest` and will be reclaimed
/// by `cleanup_old_versions` (or removed when a future apply at the
/// same target path resolves the rename).
#[tokio::test]
async fn ensure_indices_stage_btree_failure_leaves_existing_tables_writable() {
    let _scenario = FailScenario::setup();
    let dir = tempfile::tempdir().unwrap();
    let uri = dir.path().to_str().unwrap().to_string();

    // Init with TEST_SCHEMA; indices on its existing tables are built
    // during init.
    let mut db = Omnigraph::init(&uri, helpers::TEST_SCHEMA).await.unwrap();

    // Apply a schema that adds a new node type. The added_tables loop
    // will hit the failpoint between stage and commit on the new
    // node:Project table's btree-on-id build. (TEST_SCHEMA already
    // declares Person, Company, Knows, and WorksAt, so pick a name that
    // isn't already taken.)
    let extended_schema = format!(
        "{}\nnode Project {{ name: String @key }}\n",
        helpers::TEST_SCHEMA
    );

    {
        let _failpoint =
            ScopedFailPoint::new("ensure_indices.post_stage_pre_commit_btree", "return");
        let err = db.apply_schema(&extended_schema).await.unwrap_err();
        assert!(
            err.to_string()
                .contains("ensure_indices.post_stage_pre_commit_btree"),
            "schema apply should fail with the synthetic failpoint error, got: {err}"
        );
    }

    // Existing tables stayed at their pre-apply versions; subsequent
    // mutations against them succeed (no Lance-HEAD drift).
    mutate_main(
        &mut db,
        helpers::MUTATION_QUERIES,
        "insert_person",
        &mixed_params(&[("$name", "Eve")], &[("$age", 22)]),
    )
    .await
    .expect("Person mutation must succeed after the failed schema apply; existing tables are not drifted");
}
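
/*
Sketch: the acceptance test above depends on a two-phase index build. The
stage step writes index segments without touching HEAD, and only
`commit_staged` makes them visible in a new version. The toy model below
illustrates that split. It is hypothetical: `ToyDataset`, `StagedIndex`, and
`stage_and_commit` are illustrative names, and segment paths are simplified
counters rather than UUIDs.

```rust
/// Toy dataset: HEAD only moves on commit; staged work is invisible until then.
#[derive(Debug)]
pub struct ToyDataset {
    pub head: u64,
    pub committed_indices: Vec<String>,
    /// Segments written to storage but not referenced by any version.
    pub orphan_segments: Vec<String>,
    next_segment: u32,
}

/// Token returned by the stage step; consuming it is the only way to commit.
pub struct StagedIndex {
    name: String,
    segment: String,
}

impl ToyDataset {
    pub fn new() -> Self {
        Self {
            head: 1,
            committed_indices: Vec::new(),
            orphan_segments: Vec::new(),
            next_segment: 0,
        }
    }

    /// Stage step: write index segments, leave HEAD alone.
    pub fn stage_index(&mut self, name: &str) -> StagedIndex {
        let segment = format!("_indices/{}", self.next_segment);
        self.next_segment += 1;
        self.orphan_segments.push(segment.clone());
        StagedIndex { name: name.to_string(), segment }
    }

    /// Commit step: reference the staged segment from a new version.
    pub fn commit_staged(&mut self, staged: StagedIndex) -> u64 {
        self.orphan_segments.retain(|s| *s != staged.segment);
        self.committed_indices.push(staged.name);
        self.head += 1;
        self.head
    }
}

/// Stage, then commit, with an optional injected failure between the two
/// steps (standing in for `ensure_indices.post_stage_pre_commit_btree`).
pub fn stage_and_commit(ds: &mut ToyDataset, name: &str, fail_between: bool) -> Result<u64, String> {
    let staged = ds.stage_index(name);
    if fail_between {
        return Err("injected failpoint triggered".to_string());
    }
    Ok(ds.commit_staged(staged))
}
```

The failed attempt leaves an unreferenced segment behind but no new version.
This mirrors the doc comment's point that leftover unreferenced data is
acceptable residue rather than drift.
*/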

fn assert_no_staging_files(graph: &std::path::Path) {
    for name in [
        "_schema.pg.staging",
        "_schema.ir.json.staging",
        "__schema_state.json.staging",
    ] {
        let path = graph.join(name);
        assert!(
            !path.exists(),
            "staging file {} still exists after recovery",
            path.display()
        );
    }
}

// =====================================================================
// Per-writer Phase B → Phase C recovery integration
// =====================================================================
//
// Each of the four migrated writers writes a sidecar BEFORE its
// per-table commit_staged loop and deletes it AFTER the manifest
// publish. The `recovery_rolls_forward_after_finalize_publisher_failure`
// test above covers MutationStaging::finalize; the tests below cover
// the other three writers: schema_apply, branch_merge, and
// ensure_indices.
//
// Each follows the same shape: trigger the writer with a failpoint
// active in the Phase B → Phase C window, drop the engine, reopen,
// assert that recovery resolved the sidecar (typically by rolling
// forward: manifest pin advanced, audit row recorded, sidecar deleted),
// and check that a follow-up operation succeeds without
// ExpectedVersionMismatch.
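
/*
Sketch: the writer-side ordering described above can be modeled in memory.
The order is: sidecar first, then per-table commits, then one manifest
publish, and the sidecar is deleted last. The model shows why a failure in
the Phase B → Phase C window stays recoverable: the sidecar outlives the
failure and names exactly the tables whose HEAD moved. This is a hypothetical
sketch. `Store`, `FailAt`, `write_tables`, and `roll_forward` are illustrative
names, and `roll_forward` skips the classification step a real sweep performs.

```rust
use std::collections::BTreeMap;

/// Where a hypothetical failpoint fires inside the writer.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum FailAt {
    Nowhere,
    AfterCommitsBeforePublish,
}

/// In-memory stand-in for per-table HEADs, manifest pins, and sidecar files.
#[derive(Debug, Default)]
pub struct Store {
    pub lance_head: BTreeMap<String, u64>,
    pub manifest_pin: BTreeMap<String, u64>,
    /// Each sidecar lists the tables its writer touched.
    pub sidecars: Vec<Vec<String>>,
}

impl Store {
    pub fn with_tables(tables: &[&str]) -> Self {
        let mut s = Store::default();
        for t in tables {
            s.lance_head.insert(t.to_string(), 1);
            s.manifest_pin.insert(t.to_string(), 1);
        }
        s
    }
}

pub fn write_tables(store: &mut Store, tables: &[&str], fail: FailAt) -> Result<(), String> {
    // 1. Record intent durably BEFORE any per-table commit.
    store.sidecars.push(tables.iter().map(|t| t.to_string()).collect());
    // 2. Per-table commits. There is no multi-table atomic commit, so each
    //    HEAD advances independently.
    for t in tables {
        *store.lance_head.entry(t.to_string()).or_insert(1) += 1;
    }
    if fail == FailAt::AfterCommitsBeforePublish {
        // HEADs moved, pins did not, and the sidecar stays behind.
        return Err("injected failpoint triggered".to_string());
    }
    // 3. Publish every pin in one step.
    for t in tables {
        let head = store.lance_head[*t];
        store.manifest_pin.insert(t.to_string(), head);
    }
    // 4. Only now is the sidecar redundant (single-writer sketch: pop the last).
    store.sidecars.pop();
    Ok(())
}

/// Simplified roll-forward sweep: publish HEAD for every table that a
/// leftover sidecar names, then drop the sidecar.
pub fn roll_forward(store: &mut Store) {
    for tables in std::mem::take(&mut store.sidecars) {
        for t in tables {
            let head = store.lance_head[&t];
            store.manifest_pin.insert(t, head);
        }
    }
}
```

After a failed write, untouched tables keep matching pins. A single
roll-forward brings the drifted table's pin up to its HEAD, after which the
next write proceeds normally.
*/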
|
|
|
|
#[tokio::test]
|
|
async fn schema_apply_without_schema_staging_rolls_back_on_next_open() {
|
|
use omnigraph::loader::{LoadMode, load_jsonl};
|
|
|
|
let _scenario = FailScenario::setup();
|
|
let dir = tempfile::tempdir().unwrap();
|
|
let uri = dir.path().to_str().unwrap().to_string();
|
|
let operation_id;
|
|
|
|
{
|
|
let mut db = Omnigraph::init(&uri, helpers::TEST_SCHEMA).await.unwrap();
|
|
load_jsonl(
|
|
&mut db,
|
|
r#"{"type":"Person","data":{"name":"alice","age":30}}
|
|
"#,
|
|
LoadMode::Append,
|
|
)
|
|
.await
|
|
.unwrap();
|
|
}
|
|
|
|
let pre_failure_version = {
|
|
let db = Omnigraph::open(&uri).await.unwrap();
|
|
version_main(&db).await.unwrap()
|
|
};
|
|
|
|
{
|
|
let db = Omnigraph::open(&uri).await.unwrap();
|
|
let _failpoint = ScopedFailPoint::new("schema_apply.before_staging_write", "return");
|
|
let v2_schema = r#"node Person {
|
|
name: String @key
|
|
age: I32?
|
|
city: String?
|
|
}
|
|
|
|
node Company {
|
|
name: String @key
|
|
}
|
|
|
|
node Tag {
|
|
label: String @key
|
|
}
|
|
|
|
edge Knows: Person -> Person {
|
|
since: Date?
|
|
}
|
|
|
|
edge WorksAt: Person -> Company
|
|
"#;
|
|
let err = db.apply_schema(v2_schema).await.unwrap_err();
|
|
assert!(
|
|
err.to_string()
|
|
.contains("injected failpoint triggered: schema_apply.before_staging_write"),
|
|
"unexpected error: {err}"
|
|
);
|
|
operation_id = single_sidecar_operation_id(dir.path());
|
|
}
|
|
|
|
let db = Omnigraph::open(&uri).await.unwrap();
|
|
// Roll-back now publishes the restored version, so the manifest version
|
|
// advances — but to the OLD-schema content: the migration never applied
|
|
// (asserted by count_rows + the `_schema.pg` checks below), and the sweep
|
|
// converges (`manifest == Lance HEAD`, asserted by
|
|
// assert_post_recovery_invariants's RolledBack arm).
|
|
assert!(
|
|
version_main(&db).await.unwrap() > pre_failure_version,
|
|
"roll-back publishes the restored (old-schema) version, advancing the manifest; \
|
|
pre={pre_failure_version}",
|
|
);
|
|
assert_eq!(
|
|
helpers::count_rows(&db, "node:Person").await,
|
|
1,
|
|
"old-schema data must remain readable after rollback"
|
|
);
|
|
drop(db);
|
|
|
|
assert_post_recovery_invariants(
|
|
dir.path(),
|
|
&operation_id,
|
|
RecoveryExpectation::RolledBack {
|
|
tables: vec![TableExpectation::main("node:Person")],
|
|
},
|
|
)
|
|
.await
|
|
.unwrap();
|
|
|
|
let live_schema = std::fs::read_to_string(dir.path().join("_schema.pg")).unwrap();
|
|
    assert!(
        !live_schema.contains("city: String?"),
        "_schema.pg must keep the OLD schema (no Person.city) when staging files never existed; got:\n{live_schema}",
    );
    assert!(
        !live_schema.contains("node Tag"),
        "_schema.pg must keep the OLD schema (no Tag type) when staging files never existed; got:\n{live_schema}",
    );
|
|
}
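/*
The two schema-apply recovery tests differ only in where the failpoint fires:
before the staging files are written (recovery rolls back and `_schema.pg`
keeps the old schema) or after (recovery rolls forward and the new schema goes
live). A minimal sketch of that split as a decision table over two
observations, assuming recovery can see whether staging files exist and
whether a SchemaApply sidecar is pending. `SchemaStateDecision` and
`decide_schema_state` are hypothetical names, not omnigraph's API, and the
`DiscardStaging` arm is an assumption these tests do not exercise.

```rust
/// Hypothetical outcome of reconciling schema-state files on open.
#[derive(Debug, PartialEq, Eq)]
enum SchemaStateDecision {
    /// No staging files and no SchemaApply sidecar: nothing to reconcile.
    Clean,
    /// Staging was written and a SchemaApply sidecar covers the table drift:
    /// promote staging to the live `_schema.pg` and roll tables forward.
    PromoteStagingAndRollForward,
    /// A SchemaApply sidecar exists but staging was never written: keep the
    /// live (old) schema and roll the tables back.
    KeepLiveAndRollBack,
    /// Staging files without a sidecar: leftovers of an abandoned apply
    /// (assumed behavior).
    DiscardStaging,
}

/// Decide from two open-time observations (hypothetical inputs).
fn decide_schema_state(staging_present: bool, schema_apply_sidecar: bool) -> SchemaStateDecision {
    match (staging_present, schema_apply_sidecar) {
        (false, false) => SchemaStateDecision::Clean,
        (true, true) => SchemaStateDecision::PromoteStagingAndRollForward,
        (false, true) => SchemaStateDecision::KeepLiveAndRollBack,
        (true, false) => SchemaStateDecision::DiscardStaging,
    }
}
```

In these terms, the test above exercises `(false, true)` and the Phase B test
below exercises `(true, true)`.
*/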
|
|
|
|
#[tokio::test]
|
|
async fn schema_apply_phase_b_failure_recovered_on_next_open() {
|
|
use omnigraph::loader::{LoadMode, load_jsonl};
|
|
|
|
let _scenario = FailScenario::setup();
|
|
let dir = tempfile::tempdir().unwrap();
|
|
let uri = dir.path().to_str().unwrap().to_string();
|
|
let operation_id;
|
|
|
|
// Seed: a Person table with one row so the schema-apply rewritten_tables
|
|
// loop has actual work to do.
|
|
{
|
|
let mut db = Omnigraph::init(&uri, helpers::TEST_SCHEMA).await.unwrap();
|
|
load_jsonl(
|
|
&mut db,
|
|
r#"{"type":"Person","data":{"name":"alice","age":30}}
|
|
"#,
|
|
LoadMode::Append,
|
|
)
|
|
.await
|
|
.unwrap();
|
|
}
|
|
|
|
// Capture pre-failure manifest version so we can assert the recovery
|
|
// sweep advances it.
|
|
let pre_failure_version = {
|
|
let db = Omnigraph::open(&uri).await.unwrap();
|
|
version_main(&db).await.unwrap()
|
|
};
|
|
|
|
// Setup: trigger the residual via `schema_apply.after_staging_write`.
|
|
// This failpoint fires AFTER the rewritten_tables/indexed_tables loops
|
|
// (Lance HEAD advanced) AND AFTER the schema-state staging files are
|
|
// written, but BEFORE the manifest publish. The recovery sidecar persists.
|
|
{
|
|
let db = Omnigraph::open(&uri).await.unwrap();
|
|
let _failpoint = ScopedFailPoint::new("schema_apply.after_staging_write", "return");
|
|
        // v2 schema: add a `city` property to Person AND a new `Tag` node
        // type. The new property exercises the rewritten_tables path (Phase B
        // sidecar coverage). The new type changes the overall table set,
        // which `recover_schema_state_files` (it runs BEFORE
        // recover_manifest_drift) needs: it cannot disambiguate
        // property-only migrations and would reject the open before the
        // recovery sweep runs.
|
|
let v2_schema = r#"node Person {
|
|
name: String @key
|
|
age: I32?
|
|
city: String?
|
|
}
|
|
|
|
node Company {
|
|
name: String @key
|
|
}
|
|
|
|
node Tag {
|
|
label: String @key
|
|
}
|
|
|
|
edge Knows: Person -> Person {
|
|
since: Date?
|
|
}
|
|
|
|
edge WorksAt: Person -> Company
|
|
"#;
|
|
let err = db.apply_schema(v2_schema).await.unwrap_err();
|
|
assert!(
|
|
err.to_string()
|
|
.contains("injected failpoint triggered: schema_apply.after_staging_write"),
|
|
"unexpected error: {err}"
|
|
);
|
|
|
|
// Sidecar must still exist.
|
|
let recovery_dir = dir.path().join("__recovery");
|
|
let sidecars: Vec<_> = std::fs::read_dir(&recovery_dir)
|
|
.unwrap()
|
|
.filter_map(|e| e.ok())
|
|
.collect();
|
|
assert_eq!(
|
|
sidecars.len(),
|
|
1,
|
|
"exactly one sidecar must persist after schema_apply failure"
|
|
);
|
|
operation_id = single_sidecar_operation_id(dir.path());
|
|
}
|
|
|
|
// Recovery: reopen runs the recovery sweep. Sidecar's writer_kind is
|
|
// SchemaApply (loose-match) — classifier accepts the multi-commit
|
|
// drift on Person, decision is RollForward, manifest extends to the
|
|
// current Lance HEAD.
|
|
let db = Omnigraph::open(&uri).await.unwrap();
|
|
|
|
// Recovery sweep must have advanced the manifest pin on the rewritten
|
|
// table: roll-forward published the post-failure Lance HEAD.
|
|
let post_recovery_version = version_main(&db).await.unwrap();
|
|
assert!(
|
|
post_recovery_version > pre_failure_version,
|
|
"manifest version must advance post-recovery; pre={pre_failure_version}, \
|
|
post={post_recovery_version}",
|
|
);
|
|
drop(db);
|
|
|
|
assert_post_recovery_invariants(
|
|
dir.path(),
|
|
&operation_id,
|
|
RecoveryExpectation::RolledForward {
|
|
tables: vec![TableExpectation::main("node:Person")],
|
|
},
|
|
)
|
|
.await
|
|
.unwrap();
|
|
|
|
    // Schema-apply atomicity: the live `_schema.pg` must reflect the NEW
    // schema (city column on Person, Tag node type), not the old one.
    // Without the schema-staging coordination, schema-state recovery would
    // have deleted the staging files (the manifest had not yet advanced when
    // it ran), leaving a corrupt graph: new-schema data on disk but an
    // old-schema catalog.
|
|
let live_schema = std::fs::read_to_string(dir.path().join("_schema.pg")).unwrap();
|
|
assert!(
|
|
live_schema.contains("city: String?"),
|
|
"_schema.pg must reflect the NEW schema (city column added); got:\n{live_schema}",
|
|
);
|
|
assert!(
|
|
live_schema.contains("node Tag"),
|
|
"_schema.pg must reflect the NEW schema (Tag type added); got:\n{live_schema}",
|
|
);
|
|
|
|
// Catalog ↔ manifest agreement: the new `node:Tag` type the schema
|
|
// declares must have a manifest entry the engine can read against.
|
|
// Without registrations / tombstones in the sidecar, recovery's
|
|
// `roll_forward_all` only publishes Updates for rewritten tables;
|
|
// added tables (Tag) end up as orphan datasets on disk with no
|
|
// manifest entry, and the live schema declares a type the manifest
|
|
// doesn't know about.
|
|
let db = Omnigraph::open(&uri).await.unwrap();
|
|
let tag_rows = helpers::count_rows(&db, "node:Tag").await;
|
|
assert_eq!(
|
|
tag_rows, 0,
|
|
"node:Tag must have a manifest entry (with 0 rows) post-recovery; \
|
|
a panic here means recovery failed to register the added table"
|
|
);
|
|
}
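/*
Several comments in this file contrast strict and loose sidecar matching. A
minimal sketch of that classification, assuming a pin carries the starting
manifest version, the recorded post-commit version, and a loose flag: a strict
pin requires Lance HEAD to land exactly on the recorded version, while a loose
pin (SchemaApply, EnsureIndices, Optimize, BranchMerge) accepts any HEAD past
the starting version, provided that starting version is still what the
manifest pins. Only `RolledPastExpected` is a name taken from this file;
`Pin`, `classify`, and the other variants are hypothetical.

```rust
/// Hypothetical classification of one sidecar pin against on-disk state.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum Classification {
    /// Lance HEAD still equals the manifest pin: nothing was committed.
    NoMovement,
    /// HEAD reached the expected version (strict) or moved past it (loose).
    RolledPastExpected,
    /// HEAD moved in a way this sidecar cannot account for.
    Unexplained,
}

/// Hypothetical per-table pin recorded in a recovery sidecar.
#[derive(Debug, Clone, Copy)]
struct Pin {
    /// Manifest version the writer started from.
    expected_version: u64,
    /// Version the writer expected to land on (a lower bound for
    /// multi-commit writers).
    post_commit_pin: u64,
    /// Loose matching for multi-commit writers.
    loose: bool,
}

fn classify(pin: Pin, manifest_pinned: u64, lance_head: u64) -> Classification {
    // The sidecar only describes drift that started from the current pin.
    if pin.expected_version != manifest_pinned {
        return Classification::Unexplained;
    }
    if lance_head == manifest_pinned {
        return Classification::NoMovement;
    }
    let explained = if pin.loose {
        // Multi-commit writers: any HEAD past the starting version counts.
        lance_head > pin.expected_version
    } else {
        // Single-commit writers: HEAD must land exactly on the recorded pin.
        lance_head == pin.post_commit_pin
    };
    if explained {
        Classification::RolledPastExpected
    } else {
        Classification::Unexplained
    }
}
```

Loose matching exists because writers such as `publish_rewritten_merge_table`
may commit several times per table, so they only know a lower bound for where
HEAD ends up.
*/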
|
|
|
|
/// `optimize` Phase B → Phase C residual: `compact_files` advanced the Lance
/// HEAD but the manifest publish has not run. The `Optimize` recovery sidecar
/// (loose-match, like SchemaApply/EnsureIndices) must roll the compacted
/// version forward on next open so the manifest tracks the Lance HEAD. The
/// healed table must then accept a schema apply, the operation whose
/// HEAD-vs-manifest precondition the unrecovered drift broke.
|
|
#[tokio::test]
|
|
async fn optimize_phase_b_failure_recovered_on_next_open() {
|
|
let _scenario = FailScenario::setup();
|
|
let dir = tempfile::tempdir().unwrap();
|
|
let uri = dir.path().to_str().unwrap().to_string();
|
|
let operation_id;
|
|
|
|
// Seed: several separate Person inserts → multiple fragments, so compaction
|
|
// has real work and advances the Lance HEAD.
|
|
{
|
|
let db = Omnigraph::init(&uri, helpers::TEST_SCHEMA).await.unwrap();
|
|
for (name, age) in [("alice", 30), ("bob", 31), ("carol", 32), ("dave", 33)] {
|
|
db.mutate(
|
|
"main",
|
|
MUTATION_QUERIES,
|
|
"insert_person",
|
|
&mixed_params(&[("$name", name)], &[("$age", age)]),
|
|
)
|
|
.await
|
|
.unwrap();
|
|
}
|
|
}
|
|
|
|
let pre_failure_version = {
|
|
let db = Omnigraph::open(&uri).await.unwrap();
|
|
version_main(&db).await.unwrap()
|
|
};
|
|
|
|
// Failpoint fires AFTER compact_files advanced the Lance HEAD but BEFORE the
|
|
// manifest publish. The Optimize sidecar persists (only node:Person has
|
|
// compactable fragments, so exactly one sidecar is written).
|
|
{
|
|
let db = Omnigraph::open(&uri).await.unwrap();
|
|
let _failpoint =
|
|
ScopedFailPoint::new("optimize.post_phase_b_pre_manifest_commit", "return");
|
|
let err = db.optimize().await.unwrap_err();
|
|
assert!(
|
|
err.to_string().contains(
|
|
"injected failpoint triggered: optimize.post_phase_b_pre_manifest_commit"
|
|
),
|
|
"unexpected error: {err}"
|
|
);
|
|
|
|
let recovery_dir = dir.path().join("__recovery");
|
|
let sidecars: Vec<_> = std::fs::read_dir(&recovery_dir)
|
|
.unwrap()
|
|
.filter_map(|e| e.ok())
|
|
.collect();
|
|
assert_eq!(
|
|
sidecars.len(),
|
|
1,
|
|
"exactly one Optimize sidecar must persist after optimize failure"
|
|
);
|
|
operation_id = single_sidecar_operation_id(dir.path());
|
|
}
|
|
|
|
// Recovery: reopen runs the sweep. The Optimize sidecar classifies
|
|
// RolledPastExpected (loose-match) → RollForward → manifest extends to the
|
|
// compacted Lance HEAD.
|
|
let db = Omnigraph::open(&uri).await.unwrap();
|
|
let post_recovery_version = version_main(&db).await.unwrap();
|
|
assert!(
|
|
post_recovery_version > pre_failure_version,
|
|
"manifest version must advance post-recovery (compaction rolled forward); \
|
|
pre={pre_failure_version}, post={post_recovery_version}",
|
|
);
|
|
drop(db);
|
|
|
|
assert_post_recovery_invariants(
|
|
dir.path(),
|
|
&operation_id,
|
|
RecoveryExpectation::RolledForward {
|
|
tables: vec![TableExpectation::main("node:Person")],
|
|
},
|
|
)
|
|
.await
|
|
.unwrap();
|
|
|
|
// The healed table accepts an additive schema apply — its HEAD-vs-manifest
|
|
// precondition is satisfied because recovery published the compacted version.
|
|
let db = Omnigraph::open(&uri).await.unwrap();
|
|
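    let desired = helpers::TEST_SCHEMA.replace(
        " age: I32?\n}",
        " age: I32?\n nickname: String?\n}",
    );
    // Guard against a vacuous pass: if the pattern ever stops matching
    // TEST_SCHEMA, `desired` equals the current schema and the apply below
    // may succeed as a no-op without exercising the healed table.
    assert_ne!(
        desired,
        helpers::TEST_SCHEMA,
        "schema edit pattern must match TEST_SCHEMA"
    );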
|
|
db.apply_schema(&desired)
|
|
.await
|
|
.expect("schema apply after optimize recovery must succeed");
|
|
}
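/*
The final schema apply in the optimize test relies on a HEAD-vs-manifest
precondition. A minimal sketch of such a check, assuming it compares one
manifest pin against the table's Lance HEAD. `check_head_matches_manifest`
and this `ExpectedVersionMismatch` struct are hypothetical stand-ins for the
engine's own error, and the version numbers are illustrative.

```rust
/// Hypothetical stand-in for the engine's ExpectedVersionMismatch error.
#[derive(Debug, PartialEq, Eq)]
struct ExpectedVersionMismatch {
    table: String,
    manifest_pin: u64,
    lance_head: u64,
}

/// Refuse new work on a table whose Lance HEAD has drifted from the version
/// the manifest pins.
fn check_head_matches_manifest(
    table: &str,
    manifest_pin: u64,
    lance_head: u64,
) -> Result<(), ExpectedVersionMismatch> {
    if manifest_pin == lance_head {
        Ok(())
    } else {
        Err(ExpectedVersionMismatch {
            table: table.to_string(),
            manifest_pin,
            lance_head,
        })
    }
}
```

Before recovery, compaction has moved HEAD past the pin (say pin 3, HEAD 4)
and the check fails; once roll-forward publishes the compacted version, both
are 4 and the check passes.
*/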
|
|
|
|
#[tokio::test]
|
|
async fn branch_merge_phase_b_failure_recovered_on_next_open() {
|
|
use omnigraph::loader::{LoadMode, load_jsonl};
|
|
|
|
let _scenario = FailScenario::setup();
|
|
let dir = tempfile::tempdir().unwrap();
|
|
let uri = dir.path().to_str().unwrap().to_string();
|
|
|
|
// Seed main with a row, branch off, mutate BOTH sides so the merge
|
|
// produces at least one `RewriteMerged` candidate (target moved past
|
|
// base too — required for the recovery sidecar to pin anything; the
|
|
// sidecar only pins RewriteMerged candidates because they're the
|
|
// only path that always advances Lance HEAD via
|
|
// `publish_rewritten_merge_table`).
|
|
{
|
|
let mut db = Omnigraph::init(&uri, helpers::TEST_SCHEMA).await.unwrap();
|
|
load_jsonl(
|
|
&mut db,
|
|
r#"{"type":"Person","data":{"name":"alice","age":30}}
|
|
"#,
|
|
LoadMode::Append,
|
|
)
|
|
.await
|
|
.unwrap();
|
|
db.branch_create("feature").await.unwrap();
|
|
db.mutate(
|
|
"feature",
|
|
MUTATION_QUERIES,
|
|
"insert_person",
|
|
&mixed_params(&[("$name", "Bob")], &[("$age", 40)]),
|
|
)
|
|
.await
|
|
.unwrap();
|
|
// Mutate main too so the merge sees target ≠ base for Person —
|
|
// forces RewriteMerged classification.
|
|
mutate_main(
|
|
&mut db,
|
|
MUTATION_QUERIES,
|
|
"insert_person",
|
|
&mixed_params(&[("$name", "Carol")], &[("$age", 50)]),
|
|
)
|
|
.await
|
|
.unwrap();
|
|
}
|
|
|
|
// Capture pre-failure state on main for post-recovery comparison.
|
|
let pre_failure_version = {
|
|
let db = Omnigraph::open(&uri).await.unwrap();
|
|
version_main(&db).await.unwrap()
|
|
};
|
|
|
|
// Setup: failpoint fires after the per-table publish loop completes
|
|
// but before commit_manifest_updates. Sidecar persists.
|
|
{
|
|
let db = Omnigraph::open(&uri).await.unwrap();
|
|
let _failpoint =
|
|
ScopedFailPoint::new("branch_merge.post_phase_b_pre_manifest_commit", "return");
|
|
let err = db.branch_merge("feature", "main").await.unwrap_err();
|
|
assert!(
|
|
err.to_string().contains(
|
|
"injected failpoint triggered: branch_merge.post_phase_b_pre_manifest_commit"
|
|
),
|
|
"unexpected error: {err}"
|
|
);
|
|
|
|
let recovery_dir = dir.path().join("__recovery");
|
|
let sidecars: Vec<_> = std::fs::read_dir(&recovery_dir)
|
|
.unwrap()
|
|
.filter_map(|e| e.ok())
|
|
.collect();
|
|
assert_eq!(
|
|
sidecars.len(),
|
|
1,
|
|
"exactly one sidecar must persist after branch_merge failure"
|
|
);
|
|
}
|
|
|
|
// Recovery: reopen runs the sweep. BranchMerge uses LOOSE
|
|
// classification — `publish_rewritten_merge_table` runs multiple
|
|
// commit_staged calls per table (stage_merge_insert + delete_where +
|
|
// index rebuilds), so post_commit_pin in the sidecar is a lower
|
|
// bound; the loose-match classifier accepts any HEAD > expected_version
|
|
// when expected_version == manifest_pinned.
|
|
let db = Omnigraph::open(&uri).await.unwrap();
|
|
|
|
let recovery_dir = dir.path().join("__recovery");
|
|
if recovery_dir.exists() {
|
|
let remaining: Vec<_> = std::fs::read_dir(&recovery_dir)
|
|
.unwrap()
|
|
.filter_map(|e| e.ok())
|
|
.collect();
|
|
assert!(
|
|
remaining.is_empty(),
|
|
"sidecar must be deleted; remaining: {:?}",
|
|
remaining,
|
|
);
|
|
}
|
|
let audit_dir = dir.path().join("_graph_commit_recoveries.lance");
|
|
assert!(
|
|
audit_dir.exists(),
|
|
"_graph_commit_recoveries.lance must exist after branch_merge recovery"
|
|
);
|
|
|
|
// Recovery must have advanced main's manifest pin (the merge published).
|
|
let post_recovery_version = version_main(&db).await.unwrap();
|
|
assert!(
|
|
post_recovery_version > pre_failure_version,
|
|
"manifest version must advance post-recovery; pre={pre_failure_version}, \
|
|
post={post_recovery_version}",
|
|
);
|
|
|
|
    // The recovered branch_merge must record a MERGE commit (with
    // `merged_parent_commit_id` set), not a plain commit. Without it,
    // future merges between the same pair lose already-up-to-date
    // detection. We verify by scanning `_graph_commits.lance` for any
    // commit with a non-null `merged_parent_commit_id`; the comment inside
    // the loop explains why that suffices in this test.
|
|
{
|
|
use arrow_array::{Array, StringArray};
|
|
use futures::TryStreamExt;
|
|
let commits_dir = dir.path().join("_graph_commits.lance");
|
|
let ds = lance::Dataset::open(commits_dir.to_str().unwrap())
|
|
.await
|
|
.unwrap();
|
|
let batches: Vec<arrow_array::RecordBatch> = ds
|
|
.scan()
|
|
.try_into_stream()
|
|
.await
|
|
.unwrap()
|
|
.try_collect()
|
|
.await
|
|
.unwrap();
|
|
let mut found_recovery_merge = false;
|
|
for batch in batches {
|
|
let merged = batch
|
|
.column_by_name("merged_parent_commit_id")
|
|
.expect("merged_parent_commit_id column present")
|
|
.as_any()
|
|
.downcast_ref::<StringArray>()
|
|
.expect("merged_parent_commit_id is Utf8");
|
|
// The actor_id lives in _graph_commit_actors; cross-checking
|
|
// is heavier than necessary. Detecting any non-null
|
|
// merged_parent_commit_id in the post-recovery state is
|
|
// sufficient: only a recovered branch_merge can produce one
|
|
// here (we never completed a normal merge in this test).
|
|
for i in 0..merged.len() {
|
|
if !merged.is_null(i) {
|
|
found_recovery_merge = true;
|
|
break;
|
|
}
|
|
}
|
|
}
|
|
assert!(
|
|
found_recovery_merge,
|
|
"recovered branch_merge must record `merged_parent_commit_id` so future \
|
|
merges detect already-up-to-date — no merge-parent-tagged commit found",
|
|
);
|
|
}
|
|
drop(db);
|
|
}
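/*
The branch_merge test above accepts any non-null `merged_parent_commit_id`,
which suffices because no normal merge completes in that test. A stricter
check would inspect only the most recent commit written by the recovery actor,
which requires joining `_graph_commits` with `_graph_commit_actors`. A minimal
sketch over an already-joined, oldest-first list of rows; `CommitRow`, the
helper functions, and the actor id `"recovery"` are all hypothetical.

```rust
/// Hypothetical flattened view of one commit row joined with its actor.
#[derive(Debug, Clone)]
struct CommitRow {
    commit_id: String,
    actor_id: Option<String>,
    merged_parent_commit_id: Option<String>,
}

/// Most recent commit written by `actor`, assuming `rows` is oldest-first.
fn latest_commit_by_actor<'a>(rows: &'a [CommitRow], actor: &str) -> Option<&'a CommitRow> {
    rows.iter().rev().find(|row| row.actor_id.as_deref() == Some(actor))
}

/// Stricter check: the recovery actor's latest commit is a merge commit.
fn recovery_recorded_merge(rows: &[CommitRow], recovery_actor: &str) -> bool {
    latest_commit_by_actor(rows, recovery_actor)
        .map_or(false, |row| row.merged_parent_commit_id.is_some())
}
```
*/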
|
|
|
|
/// Branch-axis variant of the branch_merge recovery test: target is a
|
|
/// non-main branch. Catches the branch-specific commit-graph head bug
|
|
/// (D2) — without `CommitGraph::open_at_branch`, the recovery sweep
|
|
/// would record the global head as the merge parent on a non-main
|
|
/// target, and future merges between the same pair would lose
|
|
/// already-up-to-date detection.
|
|
#[tokio::test]
|
|
async fn branch_merge_phase_b_failure_recovered_on_non_main_target() {
|
|
use omnigraph::loader::{LoadMode, load_jsonl};
|
|
|
|
let _scenario = FailScenario::setup();
|
|
let dir = tempfile::tempdir().unwrap();
|
|
let uri = dir.path().to_str().unwrap().to_string();
|
|
let operation_id;
|
|
let target_parent_commit_id;
|
|
|
|
// Setup:
|
|
// main: alice
|
|
// target_branch (off main): + bob (target moved past base)
|
|
// source_branch (off main): + carol (source moved past base)
|
|
// Merge: source_branch → target_branch
|
|
{
|
|
let mut db = Omnigraph::init(&uri, helpers::TEST_SCHEMA).await.unwrap();
|
|
load_jsonl(
|
|
&mut db,
|
|
r#"{"type":"Person","data":{"name":"alice","age":30}}
|
|
"#,
|
|
LoadMode::Append,
|
|
)
|
|
.await
|
|
.unwrap();
|
|
db.branch_create("target_branch").await.unwrap();
|
|
db.mutate(
|
|
"target_branch",
|
|
MUTATION_QUERIES,
|
|
"insert_person",
|
|
&mixed_params(&[("$name", "Bob")], &[("$age", 40)]),
|
|
)
|
|
.await
|
|
.unwrap();
|
|
db.branch_create("source_branch").await.unwrap();
|
|
db.mutate(
|
|
"source_branch",
|
|
MUTATION_QUERIES,
|
|
"insert_person",
|
|
&mixed_params(&[("$name", "Carol")], &[("$age", 50)]),
|
|
)
|
|
.await
|
|
.unwrap();
|
|
}
|
|
|
|
let main_person_pin = {
|
|
let db = Omnigraph::open(&uri).await.unwrap();
|
|
db.snapshot_of(omnigraph::db::ReadTarget::branch("main"))
|
|
.await
|
|
.unwrap()
|
|
.entry("node:Person")
|
|
.expect("main must have Person")
|
|
.table_version
|
|
};
|
|
target_parent_commit_id = branch_head_commit_id(dir.path(), "target_branch")
|
|
.await
|
|
.unwrap();
|
|
|
|
// Setup: failpoint fires after the per-table publish loop completes
|
|
// but before commit_manifest_updates. Sidecar persists with
|
|
// branch=Some("target_branch").
|
|
{
|
|
let db = Omnigraph::open(&uri).await.unwrap();
|
|
let _failpoint =
|
|
ScopedFailPoint::new("branch_merge.post_phase_b_pre_manifest_commit", "return");
|
|
let err = db
|
|
.branch_merge("source_branch", "target_branch")
|
|
.await
|
|
.unwrap_err();
|
|
assert!(
|
|
err.to_string().contains(
|
|
"injected failpoint triggered: branch_merge.post_phase_b_pre_manifest_commit"
|
|
),
|
|
"unexpected error: {err}"
|
|
);
|
|
let recovery_dir = dir.path().join("__recovery");
|
|
let sidecar_count = std::fs::read_dir(&recovery_dir).unwrap().count();
|
|
assert_eq!(
|
|
sidecar_count, 1,
|
|
"exactly one sidecar must persist after non-main branch_merge failure"
|
|
);
|
|
operation_id = single_sidecar_operation_id(dir.path());
|
|
}
|
|
|
|
// Recovery: reopen runs full sweep. The BranchMerge sidecar's branch
|
|
// = Some("target_branch"); D2 fix opens a per-branch CommitGraph
|
|
// for the audit append so the merge-parent linkage is correct.
|
|
let db = Omnigraph::open(&uri).await.unwrap();
|
|
drop(db);
|
|
|
|
assert_post_recovery_invariants(
|
|
dir.path(),
|
|
&operation_id,
|
|
RecoveryExpectation::RolledForward {
|
|
tables: vec![
|
|
TableExpectation::branch("node:Person", "target_branch")
|
|
.expected_main_manifest_pin(main_person_pin)
|
|
.expected_recovery_parent_commit_id(target_parent_commit_id),
|
|
],
|
|
},
|
|
)
|
|
.await
|
|
.unwrap();
|
|
}
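/*
The D2 bug described above concerns which head the recovery sweep records as
the merge parent. A minimal sketch, assuming the commit graph is one
append-only log whose entries record the branch they landed on: the global
head is the last entry, while the correct parent for a non-main target is the
last entry on that branch. With this test's setup (alice on main, Bob on
`target_branch`, then Carol on `source_branch`), the global head is Carol's
commit rather than the target's. `Commit`, `global_head`, and `branch_head`
are hypothetical and ignore fork points.

```rust
/// Hypothetical commit-log entry: which branch a commit landed on.
#[derive(Debug, Clone)]
struct Commit {
    id: String,
    branch: String,
}

fn commit(id: &str, branch: &str) -> Commit {
    Commit { id: id.to_string(), branch: branch.to_string() }
}

/// Head of the whole log regardless of branch (what the D2 bug used).
fn global_head(log: &[Commit]) -> Option<&str> {
    log.last().map(|c| c.id.as_str())
}

/// Most recent commit on `branch`. Simplification: a branch with no commits
/// of its own returns None instead of falling back to its fork point.
fn branch_head<'a>(log: &'a [Commit], branch: &str) -> Option<&'a str> {
    log.iter()
        .rev()
        .find(|c| c.branch == branch)
        .map(|c| c.id.as_str())
}
```
*/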
|
|
|
|
/// Contract: the BranchMerge sidecar's per-table `table_branch` MUST be
|
|
/// the merge target branch (where commits land via
|
|
/// `publish_rewritten_merge_table` → `open_for_mutation` → potentially
|
|
/// `fork_dataset_from_entry_state`), NOT `entry.table_branch` (where
|
|
/// the table currently lives in the target's manifest snapshot).
|
|
///
|
|
/// `ensure_indices_for_branch` already has this invariant pinned by an
|
|
/// explicit comment at `table_ops.rs:115-120`. Without the same fix in
|
|
/// `merge.rs`, a future change to candidate selection or the publish
|
|
/// path that produces a `RewriteMerged` whose entry.table_branch
|
|
/// diverges from active_branch would silently drift Lance HEAD on the
|
|
/// target ref while recovery checks the wrong ref and no-ops the
|
|
/// rollback.
|
|
///
|
|
/// This test reads the sidecar JSON directly and asserts every per-pin
|
|
/// `table_branch` equals the active (target) branch. Even when the
|
|
/// values happen to coincide in practice (the strict candidate logic
|
|
/// keeps RewriteMerged tables on active_branch), the contract assertion
|
|
/// catches a regression that reverts to `entry.table_branch.clone()`.
|
|
#[tokio::test]
|
|
async fn branch_merge_sidecar_pins_table_branch_to_active_branch() {
|
|
use omnigraph::loader::{LoadMode, load_jsonl};
|
|
|
|
let _scenario = FailScenario::setup();
|
|
let dir = tempfile::tempdir().unwrap();
|
|
let uri = dir.path().to_str().unwrap().to_string();
|
|
|
|
{
|
|
let mut db = Omnigraph::init(&uri, helpers::TEST_SCHEMA).await.unwrap();
|
|
load_jsonl(
|
|
&mut db,
|
|
r#"{"type":"Person","data":{"name":"alice","age":30}}
|
|
"#,
|
|
LoadMode::Append,
|
|
)
|
|
.await
|
|
.unwrap();
|
|
db.branch_create("target_branch").await.unwrap();
|
|
db.mutate(
|
|
"target_branch",
|
|
MUTATION_QUERIES,
|
|
"insert_person",
|
|
&mixed_params(&[("$name", "Bob")], &[("$age", 40)]),
|
|
)
|
|
.await
|
|
.unwrap();
|
|
db.branch_create("source_branch").await.unwrap();
|
|
db.mutate(
|
|
"source_branch",
|
|
MUTATION_QUERIES,
|
|
"insert_person",
|
|
&mixed_params(&[("$name", "Carol")], &[("$age", 50)]),
|
|
)
|
|
.await
|
|
.unwrap();
|
|
}
|
|
|
|
{
|
|
let db = Omnigraph::open(&uri).await.unwrap();
|
|
let _failpoint =
|
|
ScopedFailPoint::new("branch_merge.post_phase_b_pre_manifest_commit", "return");
|
|
let _ = db
|
|
.branch_merge("source_branch", "target_branch")
|
|
.await
|
|
.expect_err("failpoint must fire");
|
|
}
|
|
|
|
let operation_id = single_sidecar_operation_id(dir.path());
|
|
let sidecar_path = dir
|
|
.path()
|
|
.join("__recovery")
|
|
.join(format!("{operation_id}.json"));
|
|
let sidecar_json = std::fs::read_to_string(&sidecar_path).unwrap();
|
|
let sidecar: serde_json::Value = serde_json::from_str(&sidecar_json).unwrap();
|
|
|
|
let tables = sidecar["tables"]
|
|
.as_array()
|
|
.expect("sidecar tables must be an array");
|
|
assert!(
|
|
!tables.is_empty(),
|
|
"sidecar must pin at least one RewriteMerged table — both branches mutated Person"
|
|
);
|
|
for pin in tables {
|
|
let table_branch = pin
|
|
.get("table_branch")
|
|
.and_then(|v| v.as_str())
|
|
.unwrap_or_else(|| {
|
|
panic!(
|
|
"sidecar pin must record table_branch as the merge target (active_branch); \
|
|
got pin {pin:?}"
|
|
)
|
|
});
|
|
assert_eq!(
|
|
table_branch, "target_branch",
|
|
"sidecar pin must record `table_branch` as the merge target branch (where \
|
|
commits actually land via publish_rewritten_merge_table → open_for_mutation), \
|
|
NOT entry.table_branch from the target snapshot. See merge.rs filter_map and \
|
|
the rationale comment at table_ops.rs:115-120. Got pin: {pin:?}"
|
|
);
|
|
}
|
|
}
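/*
The contract pinned by the previous test can be stated as a pure function:
every sidecar pin takes its `table_branch` from the merge target, never from
the candidate's snapshot entry. A minimal sketch with hypothetical
`MergeCandidate` and `SidecarPin` types (the real sidecar is JSON with more
fields); the usage shows a candidate whose entry still lives on `main` being
pinned to `target_branch`.

```rust
/// Hypothetical RewriteMerged candidate as seen in the target's snapshot.
#[derive(Debug, Clone)]
struct MergeCandidate {
    table_key: String,
    /// Branch the table currently lives on in the target snapshot; may be
    /// an ancestor such as "main" if the target never forked the table.
    entry_table_branch: String,
}

/// Hypothetical per-table pin written to the BranchMerge sidecar.
#[derive(Debug, Clone, PartialEq, Eq)]
struct SidecarPin {
    table_key: String,
    table_branch: String,
}

/// Pin every candidate to the merge target (active) branch, where the
/// merge's Lance commits land; `entry_table_branch` is deliberately ignored.
fn sidecar_pins(candidates: &[MergeCandidate], active_branch: &str) -> Vec<SidecarPin> {
    candidates
        .iter()
        .map(|c| SidecarPin {
            table_key: c.table_key.clone(),
            table_branch: active_branch.to_string(),
        })
        .collect()
}
```
*/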
|
|
|
|
/// `ensure_indices` only writes a sidecar when at least one table
|
|
/// genuinely needs index work (per `needs_index_work_*` helpers in
|
|
/// `db/omnigraph/table_ops.rs`). When all tables are steady-state
|
|
/// (every declared index already built, or empty tables that the loop
|
|
/// skips), the sidecar is omitted entirely.
|
|
///
|
|
/// Test setup: `load_jsonl` auto-builds indices via
|
|
/// `prepare_updates_for_commit`. So after the load, every Person/Knows
|
|
/// index is built and Company is empty. `ensure_indices` correctly
|
|
/// produces zero pins → no sidecar. The failpoint still fires (it sits
|
|
/// after the loops), so the call returns Err — but no recovery state
|
|
/// persists. Reopen is a clean no-op.
|
|
///
|
|
/// Triggering an actual sidecar persistence requires bypassing
|
|
/// `load_jsonl`'s auto-build via raw `TableStore::append_batch` — the
|
|
/// helper-direct path. That's covered structurally by the
|
|
/// `needs_index_work_*` code path and the
|
|
/// `recovery_ensure_indices_handles_empty_tables` integration test.
|
|
#[tokio::test]
|
|
async fn ensure_indices_phase_b_failure_does_not_leak_sidecar_when_no_work_needed() {
|
|
use omnigraph::loader::{LoadMode, load_jsonl};
|
|
|
|
let _scenario = FailScenario::setup();
|
|
let dir = tempfile::tempdir().unwrap();
|
|
let uri = dir.path().to_str().unwrap().to_string();
|
|
|
|
// Seed: load_jsonl auto-builds Person's indices via
|
|
// prepare_updates_for_commit. After this, ensure_indices has no
|
|
// work to do (steady state).
|
|
{
|
|
let mut db = Omnigraph::init(&uri, helpers::TEST_SCHEMA).await.unwrap();
|
|
load_jsonl(
|
|
&mut db,
|
|
r#"{"type":"Person","data":{"name":"alice","age":30}}
|
|
{"type":"Person","data":{"name":"bob","age":25}}
|
|
"#,
|
|
LoadMode::Append,
|
|
)
|
|
.await
|
|
.unwrap();
|
|
}
|
|
|
|
// Setup: trigger the failpoint. Steady-state ensure_indices
|
|
// produces zero sidecar pins (the helpers scope pins to tables
|
|
// that genuinely need work); no sidecar is written. The failpoint
|
|
// still fires, surfacing the Err.
|
|
{
|
|
let db = Omnigraph::open(&uri).await.unwrap();
|
|
let _failpoint =
|
|
ScopedFailPoint::new("ensure_indices.post_phase_b_pre_manifest_commit", "return");
|
|
let err = db.ensure_indices().await.unwrap_err();
|
|
assert!(
|
|
err.to_string().contains(
|
|
"injected failpoint triggered: ensure_indices.post_phase_b_pre_manifest_commit"
|
|
),
|
|
"unexpected error: {err}"
|
|
);
|
|
|
|
// KEY ASSERTION: no sidecar persists, because the helpers
|
|
// scope pins to tables that genuinely need work. Steady-state
|
|
// = no pins = no sidecar = no recovery state = zero open-time
|
|
// overhead.
|
|
let recovery_dir = dir.path().join("__recovery");
|
|
let sidecars: Vec<_> = if recovery_dir.exists() {
|
|
std::fs::read_dir(&recovery_dir)
|
|
.unwrap()
|
|
.filter_map(|e| e.ok())
|
|
.collect()
|
|
} else {
|
|
Vec::new()
|
|
};
|
|
assert!(
|
|
sidecars.is_empty(),
|
|
"steady-state ensure_indices must not leave a sidecar; got {:?}",
|
|
sidecars,
|
|
);
|
|
}
|
|
|
|
// Recovery: reopen is a clean no-op (no sidecar to recover).
|
|
let _db = Omnigraph::open(&uri).await.unwrap();
|
|
|
|
let recovery_dir = dir.path().join("__recovery");
|
|
if recovery_dir.exists() {
|
|
let remaining: Vec<_> = std::fs::read_dir(&recovery_dir)
|
|
.unwrap()
|
|
.filter_map(|e| e.ok())
|
|
.collect();
|
|
        assert!(
            remaining.is_empty(),
            "no sidecar may appear after reopen; remaining: {:?}",
            remaining,
        );
|
|
}
|
|
// No audit row expected — no sidecar was processed.
|
|
let audit_dir = dir.path().join("_graph_commit_recoveries.lance");
|
|
assert!(
|
|
!audit_dir.exists(),
|
|
"_graph_commit_recoveries.lance must NOT exist when no sidecar was processed"
|
|
);
|
|
}
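/*
Several tests above inspect `__recovery` by hand, some unwrapping `read_dir`
(which panics if the directory is missing) and some guarding with `exists()`.
A minimal std-only helper, hypothetical and not part of this file's helpers,
that treats a missing directory as "no sidecars" and returns sorted paths so
assertions are deterministic. Only the `__recovery` directory name comes from
the tests.

```rust
use std::io;
use std::path::{Path, PathBuf};

/// List sidecar files under the graph root's `__recovery` directory, sorted,
/// treating a missing directory as "no sidecars" rather than an error.
fn recovery_sidecars(graph_root: &Path) -> io::Result<Vec<PathBuf>> {
    let entries = match std::fs::read_dir(graph_root.join("__recovery")) {
        Ok(entries) => entries,
        // Recovery may never have created the directory.
        Err(e) if e.kind() == io::ErrorKind::NotFound => return Ok(Vec::new()),
        Err(e) => return Err(e),
    };
    // Propagate per-entry I/O errors instead of silently dropping them.
    let mut paths = entries
        .map(|entry| entry.map(|e| e.path()))
        .collect::<io::Result<Vec<PathBuf>>>()?;
    paths.sort();
    Ok(paths)
}
```

With it, the guarded block in the ensure_indices test reduces to
`assert!(recovery_sidecars(dir.path()).unwrap().is_empty())`.
*/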
|
|
|
|
// ─── MR-668 PR 2a: Omnigraph::init cleanup on partial failure ──────────────
|
|
//
|
|
// `init_with_storage` writes three schema artifacts before invoking
// `GraphCoordinator::init`. Without cleanup, a failure between any of those
// steps would leave orphan files behind, making the URI unusable for a retry
// of `init` (it would refuse because `_schema.pg` already exists). The tests
// below pin that, when a failpoint triggers at each of the three phase
// boundaries, the three schema files are removed before the error is
// returned.
|
|
//
|
|
// Coverage note: the third boundary (`init.after_coordinator_init`) only
|
|
// asserts cleanup of the schema files. Lance per-type directories and
|
|
// `__manifest/` are NOT cleaned up — that requires a recursive
|
|
// `StorageAdapter::delete_prefix` primitive deferred along with
|
|
// `DELETE /graphs/{id}` (MR-668 PR 2b). The orphan Lance directories
|
|
// after a coordinator-init-phase failure are documented as a known
|
|
// limitation.
|
|
|
|
#[tokio::test]
|
|
async fn init_failpoint_after_schema_pg_written_cleans_up_schema_file() {
|
|
let _scenario = FailScenario::setup();
|
|
let dir = tempfile::tempdir().unwrap();
|
|
let uri = dir.path().to_str().unwrap();
|
|
let _failpoint = ScopedFailPoint::new("init.after_schema_pg_written", "return");
|
|
|
|
let err = match Omnigraph::init(uri, helpers::TEST_SCHEMA).await {
|
|
Ok(_) => panic!("expected Omnigraph::init to fail at the configured failpoint"),
|
|
Err(e) => e,
|
|
};
|
|
assert!(
|
|
err.to_string()
|
|
.contains("injected failpoint triggered: init.after_schema_pg_written"),
|
|
"got: {err}"
|
|
);
|
|
|
|
// Only `_schema.pg` was written at this phase boundary, but the
|
|
// cleanup attempts all three — `delete` treats not-found as Ok,
|
|
// so the other two deletes are no-ops.
|
|
assert!(
|
|
!dir.path().join("_schema.pg").exists(),
|
|
"_schema.pg must be cleaned up after init failure"
|
|
);
|
|
}
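/*
The comment in this test says cleanup attempts all three deletes and that
`delete` treats not-found as Ok. A minimal std-only sketch of that behavior on
the local filesystem, assuming the three artifact names asserted in these
tests. The real code goes through a storage layer that may be backed by S3, so
`delete_if_exists` and `cleanup_init_artifacts` are hypothetical.

```rust
use std::io;
use std::path::Path;

/// The three schema artifacts these tests expect to be cleaned up.
const INIT_SCHEMA_ARTIFACTS: [&str; 3] = ["_schema.pg", "_schema.ir.json", "__schema_state.json"];

/// Delete `path`, treating "already gone" as success.
fn delete_if_exists(path: &Path) -> io::Result<()> {
    match std::fs::remove_file(path) {
        Ok(()) => Ok(()),
        Err(e) if e.kind() == io::ErrorKind::NotFound => Ok(()),
        Err(e) => Err(e),
    }
}

/// Attempt every delete even if one fails, and return the failures so the
/// caller can log them without replacing its own error.
fn cleanup_init_artifacts(root: &Path) -> Vec<io::Error> {
    INIT_SCHEMA_ARTIFACTS
        .iter()
        .filter_map(|name| delete_if_exists(&root.join(name)).err())
        .collect()
}
```
*/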
|
|
|
|
#[tokio::test]
|
|
async fn init_failpoint_after_schema_contract_written_cleans_up_all_schema_files() {
|
|
let _scenario = FailScenario::setup();
|
|
let dir = tempfile::tempdir().unwrap();
|
|
let uri = dir.path().to_str().unwrap();
|
|
let _failpoint = ScopedFailPoint::new("init.after_schema_contract_written", "return");
|
|
|
|
let err = match Omnigraph::init(uri, helpers::TEST_SCHEMA).await {
|
|
Ok(_) => panic!("expected Omnigraph::init to fail at the configured failpoint"),
|
|
Err(e) => e,
|
|
};
|
|
assert!(
|
|
err.to_string()
|
|
.contains("injected failpoint triggered: init.after_schema_contract_written"),
|
|
"got: {err}"
|
|
);
|
|
|
|
assert!(
|
|
!dir.path().join("_schema.pg").exists(),
|
|
"_schema.pg must be cleaned up"
|
|
);
|
|
assert!(
|
|
!dir.path().join("_schema.ir.json").exists(),
|
|
"_schema.ir.json must be cleaned up"
|
|
);
|
|
assert!(
|
|
!dir.path().join("__schema_state.json").exists(),
|
|
"__schema_state.json must be cleaned up"
|
|
);
|
|
}
|
|
|
|
#[tokio::test]
|
|
async fn init_failpoint_after_coordinator_init_cleans_up_schema_files() {
|
|
let _scenario = FailScenario::setup();
|
|
let dir = tempfile::tempdir().unwrap();
|
|
let uri = dir.path().to_str().unwrap();
|
|
let _failpoint = ScopedFailPoint::new("init.after_coordinator_init", "return");
|
|
|
|
let err = match Omnigraph::init(uri, helpers::TEST_SCHEMA).await {
|
|
Ok(_) => panic!("expected Omnigraph::init to fail at the configured failpoint"),
|
|
Err(e) => e,
|
|
};
|
|
assert!(
|
|
err.to_string()
|
|
.contains("injected failpoint triggered: init.after_coordinator_init"),
|
|
"got: {err}"
|
|
);
|
|
|
|
// Schema files are cleaned up by `best_effort_cleanup_init_artifacts`.
|
|
assert!(
|
|
!dir.path().join("_schema.pg").exists(),
|
|
"_schema.pg must be cleaned up after late-phase init failure"
|
|
);
|
|
assert!(
|
|
!dir.path().join("_schema.ir.json").exists(),
|
|
"_schema.ir.json must be cleaned up after late-phase init failure"
|
|
);
|
|
assert!(
|
|
!dir.path().join("__schema_state.json").exists(),
|
|
"__schema_state.json must be cleaned up after late-phase init failure"
|
|
);
|
|
|
|
    // Documented limitation: Lance per-type datasets and `__manifest/`
    // created by `GraphCoordinator::init` are NOT cleaned up; recursive
    // deletion requires the deferred `delete_prefix` primitive. This test
    // therefore deliberately does not assert that those orphan directories
    // are absent. When PR 2b lands, it can be tightened to assert the graph
    // root is fully empty.
|
|
}
|
|
|
|
#[tokio::test]
|
|
async fn init_failpoint_returns_original_error_not_cleanup_error() {
|
|
// The cleanup is best-effort. If `storage.delete` fails (e.g. transient
|
|
// network blip on S3), the original init failpoint error must still
|
|
// surface — not be masked by a cleanup failure. This test triggers the
|
|
// failpoint and asserts the returned error references the failpoint,
|
|
// not the cleanup. (The cleanup currently logs via `tracing::warn`;
|
|
// we can't easily fault-inject delete failures without another seam,
|
|
// so this is a smoke test for the precedence contract.)
|
|
let _scenario = FailScenario::setup();
|
|
let dir = tempfile::tempdir().unwrap();
|
|
let uri = dir.path().to_str().unwrap();
|
|
let _failpoint = ScopedFailPoint::new("init.after_schema_pg_written", "return");
|
|
|
|
let err = match Omnigraph::init(uri, helpers::TEST_SCHEMA).await {
|
|
Ok(_) => panic!("expected Omnigraph::init to fail at the configured failpoint"),
|
|
Err(e) => e,
|
|
};
|
|
    // The failpoint message must win over any cleanup error.
|
|
let msg = err.to_string();
|
|
assert!(
|
|
msg.contains("init.after_schema_pg_written"),
|
|
"init error must surface the failpoint cause, got: {msg}"
|
|
);
|
|
}
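/*
The precedence contract that test describes (cleanup is best-effort and the
original init error always surfaces) can be sketched as a small combinator.
`InitError` and `init_with_cleanup` are hypothetical; `eprintln!` stands in
for the `tracing::warn` call the test comment mentions.

```rust
use std::fmt;

/// Hypothetical stand-in for the error `Omnigraph::init` returns.
#[derive(Debug)]
struct InitError(String);

impl fmt::Display for InitError {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.write_str(&self.0)
    }
}

/// Run `init`; on failure, run `cleanup` best-effort and return the
/// ORIGINAL error. A cleanup failure is only logged, so it can never mask
/// the root cause. On success, `cleanup` is never called.
fn init_with_cleanup<T>(
    init: impl FnOnce() -> Result<T, InitError>,
    cleanup: impl FnOnce() -> Result<(), String>,
) -> Result<T, InitError> {
    match init() {
        Ok(value) => Ok(value),
        Err(original) => {
            if let Err(cleanup_err) = cleanup() {
                eprintln!("warning: init cleanup failed: {cleanup_err}");
            }
            Err(original)
        }
    }
}
```

Because the cleanup error is only logged, a seam that forces `cleanup` to fail
(which the test comment notes does not exist yet) would let the precedence
contract be tested directly instead of as a smoke test.
*/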
|