mirror of
https://github.com/ModernRelay/omnigraph.git
synced 2026-06-09 01:35:18 +02:00
* tests: add lance_surface_guards pre-flight pins for the v6 bump
Land 8 named guards in a new test file that pin Lance API surfaces
OmniGraph relies on. Each guard turns a silent-break risk (variant
rename, struct restructure, async-flip) into a red CI bar instead of
runtime drift.
Guards (mapped to the silent-break inventory from the v6 migration plan):
Runtime (#[tokio::test]):
1. lance_error_too_much_write_contention_variant_exists — pins the
variant referenced by db/manifest/publisher.rs::map_lance_publish_error.
2. manifest_location_field_shape — pins .path/.size/.e_tag/.naming_scheme
types and ManifestLocation accessor returning &Self (the access
pattern at db/manifest/metadata.rs:84-88).
6. write_params_default_does_not_set_storage_version — confirms our
explicit V2_2 pin remains load-bearing (blob v2 requirement).
Compile-only async fns (#[allow(...)] + unimplemented!() placeholders;
never run, but cargo build --tests enforces the API shape):
3. checkout_version + restore chain — pins the recovery rollback hammer
at db/manifest/recovery.rs:505-522.
4. DatasetBuilder::from_namespace().with_branch().with_version().load()
— pins the namespace builder chain at db/manifest/namespace.rs:162-174.
5. MergeInsertBuilder fluent chain — pins the manifest CAS at
db/manifest/publisher.rs:370-391, including the return shape
(Arc<Dataset>, MergeStats).
7. compact_files(&mut ds, CompactionOptions, None) — pins
db/omnigraph/optimize.rs:107.
8. DeleteResult { new_dataset, num_deleted_rows } — pins the inline
delete result shape (MR-A will repurpose this guard to the staged
two-phase variant once Lance #6658 migration lands).
This is commit 1 of the chore/lance-6.0.1 migration. Cargo bump
follows in commit 2 (will trigger the guards under v6 if any surface
drifted).
Per the migration plan at ~/.claude/plans/shimmering-percolating-duckling.md
(written this session). Two guards from the plan deferred to follow-up:
- manifest_cas_returns_row_level_contention_variant (full publisher
race integration test — needs harness scaffolding)
- table_version_metadata_byte_compatible_with_v4 (TableVersionMetadata
is pub(crate); requires test reach extension).
Verified on v4: cargo test -p omnigraph-engine --test lance_surface_guards
passes 3/3 runtime tests; cargo build -p omnigraph-engine --tests
compiles all 5 compile-only guards clean.
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
* chore(deps): bump Lance 4.0.0 → 6.0.1, DataFusion 52 → 53, Arrow 57 → 58
The Cargo bump itself. Source is intentionally untouched — this commit
will not compile. The compile errors are the work-list for subsequent
commits on this branch.
Lance updates: lance + 7 sub-crates 4.0.0 → 6.0.1. Transitive churn:
+ lance-tokenizer v6.0.1 (vendored tokenizer per Lance PR #6512)
+ object_store 0.13.x (Lance 6 brings it transitively; our explicit
pin stays at 0.12.5 for now — revisit in stages if diamond bites)
- tantivy* crates (replaced by lance-tokenizer)
Compile error landscape on this commit (11 errors):
• 1× E0432: `lance_index::DatasetIndexExt` import (Lance PR #6280
moved it to lance::index). Sites: table_store.rs:20,
db/manifest.rs:37 (the second site was missed by the pre-flight
inventory).
• 8× E0599: `create_index_builder` / `load_indices` missing on
`lance::Dataset` — all downstream of the DatasetIndexExt move.
Once the import is corrected on table_store.rs and db/manifest.rs,
these resolve automatically.
• 2× E0063: missing field `is_only_declared` in `DescribeTableResponse`
initializer at db/manifest/namespace.rs:221, 364. New Lance
namespace field per the v5 namespace restructure (PR #6186).
Surface guards (lance_surface_guards.rs, commit d571fa8) all still
compile + the 3 runtime ones pass on v6 — none of the silent-break
surfaces drifted. That's the load-bearing observation: the publisher
CAS chain, ManifestLocation field shape, checkout_version/restore,
DatasetBuilder fluent chain, MergeInsertBuilder return shape,
WriteParams::default, compact_files signature, and DeleteResult
fields are all v6-stable.
Next commits address the 11 errors per the migration plan stages
3-8.
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
* imports: move DatasetIndexExt to lance::index (Lance PR #6280)
Lance 5.0 (PR #6280) moved `DatasetIndexExt` out of `lance-index` into
`lance::index`. `is_system_index` and `IndexType` stayed in `lance-index`.
Mechanical update of 6 import sites:
crates/omnigraph/src/table_store.rs:20 — split into two `use` lines
crates/omnigraph-server/tests/server.rs:10 — was traits::DatasetIndexExt
crates/omnigraph/tests/search.rs:6
crates/omnigraph/tests/branching.rs:7
crates/omnigraph/tests/failpoints.rs:467
crates/omnigraph-cli/tests/cli.rs:3 — was traits::DatasetIndexExt
All 9 E0599 cascading errors on .create_index_builder / .load_indices
resolve once the trait is back in scope.
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
* namespace: add is_only_declared field to DescribeTableResponse
Lance namespace 6.0.0 added `is_only_declared: Option<bool>` to
`DescribeTableResponse` (lance-namespace-reqwest-client 0.7+ via the
v5.0 namespace API restructure, Lance PR #6186). Set to `Some(false)`
because every table BranchManifestNamespace returns from describe_table
is materialized — the manifest snapshot only includes entries for
tables we've already opened via Dataset::open.
Two sites in db/manifest/namespace.rs (BranchManifestNamespace +
StagedTableNamespace impls of LanceNamespace::describe_table).
Closes the last two compile errors from the v6 bump in the engine lib.
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
* cargo: add lance to omnigraph-cli + omnigraph-server dev-deps
Stage 3 moved DatasetIndexExt imports from `lance-index` to `lance::index`
in the cli and server test crates. Both crates only had `lance-index`
in their dev-dependencies; add `lance` alongside so the new path
resolves.
This is the last compile-error fix from the v6 bump — `cargo build
--workspace --tests` is now green.
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
* docs: refresh Lance alignment audit for v6.0.1; bump surveyed version
Per CLAUDE.md maintenance rule 2 (same-PR docs):
- docs/dev/lance.md: replace the v4.0.1 alignment audit stanza with
the v6.0.1 audit. Captures every v5/v6 finding from this PR (the
DatasetIndexExt move, DescribeTableResponse.is_only_declared,
MergeInsertBuilder return shape, ManifestLocation field shape,
LanceFileVersion::default flip, file-reader async, tokenizer
vendor, Lance #6658/#6666/#6877 status). Cross-references each
guard in tests/lance_surface_guards.rs.
- AGENTS.md: bump "Storage substrate: Lance 4.x" → "Lance 6.x".
Note: surveyed crate version stays at 0.4.2 — substrate version
bumps are independent of OmniGraph's release version.
- crates/omnigraph/src/storage_layer.rs: update the trait module-level
doc-comment to reflect that Lance #6658 closed 2026-05-14 and
delete_where two-phase migration is MR-A (the next follow-up).
#6666 stays open; create_vector_index inline residual stays.
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
* tests: silence clippy::diverging_sub_expression on compile-only guards
The five `_compile_*` async fns in lance_surface_guards.rs use
`let ds: Dataset = unimplemented!()` as a placeholder so type inference
can chase the method chain we want to pin, without ever running the
function. Clippy's `diverging_sub_expression` lint flags this pattern
because the RHS diverges; that's the entire point. Added to the
per-fn `#[allow(...)]` list, alongside dead_code / unreachable_code /
unused_variables / unused_mut already there.
No behavior change. cargo test -p omnigraph-engine --test
lance_surface_guards still 3/3 green.
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
* docs: correct #6658 status — closed but API ships in Lance v7.x, not v6.0.1
The audit stanza in docs/dev/lance.md and the storage_layer.rs trait
doc-comment both implied the public DeleteBuilder::execute_uncommitted
API shipped with Lance 6.0.1. It did not. Issue #6658 closed
2026-05-14, but binary search across the release stream confirms:
v6.0.1 ❌ no pub async fn execute_uncommitted on DeleteBuilder
v6.1.0-rc.1 ❌
v7.0.0-beta.5 ❌
v7.0.0-beta.10 ✅ first appearance
v7.0.0-rc.1 ✅
So MR-A (delete two-phase migration) is gated on the Lance v7.x bump,
not on this PR. v7.0.0-rc.1 dropped 2026-05-21; GA likely within a
week.
No behavior change. Doc-only correction.
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
* ci(lib): bump recursion_limit to 256 — Lance 6 trait depth on Linux
Lance 6's heavier trait surface around futures/streams in storage_layer.rs's
staged-write API pushes the rustc trait-resolution recursion limit past
the default 128 on Linux builds. CI on PR #111 surfaced this in both
`Test Workspace` and `Test omnigraph-server --features aws`:
error: queries overflow the depth limit!
= help: consider increasing the recursion limit by adding a
`#![recursion_limit = "256"]` attribute to your crate (`omnigraph`)
= note: query depth increased by 130 when computing layout of
`{async block@crates/omnigraph/src/storage_layer.rs:697:5: 697:10}`
(The async block is `stage_create_btree_index`'s body — its return type
is several layers of `impl Future<Output=Result<StagedHandle>>` deep on
top of Lance's own builder return types.)
Local macOS builds happened to short-circuit before tripping the limit,
which is why this didn't surface during the v6 bump sequence. The fix
rustc itself suggests is one line at the crate root.
No behavior change. Revisit if a future Lance bump stops needing it.
Verified: `cargo build --locked -p omnigraph-server --features aws`
compiles clean.
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
---------
Co-authored-by: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
1669 lines
60 KiB
Rust
1669 lines
60 KiB
Rust
#![cfg(feature = "failpoints")]
|
|
|
|
mod helpers;
|
|
|
|
use fail::FailScenario;
|
|
use futures::FutureExt;
|
|
use omnigraph::db::Omnigraph;
|
|
use omnigraph::failpoints::ScopedFailPoint;
|
|
|
|
use helpers::recovery::{
|
|
FollowUpMutation, RecoveryExpectation, TableExpectation, assert_post_recovery_invariants,
|
|
branch_head_commit_id, single_sidecar_operation_id,
|
|
};
|
|
use helpers::{MUTATION_QUERIES, mixed_params, mutate_main, version_main};
|
|
|
|
const SCHEMA_V1: &str = "node Person { name: String @key }\n";
|
|
const SCHEMA_V2_ADDED_TYPE: &str =
|
|
"node Person { name: String @key }\nnode Company { name: String @key }\n";
|
|
|
|
fn node_table_uri(root: &str, type_name: &str) -> String {
|
|
let mut hash: u64 = 0xcbf2_9ce4_8422_2325;
|
|
for &b in type_name.as_bytes() {
|
|
hash ^= b as u64;
|
|
hash = hash.wrapping_mul(0x100_0000_01b3);
|
|
}
|
|
format!("{}/nodes/{hash:016x}", root.trim_end_matches('/'))
|
|
}
|
|
|
|
#[tokio::test]
|
|
async fn branch_create_failpoint_triggers() {
|
|
let _scenario = FailScenario::setup();
|
|
let dir = tempfile::tempdir().unwrap();
|
|
let uri = dir.path().to_str().unwrap();
|
|
let db = Omnigraph::init(uri, helpers::TEST_SCHEMA).await.unwrap();
|
|
let _failpoint = ScopedFailPoint::new("branch_create.after_manifest_branch_create", "return");
|
|
|
|
let err = db.branch_create("feature").await.unwrap_err();
|
|
assert!(
|
|
err.to_string()
|
|
.contains("injected failpoint triggered: branch_create.after_manifest_branch_create")
|
|
);
|
|
}
|
|
|
|
#[tokio::test(flavor = "multi_thread")]
|
|
async fn graph_publish_failpoint_triggers_before_commit_append() {
|
|
let _scenario = FailScenario::setup();
|
|
let dir = tempfile::tempdir().unwrap();
|
|
let mut db = Omnigraph::init(dir.path().to_str().unwrap(), helpers::TEST_SCHEMA)
|
|
.await
|
|
.unwrap();
|
|
let _failpoint = ScopedFailPoint::new("graph_publish.before_commit_append", "return");
|
|
|
|
let err = mutate_main(
|
|
&mut db,
|
|
MUTATION_QUERIES,
|
|
"insert_person",
|
|
&mixed_params(&[("$name", "Eve")], &[("$age", 22)]),
|
|
)
|
|
.await
|
|
.unwrap_err();
|
|
assert!(
|
|
err.to_string()
|
|
.contains("injected failpoint triggered: graph_publish.before_commit_append")
|
|
);
|
|
}
|
|
|
|
// Atomic schema apply: schema apply writes staging files first, then commits
|
|
// the manifest, then renames staging → final. Tests below inject crashes at
|
|
// the two boundaries and assert that reopening the repo yields a consistent
|
|
// state.
|
|
|
|
#[tokio::test]
|
|
async fn schema_apply_pre_commit_crash_rolls_forward_via_sidecar() {
|
|
let _scenario = FailScenario::setup();
|
|
let dir = tempfile::tempdir().unwrap();
|
|
let uri = dir.path().to_str().unwrap().to_string();
|
|
|
|
{
|
|
let db = Omnigraph::init(&uri, SCHEMA_V1).await.unwrap();
|
|
let _failpoint = ScopedFailPoint::new("schema_apply.after_staging_write", "return");
|
|
let err = db.apply_schema(SCHEMA_V2_ADDED_TYPE).await.unwrap_err();
|
|
assert!(
|
|
err.to_string()
|
|
.contains("injected failpoint triggered: schema_apply.after_staging_write"),
|
|
"got: {}",
|
|
err
|
|
);
|
|
}
|
|
|
|
// Reopen. With the sidecar protocol, a Phase B → Phase C crash
|
|
// (per-table commit_staged done; manifest publish not yet) is
|
|
// recoverable: the sidecar's `additional_registrations` carries the
|
|
// intent to register `node:Company`, schema-state recovery promotes
|
|
// the staging files, and the manifest-drift sweep publishes the
|
|
// RegisterTable + Update so the manifest catches up to the schema
|
|
// the writer already declared. The orphan-dataset-on-disk-with-no-
|
|
// manifest-entry corruption that pre-this-protocol recoveries left
|
|
// behind is closed.
|
|
let db = Omnigraph::open(&uri).await.unwrap();
|
|
assert_eq!(
|
|
db.schema_source().as_str(),
|
|
SCHEMA_V2_ADDED_TYPE,
|
|
"live schema must reflect the rolled-forward apply (Company added)"
|
|
);
|
|
assert_no_staging_files(dir.path());
|
|
// node:Company must be registered in the manifest (queryable);
|
|
// pre-protocol recoveries left it as an orphan dataset on disk.
|
|
let company_rows = helpers::count_rows(&db, "node:Company").await;
|
|
assert_eq!(
|
|
company_rows, 0,
|
|
"node:Company must have a manifest entry post-recovery"
|
|
);
|
|
}
|
|
|
|
#[tokio::test]
|
|
async fn schema_apply_recovers_post_commit_crash() {
|
|
let _scenario = FailScenario::setup();
|
|
let dir = tempfile::tempdir().unwrap();
|
|
let uri = dir.path().to_str().unwrap().to_string();
|
|
|
|
{
|
|
let db = Omnigraph::init(&uri, SCHEMA_V1).await.unwrap();
|
|
let _failpoint = ScopedFailPoint::new("schema_apply.after_manifest_commit", "return");
|
|
let err = db.apply_schema(SCHEMA_V2_ADDED_TYPE).await.unwrap_err();
|
|
assert!(
|
|
err.to_string()
|
|
.contains("injected failpoint triggered: schema_apply.after_manifest_commit"),
|
|
"got: {}",
|
|
err
|
|
);
|
|
}
|
|
|
|
// Reopen — manifest is at the new version, so recovery sweep should
|
|
// complete the rename and the live schema matches v2.
|
|
let db = Omnigraph::open(&uri).await.unwrap();
|
|
assert_eq!(db.schema_source().as_str(), SCHEMA_V2_ADDED_TYPE);
|
|
assert_no_staging_files(dir.path());
|
|
}
|
|
|
|
#[tokio::test]
|
|
async fn schema_apply_recovers_partial_rename() {
|
|
// Construct a partial-rename state: _schema.pg has been renamed in
|
|
// (matching v2), but _schema.ir.json.staging and __schema_state.json.staging
|
|
// were never renamed. Recovery should detect that the live source matches
|
|
// the staging state's hash and complete the remaining renames.
|
|
let _scenario = FailScenario::setup();
|
|
let dir = tempfile::tempdir().unwrap();
|
|
let uri = dir.path().to_str().unwrap().to_string();
|
|
|
|
{
|
|
let db = Omnigraph::init(&uri, SCHEMA_V1).await.unwrap();
|
|
db.apply_schema(SCHEMA_V2_ADDED_TYPE).await.unwrap();
|
|
}
|
|
|
|
// Simulate: one of the renames (the IR or state file) didn't complete by
|
|
// copying the live ir/state files back to their staging names.
|
|
std::fs::copy(
|
|
dir.path().join("_schema.ir.json"),
|
|
dir.path().join("_schema.ir.json.staging"),
|
|
)
|
|
.unwrap();
|
|
std::fs::copy(
|
|
dir.path().join("__schema_state.json"),
|
|
dir.path().join("__schema_state.json.staging"),
|
|
)
|
|
.unwrap();
|
|
|
|
// Reopen — recovery should complete the rename (overwriting final files
|
|
// with identical staging content) and remove the staging files.
|
|
let db = Omnigraph::open(&uri).await.unwrap();
|
|
assert_eq!(db.schema_source().as_str(), SCHEMA_V2_ADDED_TYPE);
|
|
assert_no_staging_files(dir.path());
|
|
}
|
|
|
|
/// Prove the recovery sweep closes the "finalize → publisher residual"
|
|
/// across one open cycle.
|
|
///
|
|
/// `MutationStaging::finalize` runs `commit_staged` per touched table
|
|
/// sequentially before the publisher commits the manifest. Lance has no
|
|
/// multi-dataset atomic commit primitive, so a failure between the
|
|
/// per-table staged commits and the manifest commit leaves Lance HEAD
|
|
/// advanced on the touched tables with no manifest update.
|
|
///
|
|
/// Closing the residual: finalize writes a sidecar at
|
|
/// `__recovery/{ulid}.json` BEFORE Phase B, the failpoint fires AFTER
|
|
/// finalize but BEFORE the publisher, the engine handle is dropped, and
|
|
/// the next `Omnigraph::open` runs the recovery sweep. The sweep
|
|
/// classifies every table in the sidecar as `RolledPastExpected` (Lance
|
|
/// HEAD == expected + 1, post_commit_pin matches), decides RollForward,
|
|
/// atomically extends every manifest pin via
|
|
/// `ManifestBatchPublisher::publish`, records an audit row, and deletes
|
|
/// the sidecar.
|
|
///
|
|
/// After this test passes:
|
|
/// - The originally-attempted insert ("Eve") is visible via a normal
|
|
/// query.
|
|
/// - The next mutation succeeds without `ExpectedVersionMismatch`.
|
|
/// - `_graph_commit_recoveries.lance` carries an audit row with
|
|
/// `recovery_kind=RolledForward` and the original sidecar's
|
|
/// `actor_id` in `recovery_for_actor`.
|
|
///
|
|
/// Continuous in-process recovery (no restart needed between failure
|
|
/// and recovery) is the goal of a future background reconciler.
|
|
#[tokio::test]
|
|
async fn recovery_rolls_forward_after_finalize_publisher_failure() {
|
|
let _scenario = FailScenario::setup();
|
|
let dir = tempfile::tempdir().unwrap();
|
|
let uri = dir.path().to_str().unwrap().to_string();
|
|
let operation_id;
|
|
|
|
// Setup: trigger the residual.
|
|
{
|
|
let mut db = Omnigraph::init(&uri, helpers::TEST_SCHEMA).await.unwrap();
|
|
let _failpoint = ScopedFailPoint::new("mutation.post_finalize_pre_publisher", "return");
|
|
|
|
// The mutation's finalize completes (commit_staged advances Lance
|
|
// HEAD on node:Person AND writes a `__recovery/{ulid}.json`
|
|
// sidecar). Then the failpoint kicks in before the publisher's
|
|
// manifest commit, so the manifest pin stays at the pre-write
|
|
// version. The sidecar persists for the next-open recovery sweep.
|
|
let err = mutate_main(
|
|
&mut db,
|
|
MUTATION_QUERIES,
|
|
"insert_person",
|
|
&mixed_params(&[("$name", "Eve")], &[("$age", 22)]),
|
|
)
|
|
.await
|
|
.unwrap_err();
|
|
assert!(
|
|
err.to_string()
|
|
.contains("injected failpoint triggered: mutation.post_finalize_pre_publisher"),
|
|
"unexpected error: {err}"
|
|
);
|
|
|
|
// Sidecar must still exist on disk for the recovery sweep to find.
|
|
let recovery_dir = dir.path().join("__recovery");
|
|
let sidecars: Vec<_> = std::fs::read_dir(&recovery_dir)
|
|
.unwrap()
|
|
.filter_map(|e| e.ok())
|
|
.collect();
|
|
assert_eq!(
|
|
sidecars.len(),
|
|
1,
|
|
"exactly one sidecar should persist after the finalize failure"
|
|
);
|
|
operation_id = single_sidecar_operation_id(dir.path());
|
|
|
|
// Drop the failpoint scope and the engine handle.
|
|
}
|
|
|
|
// Recovery: reopen runs the recovery sweep. The sweep finds the
|
|
// sidecar, classifies node:Person as RolledPastExpected, decides
|
|
// RollForward, publishes the manifest update, records the audit
|
|
// row, deletes the sidecar.
|
|
let db = Omnigraph::open(&uri).await.unwrap();
|
|
|
|
// The originally-attempted "Eve" insert is now visible — the recovery
|
|
// sweep extended the manifest pin to include the staged commit.
|
|
let person_count = helpers::count_rows(&db, "node:Person").await;
|
|
assert_eq!(
|
|
person_count, 1,
|
|
"exactly one person (Eve) must be visible after roll-forward"
|
|
);
|
|
drop(db);
|
|
|
|
assert_post_recovery_invariants(
|
|
dir.path(),
|
|
&operation_id,
|
|
RecoveryExpectation::RolledForward {
|
|
tables: vec![TableExpectation::main("node:Person").follow_up_mutation(
|
|
FollowUpMutation::new(
|
|
"main",
|
|
MUTATION_QUERIES,
|
|
"insert_person",
|
|
mixed_params(&[("$name", "Frank")], &[("$age", 33)]),
|
|
),
|
|
)],
|
|
},
|
|
)
|
|
.await
|
|
.unwrap();
|
|
|
|
let db = Omnigraph::open(&uri).await.unwrap();
|
|
let person_count = helpers::count_rows(&db, "node:Person").await;
|
|
assert_eq!(
|
|
person_count, 2,
|
|
"Frank's insert must land normally after recovery"
|
|
);
|
|
}
|
|
|
|
#[tokio::test]
|
|
async fn inline_delete_conflict_writes_sidecar_before_rejecting() {
|
|
let _scenario = FailScenario::setup();
|
|
let dir = tempfile::tempdir().unwrap();
|
|
let uri = dir.path().to_str().unwrap().to_string();
|
|
let db = helpers::init_and_load(&dir).await;
|
|
|
|
let pre_snapshot = db
|
|
.snapshot_of(omnigraph::db::ReadTarget::branch("main"))
|
|
.await
|
|
.unwrap();
|
|
let pre_person_pin = pre_snapshot.entry("node:Person").unwrap().table_version;
|
|
let person_uri = node_table_uri(&uri, "Person");
|
|
|
|
{
|
|
let _pause_delete = ScopedFailPoint::new("mutation.delete_node_pre_primary_delete", "pause");
|
|
let delete_params = helpers::params(&[("$name", "Alice")]);
|
|
let delete = db.mutate(
|
|
"main",
|
|
MUTATION_QUERIES,
|
|
"remove_person",
|
|
&delete_params,
|
|
);
|
|
tokio::pin!(delete);
|
|
|
|
let mut concurrent_update_succeeded = false;
|
|
for _ in 0..50 {
|
|
if delete.as_mut().now_or_never().is_some() {
|
|
panic!("delete mutation completed before primary-delete failpoint was released");
|
|
}
|
|
let mut concurrent = Omnigraph::open_read_only(&uri).await.unwrap();
|
|
if mutate_main(
|
|
&mut concurrent,
|
|
MUTATION_QUERIES,
|
|
"set_age",
|
|
&mixed_params(&[("$name", "Bob")], &[("$age", 26)]),
|
|
)
|
|
.await
|
|
.is_ok()
|
|
{
|
|
concurrent_update_succeeded = true;
|
|
break;
|
|
}
|
|
tokio::time::sleep(std::time::Duration::from_millis(20)).await;
|
|
}
|
|
assert!(concurrent_update_succeeded, "concurrent update must land while delete is paused");
|
|
fail::remove("mutation.delete_node_pre_primary_delete");
|
|
|
|
let err = delete.await.unwrap_err();
|
|
assert!(
|
|
err.to_string().contains("stale view of 'node:Person'")
|
|
|| err.to_string().contains("ExpectedVersionMismatch")
|
|
|| err.to_string().contains("expected version mismatch"),
|
|
"unexpected error: {err}",
|
|
);
|
|
}
|
|
|
|
let person_head = lance::Dataset::open(&person_uri)
|
|
.await
|
|
.unwrap()
|
|
.version()
|
|
.version;
|
|
assert!(
|
|
person_head > pre_person_pin,
|
|
"primary inline delete must have advanced node:Person before rejecting"
|
|
);
|
|
let db = Omnigraph::open(&uri).await.unwrap();
|
|
assert_eq!(
|
|
helpers::count_rows(&db, "node:Person").await,
|
|
4,
|
|
"manifest-conflicted delete must not remove net Person rows after recovery"
|
|
);
|
|
assert_eq!(
|
|
helpers::count_rows(&db, "edge:Knows").await,
|
|
3,
|
|
"manifest-conflicted delete must not remove net Knows rows after recovery"
|
|
);
|
|
}
|
|
|
|
#[tokio::test]
|
|
async fn recovery_rolls_forward_load_on_feature_branch() {
|
|
use omnigraph::loader::LoadMode;
|
|
|
|
let _scenario = FailScenario::setup();
|
|
let dir = tempfile::tempdir().unwrap();
|
|
let uri = dir.path().to_str().unwrap().to_string();
|
|
let operation_id;
|
|
let main_person_pin;
|
|
let feature_parent_commit_id;
|
|
|
|
{
|
|
let db = Omnigraph::init(&uri, helpers::TEST_SCHEMA).await.unwrap();
|
|
db.branch_create("feature").await.unwrap();
|
|
db.mutate(
|
|
"feature",
|
|
MUTATION_QUERIES,
|
|
"insert_person",
|
|
&mixed_params(&[("$name", "BeforeLoad")], &[("$age", 40)]),
|
|
)
|
|
.await
|
|
.unwrap();
|
|
main_person_pin = db
|
|
.snapshot_of(omnigraph::db::ReadTarget::branch("main"))
|
|
.await
|
|
.unwrap()
|
|
.entry("node:Person")
|
|
.expect("main must have Person")
|
|
.table_version;
|
|
feature_parent_commit_id = branch_head_commit_id(dir.path(), "feature").await.unwrap();
|
|
|
|
let _failpoint = ScopedFailPoint::new("mutation.post_finalize_pre_publisher", "return");
|
|
let err = db
|
|
.load(
|
|
"feature",
|
|
r#"{"type":"Person","data":{"name":"FeatureLoad","age":41}}
|
|
"#,
|
|
LoadMode::Append,
|
|
)
|
|
.await
|
|
.unwrap_err();
|
|
assert!(
|
|
err.to_string()
|
|
.contains("injected failpoint triggered: mutation.post_finalize_pre_publisher"),
|
|
"unexpected error: {err}"
|
|
);
|
|
operation_id = single_sidecar_operation_id(dir.path());
|
|
}
|
|
|
|
let db = Omnigraph::open(&uri).await.unwrap();
|
|
assert_eq!(
|
|
helpers::count_rows_branch(&db, "feature", "node:Person").await,
|
|
2,
|
|
"feature branch load row must be visible after recovery"
|
|
);
|
|
assert_eq!(
|
|
helpers::count_rows(&db, "node:Person").await,
|
|
0,
|
|
"feature branch load recovery must not publish the row to main"
|
|
);
|
|
drop(db);
|
|
|
|
assert_post_recovery_invariants(
|
|
dir.path(),
|
|
&operation_id,
|
|
RecoveryExpectation::RolledForward {
|
|
tables: vec![
|
|
TableExpectation::branch("node:Person", "feature")
|
|
.expected_main_manifest_pin(main_person_pin)
|
|
.expected_recovery_parent_commit_id(feature_parent_commit_id)
|
|
.follow_up_mutation(FollowUpMutation::new(
|
|
"feature",
|
|
MUTATION_QUERIES,
|
|
"insert_person",
|
|
mixed_params(&[("$name", "AfterLoad")], &[("$age", 42)]),
|
|
)),
|
|
],
|
|
},
|
|
)
|
|
.await
|
|
.unwrap();
|
|
|
|
let db = Omnigraph::open(&uri).await.unwrap();
|
|
assert_eq!(
|
|
helpers::count_rows_branch(&db, "feature", "node:Person").await,
|
|
3,
|
|
"follow-up feature mutation must succeed after load recovery"
|
|
);
|
|
assert_eq!(
|
|
helpers::count_rows(&db, "node:Person").await,
|
|
0,
|
|
"follow-up feature mutation must not move main"
|
|
);
|
|
}
|
|
|
|
#[tokio::test]
|
|
async fn recovery_rolls_forward_ensure_indices_on_feature_branch() {
|
|
use lance::index::DatasetIndexExt;
|
|
use omnigraph::loader::{LoadMode, load_jsonl};
|
|
use omnigraph::table_store::TableStore;
|
|
|
|
let _scenario = FailScenario::setup();
|
|
let dir = tempfile::tempdir().unwrap();
|
|
let uri = dir.path().to_str().unwrap().to_string();
|
|
let operation_id;
|
|
let feature_parent_commit_id;
|
|
let main_person_pin;
|
|
|
|
let mut db = Omnigraph::init(&uri, helpers::TEST_SCHEMA).await.unwrap();
|
|
load_jsonl(
|
|
&mut db,
|
|
r#"{"type":"Person","data":{"name":"alice","age":30}}
|
|
"#,
|
|
LoadMode::Append,
|
|
)
|
|
.await
|
|
.unwrap();
|
|
db.branch_create("feature").await.unwrap();
|
|
db.mutate(
|
|
"feature",
|
|
MUTATION_QUERIES,
|
|
"insert_person",
|
|
&mixed_params(&[("$name", "BeforeEnsure")], &[("$age", 42)]),
|
|
)
|
|
.await
|
|
.unwrap();
|
|
|
|
main_person_pin = db
|
|
.snapshot_of(omnigraph::db::ReadTarget::branch("main"))
|
|
.await
|
|
.unwrap()
|
|
.entry("node:Person")
|
|
.expect("main must have Person")
|
|
.table_version;
|
|
|
|
// Make the feature branch's Person table genuinely need index work
|
|
// while keeping the manifest internally consistent. The test-only
|
|
// publisher deliberately skips the normal index-rebuild preparation;
|
|
// the failed writer below is still the real `ensure_indices_on`.
|
|
let person_uri = node_table_uri(&uri, "Person");
|
|
let store = TableStore::new(&uri);
|
|
let mut ds = store
|
|
.open_dataset_head(&person_uri, Some("feature"))
|
|
.await
|
|
.unwrap();
|
|
ds.drop_index("id_idx").await.unwrap();
|
|
let dropped_index_head = ds.version().version;
|
|
db.failpoint_publish_table_head_without_index_rebuild_for_test(
|
|
"feature",
|
|
"node:Person",
|
|
Some("feature"),
|
|
)
|
|
.await
|
|
.unwrap();
|
|
let feature_snapshot = db
|
|
.snapshot_of(omnigraph::db::ReadTarget::branch("feature"))
|
|
.await
|
|
.unwrap();
|
|
assert_eq!(
|
|
feature_snapshot
|
|
.entry("node:Person")
|
|
.expect("feature must have Person")
|
|
.table_version,
|
|
dropped_index_head,
|
|
"test setup must publish the dropped-index table head before ensure_indices runs",
|
|
);
|
|
feature_parent_commit_id = branch_head_commit_id(dir.path(), "feature").await.unwrap();
|
|
|
|
{
|
|
let _failpoint =
|
|
ScopedFailPoint::new("ensure_indices.post_phase_b_pre_manifest_commit", "return");
|
|
let err = db.ensure_indices_on("feature").await.unwrap_err();
|
|
assert!(
|
|
err.to_string().contains(
|
|
"injected failpoint triggered: ensure_indices.post_phase_b_pre_manifest_commit"
|
|
),
|
|
"unexpected error: {err}"
|
|
);
|
|
operation_id = single_sidecar_operation_id(dir.path());
|
|
}
|
|
drop(db);
|
|
|
|
let db = Omnigraph::open(&uri).await.unwrap();
|
|
assert_eq!(
|
|
helpers::count_rows_branch(&db, "feature", "node:Person").await,
|
|
2,
|
|
"feature should see inherited alice plus recovered branch-local row"
|
|
);
|
|
assert_eq!(
|
|
helpers::count_rows(&db, "node:Person").await,
|
|
1,
|
|
"ensure_indices branch recovery must not move main"
|
|
);
|
|
drop(db);
|
|
|
|
assert_post_recovery_invariants(
|
|
dir.path(),
|
|
&operation_id,
|
|
RecoveryExpectation::RolledForward {
|
|
tables: vec![
|
|
TableExpectation::branch("node:Person", "feature")
|
|
.expected_main_manifest_pin(main_person_pin)
|
|
.expected_recovery_parent_commit_id(feature_parent_commit_id)
|
|
.follow_up_mutation(FollowUpMutation::new(
|
|
"feature",
|
|
MUTATION_QUERIES,
|
|
"insert_person",
|
|
mixed_params(&[("$name", "AfterEnsure")], &[("$age", 44)]),
|
|
)),
|
|
],
|
|
},
|
|
)
|
|
.await
|
|
.unwrap();
|
|
|
|
let db = Omnigraph::open(&uri).await.unwrap();
|
|
assert_eq!(
|
|
helpers::count_rows_branch(&db, "feature", "node:Person").await,
|
|
3,
|
|
"follow-up feature mutation must succeed after ensure_indices recovery"
|
|
);
|
|
assert_eq!(
|
|
helpers::count_rows(&db, "node:Person").await,
|
|
1,
|
|
"follow-up feature mutation must not move main"
|
|
);
|
|
}
|
|
|
|
/// Refresh-time recovery (Option B): the in-process `Omnigraph::refresh`
|
|
/// runs roll-forward-only recovery, closing the long-running-server
|
|
/// residual without restart.
|
|
///
|
|
/// Setup: trigger `mutation.post_finalize_pre_publisher` once. The
|
|
/// sidecar persists. Without dropping the engine, call `db.refresh()`.
|
|
/// The post-condition: sidecar gone; Eve visible; subsequent mutation
|
|
/// on the same handle succeeds without restart and without
|
|
/// ExpectedVersionMismatch.
|
|
#[tokio::test]
|
|
async fn refresh_runs_roll_forward_recovery_in_process() {
|
|
let _scenario = FailScenario::setup();
|
|
let dir = tempfile::tempdir().unwrap();
|
|
let uri = dir.path().to_str().unwrap().to_string();
|
|
|
|
let mut db = Omnigraph::init(&uri, helpers::TEST_SCHEMA).await.unwrap();
|
|
|
|
// Setup: trigger the residual (sidecar persists; manifest unchanged).
|
|
{
|
|
let _failpoint = ScopedFailPoint::new("mutation.post_finalize_pre_publisher", "return");
|
|
let err = mutate_main(
|
|
&mut db,
|
|
MUTATION_QUERIES,
|
|
"insert_person",
|
|
&mixed_params(&[("$name", "Eve")], &[("$age", 22)]),
|
|
)
|
|
.await
|
|
.unwrap_err();
|
|
assert!(
|
|
err.to_string()
|
|
.contains("injected failpoint triggered: mutation.post_finalize_pre_publisher"),
|
|
"unexpected error: {err}"
|
|
);
|
|
let recovery_dir = dir.path().join("__recovery");
|
|
assert_eq!(
|
|
std::fs::read_dir(&recovery_dir).unwrap().count(),
|
|
1,
|
|
"exactly one sidecar must persist after the finalize failure"
|
|
);
|
|
}
|
|
|
|
// Recovery: explicit refresh runs roll-forward-only recovery
|
|
// in-process — no restart needed. Sidecar finds the Person drift,
|
|
// classifies RolledPastExpected, rolls forward via publisher CAS,
|
|
// and deletes the sidecar.
|
|
db.refresh().await.expect("refresh must succeed");
|
|
|
|
// Sidecar must be gone — refresh-time recovery rolled it forward.
|
|
let recovery_dir = dir.path().join("__recovery");
|
|
if recovery_dir.exists() {
|
|
let remaining: Vec<_> = std::fs::read_dir(&recovery_dir)
|
|
.unwrap()
|
|
.filter_map(|e| e.ok())
|
|
.collect();
|
|
assert!(
|
|
remaining.is_empty(),
|
|
"sidecar must be deleted by refresh-time roll-forward; remaining: {:?}",
|
|
remaining,
|
|
);
|
|
}
|
|
|
|
// Eve (the originally-attempted insert) is visible without restart.
|
|
let person_count = helpers::count_rows(&db, "node:Person").await;
|
|
assert_eq!(
|
|
person_count, 1,
|
|
"Eve must be visible after refresh-time roll-forward"
|
|
);
|
|
|
|
// A direct Person mutation also succeeds without ExpectedVersionMismatch.
|
|
mutate_main(
|
|
&mut db,
|
|
MUTATION_QUERIES,
|
|
"insert_person",
|
|
&mixed_params(&[("$name", "Frank")], &[("$age", 33)]),
|
|
)
|
|
.await
|
|
.expect("Person insert must succeed after refresh-time recovery");
|
|
assert_eq!(helpers::count_rows(&db, "node:Person").await, 2);
|
|
}
|
|
|
|
/// Refresh-time recovery must NOT call `Dataset::restore` — it can
|
|
/// silently orphan a concurrent writer's commit. Sidecars that would
|
|
/// require rollback must be left on disk for the next ReadWrite open.
|
|
///
|
|
/// Setup: synthesize a sidecar that would classify as `UnexpectedAtP1`
|
|
/// (rollback territory) — strict-match Mutation kind with
|
|
/// expected_version != manifest_pinned. Trigger refresh and assert:
|
|
/// sidecar still on disk, Lance HEAD unchanged (no restore commit).
|
|
/// Then drop + open: full sweep handles it.
|
|
#[tokio::test]
|
|
async fn refresh_defers_rollback_eligible_sidecar_to_next_open() {
|
|
use omnigraph::loader::{LoadMode, load_jsonl};
|
|
use omnigraph::table_store::TableStore;
|
|
|
|
let _scenario = FailScenario::setup();
|
|
let dir = tempfile::tempdir().unwrap();
|
|
let uri = dir.path().to_str().unwrap().to_string();
|
|
|
|
// Bootstrap.
|
|
let mut db = Omnigraph::init(&uri, helpers::TEST_SCHEMA).await.unwrap();
|
|
load_jsonl(
|
|
&mut db,
|
|
r#"{"type":"Person","data":{"name":"alice","age":30}}
|
|
"#,
|
|
LoadMode::Append,
|
|
)
|
|
.await
|
|
.unwrap();
|
|
|
|
// Capture Person's full URI and manifest pin.
|
|
let snapshot = db
|
|
.snapshot_of(omnigraph::db::ReadTarget::branch("main"))
|
|
.await
|
|
.unwrap();
|
|
let entry = snapshot.entry("node:Person").unwrap();
|
|
let person_uri = format!("{}/{}", uri.trim_end_matches('/'), entry.table_path);
|
|
let manifest_pin = entry.table_version;
|
|
|
|
// Drift Person's Lance HEAD ahead of the manifest pin (without
|
|
// touching the manifest) so the classifier can reach UnexpectedAtP1
|
|
// / UnexpectedMultistep / RolledPastExpected paths that require
|
|
// a real restore on rollback.
|
|
let store = TableStore::new(&uri);
|
|
let mut ds = lance::Dataset::open(&person_uri).await.unwrap();
|
|
store
|
|
.delete_where(&person_uri, &mut ds, "1 = 2")
|
|
.await
|
|
.unwrap();
|
|
let head_after_drift = ds.version().version;
|
|
assert_eq!(head_after_drift, manifest_pin + 1);
|
|
|
|
// Synthesize a sidecar with expected_version that DOES NOT match
|
|
// the current manifest pin AND post_commit_pin == lance_head →
|
|
// strict Mutation classifier sees lance_head == manifest_pinned + 1
|
|
// but expected != manifest_pinned → UnexpectedAtP1. decide → RollBack.
|
|
//
|
|
// expected_version must be a REAL Lance version (`restore_table_to_version`
|
|
// calls `checkout_version` on it, and an unknown version errors). Use
|
|
// manifest_pin - 1 which exists from the bootstrap commit chain.
|
|
let bogus_expected = manifest_pin - 1;
|
|
let bogus_post = head_after_drift;
|
|
let sidecar_json = format!(
|
|
r#"{{
|
|
"schema_version": 1,
|
|
"operation_id": "01H0000000000000000000RBCK",
|
|
"started_at": "0",
|
|
"branch": null,
|
|
"actor_id": "act-rollback",
|
|
"writer_kind": "Mutation",
|
|
"tables": [
|
|
{{
|
|
"table_key":"node:Person",
|
|
"table_path":"{}",
|
|
"expected_version":{},
|
|
"post_commit_pin":{}
|
|
}}
|
|
]
|
|
}}"#,
|
|
person_uri, bogus_expected, bogus_post,
|
|
);
|
|
let recovery_dir = dir.path().join("__recovery");
|
|
std::fs::create_dir_all(&recovery_dir).unwrap();
|
|
std::fs::write(
|
|
recovery_dir.join("01H0000000000000000000RBCK.json"),
|
|
&sidecar_json,
|
|
)
|
|
.unwrap();
|
|
|
|
// Capture pre-refresh Lance HEAD on Person.
|
|
let pre_head = lance::Dataset::open(&person_uri)
|
|
.await
|
|
.unwrap()
|
|
.version()
|
|
.version;
|
|
|
|
// Trigger refresh-time recovery directly. Sidecar is rollback-
|
|
// eligible (UnexpectedAtP1); RollForwardOnly mode defers it,
|
|
// leaving the sidecar on disk and Lance HEAD unchanged on Person.
|
|
db.refresh()
|
|
.await
|
|
.expect("refresh must succeed (deferring rollback)");
|
|
|
|
// Sidecar still on disk.
|
|
assert_eq!(
|
|
std::fs::read_dir(&recovery_dir).unwrap().count(),
|
|
1,
|
|
"rollback-eligible sidecar must be deferred to next ReadWrite open",
|
|
);
|
|
|
|
// Lance HEAD on Person unchanged — no restore ran.
|
|
let post_head = lance::Dataset::open(&person_uri)
|
|
.await
|
|
.unwrap()
|
|
.version()
|
|
.version;
|
|
assert_eq!(
|
|
pre_head, post_head,
|
|
"refresh-time recovery must NOT call Dataset::restore on Person; \
|
|
pre_head={pre_head}, post_head={post_head}",
|
|
);
|
|
|
|
// Cross-check: drop the engine and reopen — full sweep handles
|
|
// the rollback (will use Dataset::restore safely; no concurrent
|
|
// writers at open time).
|
|
drop(db);
|
|
let _db = Omnigraph::open(&uri).await.unwrap();
|
|
// After full-sweep recovery, the sidecar should be processed
|
|
// (deleted). Sidecar's tables are eligible for rollback (UnexpectedAtP1):
|
|
// restore happens on Person (HEAD advances by 1).
|
|
let remaining = if recovery_dir.exists() {
|
|
std::fs::read_dir(&recovery_dir).unwrap().count()
|
|
} else {
|
|
0
|
|
};
|
|
assert_eq!(
|
|
remaining, 0,
|
|
"full sweep at next open must process the deferred sidecar",
|
|
);
|
|
let final_head = lance::Dataset::open(&person_uri)
|
|
.await
|
|
.unwrap()
|
|
.version()
|
|
.version;
|
|
assert!(
|
|
final_head > post_head,
|
|
"full sweep must run Dataset::restore (head advances); \
|
|
post_head={post_head}, final_head={final_head}",
|
|
);
|
|
}
|
|
|
|
/// Companion to the above — confirms that a finalize→publisher failure
|
|
/// on one table leaves OTHER tables untouched. Subsequent writes to
|
|
/// non-drifted tables proceed normally; the drift is contained.
|
|
#[tokio::test]
|
|
async fn finalize_publisher_residual_does_not_drift_untouched_tables() {
|
|
let _scenario = FailScenario::setup();
|
|
let dir = tempfile::tempdir().unwrap();
|
|
let mut db = Omnigraph::init(dir.path().to_str().unwrap(), helpers::TEST_SCHEMA)
|
|
.await
|
|
.unwrap();
|
|
|
|
{
|
|
let _failpoint = ScopedFailPoint::new("mutation.post_finalize_pre_publisher", "return");
|
|
let _ = mutate_main(
|
|
&mut db,
|
|
MUTATION_QUERIES,
|
|
"insert_person",
|
|
&mixed_params(&[("$name", "Eve")], &[("$age", 22)]),
|
|
)
|
|
.await
|
|
.expect_err("synthetic failpoint must fire");
|
|
}
|
|
|
|
// node:Person drifted. node:Company didn't — try a Company write.
|
|
use omnigraph::loader::{LoadMode, load_jsonl};
|
|
load_jsonl(
|
|
&mut db,
|
|
r#"{"type": "Company", "data": {"name": "Acme"}}"#,
|
|
LoadMode::Append,
|
|
)
|
|
.await
|
|
.expect("Company write on a non-drifted table should succeed");
|
|
}
|
|
|
|
/// Acceptance test: a stage-step failure in the staged-index path
|
|
/// (`stage_create_btree_index` succeeded; `commit_staged` not yet
|
|
/// called) leaves NO Lance-HEAD drift on the existing tables.
|
|
/// Subsequent operations against those tables succeed without
|
|
/// `ExpectedVersionMismatch`.
|
|
///
|
|
/// Path: `apply_schema(v1 → v2)` adds a new node type. The
|
|
/// `added_tables` loop in `schema_apply` creates the empty dataset and
|
|
/// then calls `build_indices_on_dataset_for_catalog` →
|
|
/// `stage_and_commit_btree(..., &["id"])`. The failpoint fires
|
|
/// between `stage_create_btree_index` and `commit_staged`, so the
|
|
/// staged segments are written under `_indices/<uuid>/` but Lance HEAD
|
|
/// on the new dataset is unchanged at v=1. The schema-apply lock
|
|
/// branch is released by `apply_schema`'s outer match. Existing
|
|
/// tables (e.g. `node:Person`) are completely untouched by the new
|
|
/// node's added_tables iteration — they're outside the failed apply
|
|
/// path entirely — and we assert that mutations against them continue
|
|
/// to work.
|
|
///
|
|
/// The orphan empty dataset from the failed apply is acceptable
|
|
/// residual: it's unreferenced by `__manifest` and will be reclaimed
|
|
/// by `cleanup_old_versions` (or removed when a future apply at the
|
|
/// same target path resolves the rename).
|
|
#[tokio::test]
|
|
async fn ensure_indices_stage_btree_failure_leaves_existing_tables_writable() {
|
|
let _scenario = FailScenario::setup();
|
|
let dir = tempfile::tempdir().unwrap();
|
|
let uri = dir.path().to_str().unwrap().to_string();
|
|
|
|
// Init with TEST_SCHEMA which declares Person + Knows. Indices on
|
|
// those tables get built during init.
|
|
let mut db = Omnigraph::init(&uri, helpers::TEST_SCHEMA).await.unwrap();
|
|
|
|
// Apply a schema that adds a new node type. The added_tables loop
|
|
// will hit the failpoint between stage and commit on the new
|
|
// node:Project table's btree-on-id build. (TEST_SCHEMA already
|
|
// has Person + Company + Knows + WorksAt — pick a name that isn't
|
|
// already declared.)
|
|
let extended_schema = format!(
|
|
"{}\nnode Project {{ name: String @key }}\n",
|
|
helpers::TEST_SCHEMA
|
|
);
|
|
|
|
{
|
|
let _failpoint =
|
|
ScopedFailPoint::new("ensure_indices.post_stage_pre_commit_btree", "return");
|
|
let err = db.apply_schema(&extended_schema).await.unwrap_err();
|
|
assert!(
|
|
err.to_string()
|
|
.contains("ensure_indices.post_stage_pre_commit_btree"),
|
|
"schema apply should fail with the synthetic failpoint error, got: {err}"
|
|
);
|
|
}
|
|
|
|
// Existing tables stayed at their pre-apply versions; subsequent
|
|
// mutations against them succeed (no Lance-HEAD drift).
|
|
mutate_main(
|
|
&mut db,
|
|
helpers::MUTATION_QUERIES,
|
|
"insert_person",
|
|
&mixed_params(&[("$name", "Eve")], &[("$age", 22)]),
|
|
)
|
|
.await
|
|
.expect("Person mutation must succeed after the failed schema apply — existing tables are not drifted");
|
|
}
|
|
|
|
fn assert_no_staging_files(repo: &std::path::Path) {
|
|
for name in [
|
|
"_schema.pg.staging",
|
|
"_schema.ir.json.staging",
|
|
"__schema_state.json.staging",
|
|
] {
|
|
let path = repo.join(name);
|
|
assert!(
|
|
!path.exists(),
|
|
"staging file {} still exists after recovery",
|
|
path.display()
|
|
);
|
|
}
|
|
}
|
|
|
|
// =====================================================================
|
|
// Per-writer Phase B → Phase C recovery integration
|
|
// =====================================================================
|
|
//
|
|
// Each of the four migrated writers writes a sidecar BEFORE its
|
|
// per-table commit_staged loop and deletes it AFTER the manifest
|
|
// publish. The `recovery_rolls_forward_after_finalize_publisher_failure`
|
|
// test above covers MutationStaging::finalize. The three tests below
|
|
// cover the other three writers: schema_apply, branch_merge,
|
|
// ensure_indices.
|
|
//
|
|
// Each follows the same shape: trigger the writer with a failpoint
|
|
// active in the Phase B → Phase C window, drop the engine, reopen,
|
|
// assert recovery rolled forward (manifest pin advanced, audit row
|
|
// recorded, sidecar deleted) and a follow-up operation succeeds without
|
|
// ExpectedVersionMismatch.
|
|
|
|
#[tokio::test]
|
|
async fn schema_apply_without_schema_staging_rolls_back_on_next_open() {
|
|
use omnigraph::loader::{LoadMode, load_jsonl};
|
|
|
|
let _scenario = FailScenario::setup();
|
|
let dir = tempfile::tempdir().unwrap();
|
|
let uri = dir.path().to_str().unwrap().to_string();
|
|
let operation_id;
|
|
|
|
{
|
|
let mut db = Omnigraph::init(&uri, helpers::TEST_SCHEMA).await.unwrap();
|
|
load_jsonl(
|
|
&mut db,
|
|
r#"{"type":"Person","data":{"name":"alice","age":30}}
|
|
"#,
|
|
LoadMode::Append,
|
|
)
|
|
.await
|
|
.unwrap();
|
|
}
|
|
|
|
let pre_failure_version = {
|
|
let db = Omnigraph::open(&uri).await.unwrap();
|
|
version_main(&db).await.unwrap()
|
|
};
|
|
|
|
{
|
|
let db = Omnigraph::open(&uri).await.unwrap();
|
|
let _failpoint = ScopedFailPoint::new("schema_apply.before_staging_write", "return");
|
|
let v2_schema = r#"node Person {
|
|
name: String @key
|
|
age: I32?
|
|
city: String?
|
|
}
|
|
|
|
node Company {
|
|
name: String @key
|
|
}
|
|
|
|
node Tag {
|
|
label: String @key
|
|
}
|
|
|
|
edge Knows: Person -> Person {
|
|
since: Date?
|
|
}
|
|
|
|
edge WorksAt: Person -> Company
|
|
"#;
|
|
let err = db.apply_schema(v2_schema).await.unwrap_err();
|
|
assert!(
|
|
err.to_string()
|
|
.contains("injected failpoint triggered: schema_apply.before_staging_write"),
|
|
"unexpected error: {err}"
|
|
);
|
|
operation_id = single_sidecar_operation_id(dir.path());
|
|
}
|
|
|
|
let db = Omnigraph::open(&uri).await.unwrap();
|
|
assert_eq!(
|
|
version_main(&db).await.unwrap(),
|
|
pre_failure_version,
|
|
"manifest must remain on the old schema when no schema staging files existed"
|
|
);
|
|
assert_eq!(
|
|
helpers::count_rows(&db, "node:Person").await,
|
|
1,
|
|
"old-schema data must remain readable after rollback"
|
|
);
|
|
drop(db);
|
|
|
|
assert_post_recovery_invariants(
|
|
dir.path(),
|
|
&operation_id,
|
|
RecoveryExpectation::RolledBack {
|
|
tables: vec![TableExpectation::main("node:Person")],
|
|
},
|
|
)
|
|
.await
|
|
.unwrap();
|
|
|
|
let live_schema = std::fs::read_to_string(dir.path().join("_schema.pg")).unwrap();
|
|
assert!(
|
|
!live_schema.contains("city: String?"),
|
|
"_schema.pg must keep the OLD schema when staging files never existed; got:\n{live_schema}",
|
|
);
|
|
assert!(
|
|
!live_schema.contains("node Tag"),
|
|
"_schema.pg must keep the OLD schema when staging files never existed; got:\n{live_schema}",
|
|
);
|
|
}
|
|
|
|
#[tokio::test]
|
|
async fn schema_apply_phase_b_failure_recovered_on_next_open() {
|
|
use omnigraph::loader::{LoadMode, load_jsonl};
|
|
|
|
let _scenario = FailScenario::setup();
|
|
let dir = tempfile::tempdir().unwrap();
|
|
let uri = dir.path().to_str().unwrap().to_string();
|
|
let operation_id;
|
|
|
|
// Seed: a Person table with one row so the schema-apply rewritten_tables
|
|
// loop has actual work to do.
|
|
{
|
|
let mut db = Omnigraph::init(&uri, helpers::TEST_SCHEMA).await.unwrap();
|
|
load_jsonl(
|
|
&mut db,
|
|
r#"{"type":"Person","data":{"name":"alice","age":30}}
|
|
"#,
|
|
LoadMode::Append,
|
|
)
|
|
.await
|
|
.unwrap();
|
|
}
|
|
|
|
// Capture pre-failure manifest version so we can assert the recovery
|
|
// sweep advances it.
|
|
let pre_failure_version = {
|
|
let db = Omnigraph::open(&uri).await.unwrap();
|
|
version_main(&db).await.unwrap()
|
|
};
|
|
|
|
// Setup: trigger the residual via `schema_apply.after_staging_write`.
|
|
// This failpoint fires AFTER the rewritten_tables/indexed_tables loops
|
|
// (Lance HEAD advanced) AND AFTER the schema-state staging files are
|
|
// written, but BEFORE the manifest publish. The recovery sidecar persists.
|
|
{
|
|
let db = Omnigraph::open(&uri).await.unwrap();
|
|
let _failpoint = ScopedFailPoint::new("schema_apply.after_staging_write", "return");
|
|
// v2 schema: add a `city` property to Person AND add a new
|
|
// `Tag` node type. The new property triggers the rewritten_tables
|
|
// path (Phase B sidecar coverage). The new type changes the
|
|
// overall table set — required to keep `recover_schema_state_files`
|
|
// (which runs BEFORE recover_manifest_drift) happy: it can't
|
|
// disambiguate property-only migrations and would reject the
|
|
// open before the recovery sweep ever ran.
|
|
let v2_schema = r#"node Person {
|
|
name: String @key
|
|
age: I32?
|
|
city: String?
|
|
}
|
|
|
|
node Company {
|
|
name: String @key
|
|
}
|
|
|
|
node Tag {
|
|
label: String @key
|
|
}
|
|
|
|
edge Knows: Person -> Person {
|
|
since: Date?
|
|
}
|
|
|
|
edge WorksAt: Person -> Company
|
|
"#;
|
|
let err = db.apply_schema(v2_schema).await.unwrap_err();
|
|
assert!(
|
|
err.to_string()
|
|
.contains("injected failpoint triggered: schema_apply.after_staging_write"),
|
|
"unexpected error: {err}"
|
|
);
|
|
|
|
// Sidecar must still exist.
|
|
let recovery_dir = dir.path().join("__recovery");
|
|
let sidecars: Vec<_> = std::fs::read_dir(&recovery_dir)
|
|
.unwrap()
|
|
.filter_map(|e| e.ok())
|
|
.collect();
|
|
assert_eq!(
|
|
sidecars.len(),
|
|
1,
|
|
"exactly one sidecar must persist after schema_apply failure"
|
|
);
|
|
operation_id = single_sidecar_operation_id(dir.path());
|
|
}
|
|
|
|
// Recovery: reopen runs the recovery sweep. Sidecar's writer_kind is
|
|
// SchemaApply (loose-match) — classifier accepts the multi-commit
|
|
// drift on Person, decision is RollForward, manifest extends to the
|
|
// current Lance HEAD.
|
|
let db = Omnigraph::open(&uri).await.unwrap();
|
|
|
|
// Recovery sweep must have advanced the manifest pin on the rewritten
|
|
// table: roll-forward published the post-failure Lance HEAD.
|
|
let post_recovery_version = version_main(&db).await.unwrap();
|
|
assert!(
|
|
post_recovery_version > pre_failure_version,
|
|
"manifest version must advance post-recovery; pre={pre_failure_version}, \
|
|
post={post_recovery_version}",
|
|
);
|
|
drop(db);
|
|
|
|
assert_post_recovery_invariants(
|
|
dir.path(),
|
|
&operation_id,
|
|
RecoveryExpectation::RolledForward {
|
|
tables: vec![TableExpectation::main("node:Person")],
|
|
},
|
|
)
|
|
.await
|
|
.unwrap();
|
|
|
|
// Schema-apply atomicity: the live `_schema.pg` must reflect the
|
|
// NEW schema (city column on Person, Tag node type) — not the old.
|
|
// Without the schema-staging coordination, the schema-state
|
|
// recovery would have deleted the staging files (because manifest
|
|
// hadn't advanced when it ran), leaving a corrupt repo with new-
|
|
// schema data on disk but old-schema catalog.
|
|
let live_schema = std::fs::read_to_string(dir.path().join("_schema.pg")).unwrap();
|
|
assert!(
|
|
live_schema.contains("city: String?"),
|
|
"_schema.pg must reflect the NEW schema (city column added); got:\n{live_schema}",
|
|
);
|
|
assert!(
|
|
live_schema.contains("node Tag"),
|
|
"_schema.pg must reflect the NEW schema (Tag type added); got:\n{live_schema}",
|
|
);
|
|
|
|
// Catalog ↔ manifest agreement: the new `node:Tag` type the schema
|
|
// declares must have a manifest entry the engine can read against.
|
|
// Without registrations / tombstones in the sidecar, recovery's
|
|
// `roll_forward_all` only publishes Updates for rewritten tables;
|
|
// added tables (Tag) end up as orphan datasets on disk with no
|
|
// manifest entry, and the live schema declares a type the manifest
|
|
// doesn't know about.
|
|
let db = Omnigraph::open(&uri).await.unwrap();
|
|
let tag_rows = helpers::count_rows(&db, "node:Tag").await;
|
|
assert_eq!(
|
|
tag_rows, 0,
|
|
"node:Tag must have a manifest entry (with 0 rows) post-recovery; \
|
|
a panic here means recovery failed to register the added table"
|
|
);
|
|
}
|
|
|
|
#[tokio::test]
|
|
async fn branch_merge_phase_b_failure_recovered_on_next_open() {
|
|
use omnigraph::loader::{LoadMode, load_jsonl};
|
|
|
|
let _scenario = FailScenario::setup();
|
|
let dir = tempfile::tempdir().unwrap();
|
|
let uri = dir.path().to_str().unwrap().to_string();
|
|
|
|
// Seed main with a row, branch off, mutate BOTH sides so the merge
|
|
// produces at least one `RewriteMerged` candidate (target moved past
|
|
// base too — required for the recovery sidecar to pin anything; the
|
|
// sidecar only pins RewriteMerged candidates because they're the
|
|
// only path that always advances Lance HEAD via
|
|
// `publish_rewritten_merge_table`).
|
|
{
|
|
let mut db = Omnigraph::init(&uri, helpers::TEST_SCHEMA).await.unwrap();
|
|
load_jsonl(
|
|
&mut db,
|
|
r#"{"type":"Person","data":{"name":"alice","age":30}}
|
|
"#,
|
|
LoadMode::Append,
|
|
)
|
|
.await
|
|
.unwrap();
|
|
db.branch_create("feature").await.unwrap();
|
|
db.mutate(
|
|
"feature",
|
|
MUTATION_QUERIES,
|
|
"insert_person",
|
|
&mixed_params(&[("$name", "Bob")], &[("$age", 40)]),
|
|
)
|
|
.await
|
|
.unwrap();
|
|
// Mutate main too so the merge sees target ≠ base for Person —
|
|
// forces RewriteMerged classification.
|
|
mutate_main(
|
|
&mut db,
|
|
MUTATION_QUERIES,
|
|
"insert_person",
|
|
&mixed_params(&[("$name", "Carol")], &[("$age", 50)]),
|
|
)
|
|
.await
|
|
.unwrap();
|
|
}
|
|
|
|
// Capture pre-failure state on main for post-recovery comparison.
|
|
let pre_failure_version = {
|
|
let db = Omnigraph::open(&uri).await.unwrap();
|
|
version_main(&db).await.unwrap()
|
|
};
|
|
|
|
// Setup: failpoint fires after the per-table publish loop completes
|
|
// but before commit_manifest_updates. Sidecar persists.
|
|
{
|
|
let db = Omnigraph::open(&uri).await.unwrap();
|
|
let _failpoint =
|
|
ScopedFailPoint::new("branch_merge.post_phase_b_pre_manifest_commit", "return");
|
|
let err = db.branch_merge("feature", "main").await.unwrap_err();
|
|
assert!(
|
|
err.to_string().contains(
|
|
"injected failpoint triggered: branch_merge.post_phase_b_pre_manifest_commit"
|
|
),
|
|
"unexpected error: {err}"
|
|
);
|
|
|
|
let recovery_dir = dir.path().join("__recovery");
|
|
let sidecars: Vec<_> = std::fs::read_dir(&recovery_dir)
|
|
.unwrap()
|
|
.filter_map(|e| e.ok())
|
|
.collect();
|
|
assert_eq!(
|
|
sidecars.len(),
|
|
1,
|
|
"exactly one sidecar must persist after branch_merge failure"
|
|
);
|
|
}
|
|
|
|
// Recovery: reopen runs the sweep. BranchMerge uses LOOSE
|
|
// classification — `publish_rewritten_merge_table` runs multiple
|
|
// commit_staged calls per table (stage_merge_insert + delete_where +
|
|
// index rebuilds), so post_commit_pin in the sidecar is a lower
|
|
// bound; the loose-match classifier accepts any HEAD > expected_version
|
|
// when expected_version == manifest_pinned.
|
|
let db = Omnigraph::open(&uri).await.unwrap();
|
|
|
|
let recovery_dir = dir.path().join("__recovery");
|
|
if recovery_dir.exists() {
|
|
let remaining: Vec<_> = std::fs::read_dir(&recovery_dir)
|
|
.unwrap()
|
|
.filter_map(|e| e.ok())
|
|
.collect();
|
|
assert!(
|
|
remaining.is_empty(),
|
|
"sidecar must be deleted; remaining: {:?}",
|
|
remaining,
|
|
);
|
|
}
|
|
let audit_dir = dir.path().join("_graph_commit_recoveries.lance");
|
|
assert!(
|
|
audit_dir.exists(),
|
|
"_graph_commit_recoveries.lance must exist after branch_merge recovery"
|
|
);
|
|
|
|
// Recovery must have advanced main's manifest pin (the merge published).
|
|
let post_recovery_version = version_main(&db).await.unwrap();
|
|
assert!(
|
|
post_recovery_version > pre_failure_version,
|
|
"manifest version must advance post-recovery; pre={pre_failure_version}, \
|
|
post={post_recovery_version}",
|
|
);
|
|
|
|
// The recovered branch_merge must record a MERGE commit (with
|
|
// `merged_parent_commit_id` set), not a plain commit. Without
|
|
// this, future merges between the same pair lose
|
|
// already-up-to-date detection. We verify by reading
|
|
// `_graph_commits.lance` and asserting the most recent commit
|
|
// tagged with the recovery actor has a non-null
|
|
// `merged_parent_commit_id`.
|
|
{
|
|
use arrow_array::{Array, StringArray};
|
|
use futures::TryStreamExt;
|
|
let commits_dir = dir.path().join("_graph_commits.lance");
|
|
let ds = lance::Dataset::open(commits_dir.to_str().unwrap())
|
|
.await
|
|
.unwrap();
|
|
let batches: Vec<arrow_array::RecordBatch> = ds
|
|
.scan()
|
|
.try_into_stream()
|
|
.await
|
|
.unwrap()
|
|
.try_collect()
|
|
.await
|
|
.unwrap();
|
|
let mut found_recovery_merge = false;
|
|
for batch in batches {
|
|
let merged = batch
|
|
.column_by_name("merged_parent_commit_id")
|
|
.expect("merged_parent_commit_id column present")
|
|
.as_any()
|
|
.downcast_ref::<StringArray>()
|
|
.expect("merged_parent_commit_id is Utf8");
|
|
// The actor_id lives in _graph_commit_actors; cross-checking
|
|
// is heavier than necessary. Detecting any non-null
|
|
// merged_parent_commit_id in the post-recovery state is
|
|
// sufficient: only a recovered branch_merge can produce one
|
|
// here (we never completed a normal merge in this test).
|
|
for i in 0..merged.len() {
|
|
if !merged.is_null(i) {
|
|
found_recovery_merge = true;
|
|
break;
|
|
}
|
|
}
|
|
}
|
|
assert!(
|
|
found_recovery_merge,
|
|
"recovered branch_merge must record `merged_parent_commit_id` so future \
|
|
merges detect already-up-to-date — no merge-parent-tagged commit found",
|
|
);
|
|
}
|
|
drop(db);
|
|
}
|
|
|
|
/// Branch-axis variant of the branch_merge recovery test: target is a
|
|
/// non-main branch. Catches the branch-specific commit-graph head bug
|
|
/// (D2) — without `CommitGraph::open_at_branch`, the recovery sweep
|
|
/// would record the global head as the merge parent on a non-main
|
|
/// target, and future merges between the same pair would lose
|
|
/// already-up-to-date detection.
|
|
#[tokio::test]
|
|
async fn branch_merge_phase_b_failure_recovered_on_non_main_target() {
|
|
use omnigraph::loader::{LoadMode, load_jsonl};
|
|
|
|
let _scenario = FailScenario::setup();
|
|
let dir = tempfile::tempdir().unwrap();
|
|
let uri = dir.path().to_str().unwrap().to_string();
|
|
let operation_id;
|
|
let target_parent_commit_id;
|
|
|
|
// Setup:
|
|
// main: alice
|
|
// target_branch (off main): + bob (target moved past base)
|
|
// source_branch (off main): + carol (source moved past base)
|
|
// Merge: source_branch → target_branch
|
|
{
|
|
let mut db = Omnigraph::init(&uri, helpers::TEST_SCHEMA).await.unwrap();
|
|
load_jsonl(
|
|
&mut db,
|
|
r#"{"type":"Person","data":{"name":"alice","age":30}}
|
|
"#,
|
|
LoadMode::Append,
|
|
)
|
|
.await
|
|
.unwrap();
|
|
db.branch_create("target_branch").await.unwrap();
|
|
db.mutate(
|
|
"target_branch",
|
|
MUTATION_QUERIES,
|
|
"insert_person",
|
|
&mixed_params(&[("$name", "Bob")], &[("$age", 40)]),
|
|
)
|
|
.await
|
|
.unwrap();
|
|
db.branch_create("source_branch").await.unwrap();
|
|
db.mutate(
|
|
"source_branch",
|
|
MUTATION_QUERIES,
|
|
"insert_person",
|
|
&mixed_params(&[("$name", "Carol")], &[("$age", 50)]),
|
|
)
|
|
.await
|
|
.unwrap();
|
|
}
|
|
|
|
let main_person_pin = {
|
|
let db = Omnigraph::open(&uri).await.unwrap();
|
|
db.snapshot_of(omnigraph::db::ReadTarget::branch("main"))
|
|
.await
|
|
.unwrap()
|
|
.entry("node:Person")
|
|
.expect("main must have Person")
|
|
.table_version
|
|
};
|
|
target_parent_commit_id = branch_head_commit_id(dir.path(), "target_branch")
|
|
.await
|
|
.unwrap();
|
|
|
|
// Setup: failpoint fires after the per-table publish loop completes
|
|
// but before commit_manifest_updates. Sidecar persists with
|
|
// branch=Some("target_branch").
|
|
{
|
|
let db = Omnigraph::open(&uri).await.unwrap();
|
|
let _failpoint =
|
|
ScopedFailPoint::new("branch_merge.post_phase_b_pre_manifest_commit", "return");
|
|
let err = db
|
|
.branch_merge("source_branch", "target_branch")
|
|
.await
|
|
.unwrap_err();
|
|
assert!(
|
|
err.to_string().contains(
|
|
"injected failpoint triggered: branch_merge.post_phase_b_pre_manifest_commit"
|
|
),
|
|
"unexpected error: {err}"
|
|
);
|
|
let recovery_dir = dir.path().join("__recovery");
|
|
let sidecar_count = std::fs::read_dir(&recovery_dir).unwrap().count();
|
|
assert_eq!(
|
|
sidecar_count, 1,
|
|
"exactly one sidecar must persist after non-main branch_merge failure"
|
|
);
|
|
operation_id = single_sidecar_operation_id(dir.path());
|
|
}
|
|
|
|
// Recovery: reopen runs full sweep. The BranchMerge sidecar's branch
|
|
// = Some("target_branch"); D2 fix opens a per-branch CommitGraph
|
|
// for the audit append so the merge-parent linkage is correct.
|
|
let db = Omnigraph::open(&uri).await.unwrap();
|
|
drop(db);
|
|
|
|
assert_post_recovery_invariants(
|
|
dir.path(),
|
|
&operation_id,
|
|
RecoveryExpectation::RolledForward {
|
|
tables: vec![
|
|
TableExpectation::branch("node:Person", "target_branch")
|
|
.expected_main_manifest_pin(main_person_pin)
|
|
.expected_recovery_parent_commit_id(target_parent_commit_id),
|
|
],
|
|
},
|
|
)
|
|
.await
|
|
.unwrap();
|
|
}
|
|
|
|
/// Contract: the BranchMerge sidecar's per-table `table_branch` MUST be
|
|
/// the merge target branch (where commits land via
|
|
/// `publish_rewritten_merge_table` → `open_for_mutation` → potentially
|
|
/// `fork_dataset_from_entry_state`), NOT `entry.table_branch` (where
|
|
/// the table currently lives in the target's manifest snapshot).
|
|
///
|
|
/// `ensure_indices_for_branch` already has this invariant pinned by an
|
|
/// explicit comment at `table_ops.rs:115-120`. Without the same fix in
|
|
/// `merge.rs`, a future change to candidate selection or the publish
|
|
/// path that produces a `RewriteMerged` whose entry.table_branch
|
|
/// diverges from active_branch would silently drift Lance HEAD on the
|
|
/// target ref while recovery checks the wrong ref and no-ops the
|
|
/// rollback.
|
|
///
|
|
/// This test reads the sidecar JSON directly and asserts every per-pin
|
|
/// `table_branch` equals the active (target) branch. Even when the
|
|
/// values happen to coincide in practice (the strict candidate logic
|
|
/// keeps RewriteMerged tables on active_branch), the contract assertion
|
|
/// catches a regression that reverts to `entry.table_branch.clone()`.
|
|
#[tokio::test]
|
|
async fn branch_merge_sidecar_pins_table_branch_to_active_branch() {
|
|
use omnigraph::loader::{LoadMode, load_jsonl};
|
|
|
|
let _scenario = FailScenario::setup();
|
|
let dir = tempfile::tempdir().unwrap();
|
|
let uri = dir.path().to_str().unwrap().to_string();
|
|
|
|
{
|
|
let mut db = Omnigraph::init(&uri, helpers::TEST_SCHEMA).await.unwrap();
|
|
load_jsonl(
|
|
&mut db,
|
|
r#"{"type":"Person","data":{"name":"alice","age":30}}
|
|
"#,
|
|
LoadMode::Append,
|
|
)
|
|
.await
|
|
.unwrap();
|
|
db.branch_create("target_branch").await.unwrap();
|
|
db.mutate(
|
|
"target_branch",
|
|
MUTATION_QUERIES,
|
|
"insert_person",
|
|
&mixed_params(&[("$name", "Bob")], &[("$age", 40)]),
|
|
)
|
|
.await
|
|
.unwrap();
|
|
db.branch_create("source_branch").await.unwrap();
|
|
db.mutate(
|
|
"source_branch",
|
|
MUTATION_QUERIES,
|
|
"insert_person",
|
|
&mixed_params(&[("$name", "Carol")], &[("$age", 50)]),
|
|
)
|
|
.await
|
|
.unwrap();
|
|
}
|
|
|
|
{
|
|
let db = Omnigraph::open(&uri).await.unwrap();
|
|
let _failpoint =
|
|
ScopedFailPoint::new("branch_merge.post_phase_b_pre_manifest_commit", "return");
|
|
let _ = db
|
|
.branch_merge("source_branch", "target_branch")
|
|
.await
|
|
.expect_err("failpoint must fire");
|
|
}
|
|
|
|
let operation_id = single_sidecar_operation_id(dir.path());
|
|
let sidecar_path = dir
|
|
.path()
|
|
.join("__recovery")
|
|
.join(format!("{operation_id}.json"));
|
|
let sidecar_json = std::fs::read_to_string(&sidecar_path).unwrap();
|
|
let sidecar: serde_json::Value = serde_json::from_str(&sidecar_json).unwrap();
|
|
|
|
let tables = sidecar["tables"]
|
|
.as_array()
|
|
.expect("sidecar tables must be an array");
|
|
assert!(
|
|
!tables.is_empty(),
|
|
"sidecar must pin at least one RewriteMerged table — both branches mutated Person"
|
|
);
|
|
for pin in tables {
|
|
let table_branch = pin
|
|
.get("table_branch")
|
|
.and_then(|v| v.as_str())
|
|
.unwrap_or_else(|| {
|
|
panic!(
|
|
"sidecar pin must record table_branch as the merge target (active_branch); \
|
|
got pin {pin:?}"
|
|
)
|
|
});
|
|
assert_eq!(
|
|
table_branch, "target_branch",
|
|
"sidecar pin must record `table_branch` as the merge target branch (where \
|
|
commits actually land via publish_rewritten_merge_table → open_for_mutation), \
|
|
NOT entry.table_branch from the target snapshot. See merge.rs filter_map and \
|
|
the rationale comment at table_ops.rs:115-120. Got pin: {pin:?}"
|
|
);
|
|
}
|
|
}
|
|
|
|
/// `ensure_indices` only writes a sidecar when at least one table
|
|
/// genuinely needs index work (per `needs_index_work_*` helpers in
|
|
/// `db/omnigraph/table_ops.rs`). When all tables are steady-state
|
|
/// (every declared index already built, or empty tables that the loop
|
|
/// skips), the sidecar is omitted entirely.
|
|
///
|
|
/// Test setup: `load_jsonl` auto-builds indices via
|
|
/// `prepare_updates_for_commit`. So after the load, every Person/Knows
|
|
/// index is built and Company is empty. `ensure_indices` correctly
|
|
/// produces zero pins → no sidecar. The failpoint still fires (it sits
|
|
/// after the loops), so the call returns Err — but no recovery state
|
|
/// persists. Reopen is a clean no-op.
|
|
///
|
|
/// Triggering an actual sidecar persistence requires bypassing
|
|
/// `load_jsonl`'s auto-build via raw `TableStore::append_batch` — the
|
|
/// helper-direct path. That's covered structurally by the
|
|
/// `needs_index_work_*` code path and the
|
|
/// `recovery_ensure_indices_handles_empty_tables` integration test.
|
|
#[tokio::test]
|
|
async fn ensure_indices_phase_b_failure_does_not_leak_sidecar_when_no_work_needed() {
|
|
use omnigraph::loader::{LoadMode, load_jsonl};
|
|
|
|
let _scenario = FailScenario::setup();
|
|
let dir = tempfile::tempdir().unwrap();
|
|
let uri = dir.path().to_str().unwrap().to_string();
|
|
|
|
// Seed: load_jsonl auto-builds Person's indices via
|
|
// prepare_updates_for_commit. After this, ensure_indices has no
|
|
// work to do (steady state).
|
|
{
|
|
let mut db = Omnigraph::init(&uri, helpers::TEST_SCHEMA).await.unwrap();
|
|
load_jsonl(
|
|
&mut db,
|
|
r#"{"type":"Person","data":{"name":"alice","age":30}}
|
|
{"type":"Person","data":{"name":"bob","age":25}}
|
|
"#,
|
|
LoadMode::Append,
|
|
)
|
|
.await
|
|
.unwrap();
|
|
}
|
|
|
|
// Setup: trigger the failpoint. Steady-state ensure_indices
|
|
// produces zero sidecar pins (the helpers scope pins to tables
|
|
// that genuinely need work); no sidecar is written. The failpoint
|
|
// still fires, surfacing the Err.
|
|
{
|
|
let db = Omnigraph::open(&uri).await.unwrap();
|
|
let _failpoint =
|
|
ScopedFailPoint::new("ensure_indices.post_phase_b_pre_manifest_commit", "return");
|
|
let err = db.ensure_indices().await.unwrap_err();
|
|
assert!(
|
|
err.to_string().contains(
|
|
"injected failpoint triggered: ensure_indices.post_phase_b_pre_manifest_commit"
|
|
),
|
|
"unexpected error: {err}"
|
|
);
|
|
|
|
// KEY ASSERTION: no sidecar persists, because the helpers
|
|
// scope pins to tables that genuinely need work. Steady-state
|
|
// = no pins = no sidecar = no recovery state = zero open-time
|
|
// overhead.
|
|
let recovery_dir = dir.path().join("__recovery");
|
|
let sidecars: Vec<_> = if recovery_dir.exists() {
|
|
std::fs::read_dir(&recovery_dir)
|
|
.unwrap()
|
|
.filter_map(|e| e.ok())
|
|
.collect()
|
|
} else {
|
|
Vec::new()
|
|
};
|
|
assert!(
|
|
sidecars.is_empty(),
|
|
"steady-state ensure_indices must not leave a sidecar; got {:?}",
|
|
sidecars,
|
|
);
|
|
}
|
|
|
|
// Recovery: reopen is a clean no-op (no sidecar to recover).
|
|
let _db = Omnigraph::open(&uri).await.unwrap();
|
|
|
|
let recovery_dir = dir.path().join("__recovery");
|
|
if recovery_dir.exists() {
|
|
let remaining: Vec<_> = std::fs::read_dir(&recovery_dir)
|
|
.unwrap()
|
|
.filter_map(|e| e.ok())
|
|
.collect();
|
|
assert!(
|
|
remaining.is_empty(),
|
|
"sidecar must remain deleted; remaining: {:?}",
|
|
remaining,
|
|
);
|
|
}
|
|
// No audit row expected — no sidecar was processed.
|
|
let audit_dir = dir.path().join("_graph_commit_recoveries.lance");
|
|
assert!(
|
|
!audit_dir.exists(),
|
|
"_graph_commit_recoveries.lance must NOT exist when no sidecar was processed"
|
|
);
|
|
}
|