mirror of
https://github.com/ModernRelay/omnigraph.git
synced 2026-06-27 02:39:38 +02:00
* docs(rfc-013): bank the #295 spec-review comments as step-5 constraints (§5.1) 3b shipped a minimal WriteTxn{branch,base} and deferred the full §4.1 opener unification (pinned-base opener, shared Session, write-local handle cache, strict-op conflict-timing move) to step 5. The greptile comments on the #295 spec were moot for #298 (none of those constructs were built) but are load-bearing for step 5: (1) the handle cache must be Send+Sync (Mutex, not RefCell); (2) the strict-op timing move needs an explicit retry contract — txn discarded after any commit, retry re-opens a fresh base — which is the SAME contract as the stale-view false-fail (§1d.2); (3) the opener-equivalence test must advance HEAD externally then assert pinned-base, not the trivial HEAD==base. * feat(engine): fold graph lineage into the __manifest publish CAS (RFC-013 Phase 7) Graph lineage no longer lives in a second write to _graph_commits.lance. Each commit's graph_commit + graph_head:<branch> rows now ride the SAME __manifest merge-insert as the table-version rows (one atomic version), and CommitGraph reads its cache from the manifest projection (read_graph_lineage). _graph_commits.lance is no longer written commit rows (it remains only as a Lance branch-ref carrier). Mechanism: a LineageIntent { graph_commit_id (ULID, minted once), branch, actor, merged_parent, created_at } threads through ManifestBatchPublisher::publish. Inside the publisher retry loop the parent is resolved per attempt from the just-loaded branch-scoped manifest (the should_replace_head winner over the visible graph_commit rows — branch-correct by Lance branch isolation; the graph_head row is written for forward-compat + the §7.1 contention point but is not the parent source, so a freshly-forked branch resolves the right fork-point parent). A CAS-conflict retry re-reads the advanced head → correct new parent; the commit_id is stable across retries. 
Closes two known gaps BY CONSTRUCTION (one write, no second step to fail/ race): - manifest→commit-graph atomicity (no crash window between manifest + lineage), - commit-graph parent under concurrency (no refresh→append TOCTOU; the per-write commit_graph.refresh() is gone). Recovery, branch-merge, and genesis route their lineage through the same CAS (merge: one commit_merge_with_actor; recovery: publish_recovery_commit folds the recovery commit, actor=omnigraph:recovery; genesis rides the init __manifest write). The dead _graph_commits write helpers (append_commit/_merge/_actor) are #[allow(dead_code)] (the actor sidecar table is still enumerated by optimize). Verified (sequential): build clean; the new lineage_projection gate (manifest-only — _graph_commits/_actors have 0 rows; full lineage reconstructs via the projection); branching/merge_truth_table (exhaustive, branch-aware)/composite_flow/point_in_time/ changes/consistency/recovery; failpoints (59, incl. recovery lifecycle + the now-closed atomicity gap); full --workspace. Cost tests REVERT to their pre-fold values (writes +1, write_cost ceiling 80) — the proof of true single-CAS (no extra write). invariants.md marks both gaps CLOSED. PENDING (next stages, this PR): the §7.1 concurrent graph_head one-winner gate (stage 5 — two concurrent same-branch commits, exactly one wins); the stamp bump v4 + migrate_v3_to_v4 backfill + read-only refuse for EXISTING graphs (stage 4); full doc-sync of storage.md/architecture.md/writes.md. * feat(engine): migrate existing v3 graphs to manifest lineage (RFC-013 Phase 7 stage 4) The Phase-7 fold made CommitGraph read lineage from the __manifest projection, so a pre-Phase-7 (internal-schema v3) graph — lineage in _graph_commits.lance, none in __manifest — would read an empty commit DAG. Stage 4 makes existing graphs upgrade seamlessly and not break reads. - Stamp 3 -> 4 + migrate_v3_to_v4: bumps INTERNAL_MANIFEST_SCHEMA_VERSION and adds the 3 => migrate_v3_to_v4 arm. 
The migration reads this branch's _graph_commits/_actors, emits one graph_commit row per commit + exactly one graph_head:<branch> for the head (should_replace_head winner, deterministic id-sort — no hash-map-order in migration output), merge-inserts into __manifest, then set_stamp(4) LAST. Idempotency guard first (read_graph_lineage non-empty -> just stamp); crash before set_stamp re-enters at v3 and the guard completes it. Does NOT touch the unenforced-PK metadata. Runs per branch: migrate_on_open backfills main; load_publish_state backfills each branch on its first write (root_uri/branch threaded through migrate_internal_schema). - v3-read fallback: CommitGraph version-gates the lineage source — stamp < 4 reads the (re-activated) _graph_commits.lance; >= 4 uses the manifest projection. So a READ-ONLY open of an un-migrated graph reads correct history with no write. Correctness catch: the legacy _graph_commit_actors.lance was never branched, so the fallback reads it FLAT (no branch checkout) while checking out the branch only on the commits dataset. - Read-only stamp-refuse: a ReadOnly open of a FUTURE-stamped graph now refuses with the same upgrade error (future-proofing the next format bump; the write path already refused via migrate_internal_schema). - Docs: storage/architecture/writes/invariants/constants updated to manifest-stored lineage; release note docs/releases/v0.8.0.md (format v4, old writers clean-break, data preserved, upgrade writers first). 6 new tests (v3 backfill, idempotent, v3 read-only fallback, future-stamp refuse in both modes, crash-before-stamp completes, legacy branch+flat-actor read). Full engine suite + failpoints (59) + cargo test --workspace --locked green; check-agents-md passes. 
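The stamp-last ordering and the idempotency guard described for `migrate_v3_to_v4` can be sketched with an in-memory model. This is a minimal illustration, not the engine's code: the `Manifest` struct, the `crash_before_stamp` flag, and the `String` error are hypothetical stand-ins, and only the ordering (guard first, backfill, stamp last) is taken from the description above.

```rust
/// Hypothetical in-memory stand-in for the `__manifest` state.
struct Manifest {
    stamp: u32,
    lineage: Vec<String>,
}

/// Sketch of a stamp-last, idempotent backfill.
/// `crash_before_stamp` is an illustrative switch that simulates a crash
/// after the backfill but before the stamp is written.
fn migrate_v3_to_v4(
    manifest: &mut Manifest,
    legacy_commits: &[String],
    crash_before_stamp: bool,
) -> Result<(), String> {
    // Idempotency guard: if lineage is already present, skip the backfill
    // and fall through to the stamp.
    if manifest.lineage.is_empty() {
        let mut rows: Vec<String> = legacy_commits.to_vec();
        // Deterministic id-sort so the output never depends on map order.
        rows.sort();
        manifest.lineage.extend(rows);
    }
    if crash_before_stamp {
        return Err("simulated crash before set_stamp".to_string());
    }
    // The stamp is written LAST, so a crash above re-enters at v3.
    manifest.stamp = 4;
    Ok(())
}
```

A run that crashes before the stamp leaves the manifest at v3 with the lineage already backfilled; the next run hits the guard, skips the backfill, and only stamps, so no rows are duplicated.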
* test(engine): graph_head concurrency gate — disjoint same-branch writers form a linear commit DAG (RFC-013 Phase 7) Two (or N) writers committing disjoint tables on one branch still share the mutable `graph_head:<branch>` manifest row, so the only row-level CAS contention is that row. The contract — exactly one writer wins each CAS round; the loser retries inside the publisher, re-resolves its parent off the freshly-advanced head, and re-commits, so every writer lands and the graph_commit DAG stays a single LINEAR chain (no fork) — had no acceptance test. This adds it. - concurrent_disjoint_writes_share_head_and_form_linear_chain: two disjoint writers + distinct LineageIntent, tokio::join!; both commit; the on-disk DAG is genesis -> c -> c' (asserted linear: exactly one genesis, no two commits share a parent, the head is the unique non-parent). - n_concurrent_disjoint_writers_converge_to_one_linear_chain: N=8 disjoint writers each with an app-level retry loop (the publisher's internal budget can be exhausted under contention); all converge to one linear chain of 8. - concurrent_disjoint_writes_form_linear_chain_on_s3: the same race on a real object store (true conditional-put CAS), bucket-gated. Cites both tests from the §7.1 contention note in invariants.md. Test-only; no production change. * perf(engine): fold the lineage parent scan into the publish path's single __manifest scan (RFC-013 P2) Each lineage publish scanned `__manifest` twice: `load_publish_state` read table state via one scan, then `resolve_lineage_rows` did a second full `read_graph_lineage` scan only to find the parent commit. Fold the `graph_commit` extraction into the existing scan. - `read_manifest_scan` gains a `collect_lineage` flag. 
The publish path (`read_publish_scan`) collects the `graph_commit` rows in the same pass; the table-state hot path leaves them in the forward-compat skip arm, so it never pays the O(commits) lineage JSON decode (it also skips reading the `object_id` column entirely). One shared `decode_graph_commit_row` serves both the folded path and the standalone `read_graph_lineage`, so the two cannot drift. - `resolve_lineage_rows` is now sync and takes the already-parsed rows; the per-attempt re-read is preserved because `load_publish_state` runs once per CAS attempt, so a retry still re-parents off the advanced head. - `load_publish_state` returns a named `LoadedPublishState` instead of a four-tuple; the thin `read_registered_table_locations` / `read_tombstone_versions` accessors fold away. `read_manifest_entries` becomes `#[cfg(test)]`: the fold removes its last production caller, leaving only the test-only namespace module (`db/manifest.rs`: `#[cfg(test)] mod namespace`), so gating it keeps it from becoming dead code in non-test builds. Measured at depth ~5: per-write `__manifest` reads drop 44 -> 26 (total reads 54 -> 36). write_cost.rs gains a `manifest_reads <= 34` sub-ceiling that trips if a publish-path scan is re-added, and its calibration comment is corrected. * test(engine): red — transient legacy-open failure silently completes the v3→v4 migration A pre-Phase-7 (internal schema v3) graph keeps its graph lineage in `_graph_commits.lance`; the v3→v4 internal-schema migration backfills it into `__manifest` and stamps v4. `read_legacy_commit_cache` currently maps EVERY `Dataset::open` error to "no legacy data" (`Err(_) => empty`), so a transient or corrupt open during the one-time migration backfills nothing and still stamps v4 — orphaning the real lineage permanently (the migration runs once; the v3 fallback is then disabled). 
Add a `migration.v3_to_v4.legacy_open` failpoint that injects a non-not-found Lance error at the legacy open, and a fault-injection regression test in the `failpoints` binary. Against the current swallow the migration completes anyway, so the test fails on its "migration must abort" assertion — the predicted symptom. The fix follows in the next commit. Test support reachable from the `failpoints` integration binary (it compiles the crate without `cfg(test)`): the v3-fixture helpers and a stamp/row-count reader are gated `cfg(any(test, feature = "failpoints"))`, still excluded from release builds. Failpoint tests stay in the integration binary because the fail registry is process-global. * fix(engine): propagate non-not-found legacy-open errors in the v3→v4 migration `read_legacy_commit_cache` mapped EVERY `Dataset::open` error to an empty cache (`Err(_) => empty`) on both the legacy commits dataset and its actor sidecar. The v3→v4 internal-schema migration reads this once before stamping internal-schema v4; a transient or corrupt open therefore backfilled nothing and stamped v4 anyway, orphaning the graph's real lineage permanently (the migration runs once, and the stamp-gated v3 fallback is disabled at v4). This is the "no silent failures" deny-list violation, and realistic on object storage. Both opens now match the not-found variants — Lance maps an object-store NotFound to `DatasetNotFound` — as the benign "no legacy data" / "no authors" signal, and propagate anything else as a loud error. The two arms share the variant contract but carry different rationale (commits-absent is the legitimate empty signal; actor-sidecar-absent is benign, but a corrupt actor open silently wiping authorship before stamping v4 is the same loss hole), commented at each site. 
Pinned by the `lance_surface_guards.rs::dataset_open_missing_returns_not_found_variant` guard (turns red if a Lance bump changes the absence variant) and greens the fault-injection regression test from the previous commit. * test(engine): cover the per-branch v3→v4 migration against a real Lance branch `seed_legacy_v3_lineage` writes every commit (including the "feature"-tagged one) to MAIN's `_graph_commits.lance` with `manifest_branch` as a mere field, so the production per-branch migration path — `read_legacy_commit_cache` checking out a real Lance branch, and a branch-scoped `__manifest` — was never exercised. Add `seed_legacy_v3_lineage_with_branch`, which forks a real `feature` Lance branch on BOTH `_graph_commits.lance` and `__manifest` (the branch inherits main's stripped v3 state), and a test that migrates the BRANCH and asserts the branch's lineage lands in the BRANCH's `__manifest` (genesis + A + branch commit, `graph_head:feature` → branch commit, parents + actors intact) with main's `__manifest` untouched. This empirically resolves the open question behind the merge robustness work: the fast-path `read_graph_lineage(dataset)` has no `manifest_branch` filter, but `__manifest` is Lance-branched per graph-branch, so a branch reads only its own lineage — the test confirms migrating one branch does not leak into another. No branch filter is needed. * refactor(engine): type the lineage-backfill merge conflict via the publisher classifier `state::merge_lineage_rows` (the v3→v4 lineage backfill's standalone `__manifest` merge-insert) stringified its `execute_reader` error, discarding the Lance variant. Route it through the publisher's `map_lance_publish_error` (now `pub(crate)`) so a concurrent first-open's row-level CAS loss surfaces as the SAME typed `OmniError::Manifest{ details: RowLevelCasContention }` the publisher's own retry consumes — one vocabulary, no raw-Lance matching in the migration. 
Deliberately NOT unified with `optimize::is_retryable_lance_conflict`: that classifier also matches `CommitConflict`/`RetryableCommitConflict` from the compaction commit path, which a row-level merge-insert never emits. Cross-linked with a comment at both sites. Behavior-preserving: the only path that changes is the error TYPE on a CAS loss (previously an opaque `Lance` string, now a typed conflict); no success/failure outcome changes. The bounded re-open retry that consumes the new type lands next. * test(engine): red — concurrent v3→v4 migrations error instead of converging `migrate_v2_to_v3` is concurrent-runner idempotent by design; v3→v4 regressed it. `merge_lineage_rows` uses `conflict_retries(0)` and `migrate_v3_to_v4` has no app-level retry, so when two processes open the same legacy graph at once the backfill's row-level CAS loser errors the whole open instead of converging. The test opens two `__manifest` handles at the same pre-migration (v3, empty-lineage) HEAD and runs both `migrate_internal_schema` calls under `tokio::join!`, forcing the `graph_head:main` CAS to fire every run. Against the current code the loser fails with `RowLevelCasContention` ("Attempted 0 retries.") — the predicted symptom — so the "both must converge" assertion panics. The bounded re-open retry that makes both converge lands next. * fix(engine): make the v3→v4 lineage backfill converge under concurrent runners `migrate_v2_to_v3` is concurrent-runner idempotent; v3→v4 was not. Two processes (or open-for-write handles) opening the same legacy graph at once both reach the backfill merge, and `merge_lineage_rows`'s `conflict_retries(0)` made the row-level CAS loser error the whole open instead of converging. Two contention points, both now handled all-or-nothing: 1. The backfill merge on `graph_head:<branch>`. 
Wrap (fast-path re-read → read legacy → merge) in a bounded re-open retry loop: a `RowLevelCasContention` loss re-opens the manifest past the winner's (atomic) commit and re-loops; the fast-path re-read then sees the winner's lineage and stamps. On budget exhaustion it returns a `RowLevelCasContention`-typed error so the publisher's OUTER retry loop completes it. The retry decision reuses the publisher's `is_retryable_publish_conflict` so the two stay in lockstep. 2. The terminal stamp bump. Making the merge loser converge newly lets BOTH runners reach `set_stamp(4)` — an `UpdateConfig` commit on the same key — so the loser gets `lance::Error::IncompatibleTransaction` (NOT a row-level CAS, so the merge loop doesn't catch it). This surfaced only under the concurrent full-suite run, not the isolated test. Both write the SAME value, so the conflict is benign: `commit_v4_stamp_idempotently` re-opens and, if the stamp already reached the target, succeeds; else re-applies (bounded). Greens the race test from the previous commit (3x isolated, 5x full-suite, no flake). The new `IncompatibleTransaction` match is pinned by `lance_surface_guards.rs::lance_error_incompatible_transaction_variant_exists`. * fix(engine): refuse a future internal-schema stamp on the branch read path `load_commit_cache_for_branch` dispatched on the branch's internal-schema stamp — `< CURRENT` to the v3 legacy fallback, `>= CURRENT` to the manifest projection — but never refused a `> CURRENT` branch stamp, so a newer-binary shape would be misread by the projection rather than rejected. Add `refuse_if_stamp_too_new(stamp)` (re-exported `pub(crate)` from `migrations`) right after the branch stamp is read, mirroring the main read path's `refuse_if_internal_schema_too_new`. This is defense-in-depth, not a live hole: migrations run main-first (main migrates on open; each branch on its first write), so main's stamp is always >= every branch's and the main path refuses first. 
The guard closes the gap if that ordering invariant is ever weakened. Tested by force-stamping a real branch past CURRENT and asserting the branch read refuses with the upgrade error (the test misreads via the projection — returns Ok — without the guard, confirmed by removing it). * docs(rfc-013): record the v3→v4 migration robustness fixes invariants.md Known Gaps: the `migrate_v3_to_v4` entry now states the migration is loud on non-not-found legacy-open errors and concurrent-runner idempotent (bounded re-open retry on the merge CAS + idempotent stamp bump), and that the branch read path refuses a `> CURRENT` stamp. lance.md: note the two new surface guards the migration depends on (`dataset_open_missing_returns_not_found_variant`, `lance_error_incompatible_transaction_variant_exists`). testing.md: note the migration fault-injection test in the failpoints row. * refactor: remove dead code and silence warnings across engine + cluster Dead-code sweep follow-up to the RFC-013 stack. No behavior change. - engine: delete the orphaned `validate_edge_cardinality` — the load path uses `validate_edge_cardinality_with_pending_loader` for every mode (including Overwrite, which it treats as the replacement table image), so the old standalone validator had no caller — and correct its sibling's now-stale doc reference. Gate `TableStore::append_batch` `#[cfg(test)]`: it is the inline- commit residual kept only for recovery test setup, with no non-test caller. - cluster: drop unused imports in `lib.rs`, delete the unused `ClusterStore::payload_display`, and raise `LiveGraphObservation` / `GraphObservationJson` / `PolicyTarget` to `pub(crate)` to match the functions that return them. Both lib crates now build warning-free. 
* fix(engine): match Lance's typed DatasetAlreadyExists, not the message string The internal create-or-open idempotency fallbacks in `db/commit_graph.rs` and `db/recovery_audit.rs` classified the "already exists" race by `err.to_string().contains("Dataset already exists")` — a Lance display string, not an API contract. A wording change upstream would silently break the fallback (a re-create would error instead of opening the existing table). Match the typed `lance::Error::DatasetAlreadyExists { .. }` variant instead — the same discipline as the v3→v4 migration's not-found classifier — pinned by the new `lance_surface_guards.rs::lance_error_dataset_already_exists_variant_exists` guard so a Lance rename turns red instead of silently regressing. * refactor(engine): consolidate now_micros into one crate::db helper Four `fn now_micros() -> Result<i64>` copies (commit_graph, recovery_audit, graph_coordinator, manifest/graph) had already drifted: three mapped the clock error to `OmniError::manifest("...UNIX_EPOCH...")` while recovery_audit used `OmniError::manifest_internal("...unix epoch...")`. Replace all four with one `pub(crate) fn now_micros()` in `db/mod.rs` (the majority `manifest` variant), and repoint the eight call sites at `crate::db::now_micros()`. No test asserts on the failure message, so unifying the variant is behavior-safe; the timestamp-mapping contract can no longer fork across the rows it stamps. * refactor(engine): drop the dead snapshot param from roll_back_sidecar `roll_back_sidecar` took `snapshot: &Snapshot` only to discard it with `let _ = snapshot;` — rollbacks now always publish (the restored HEAD plus a recovery-commit lineage row), so the snapshot is never read to decide whether to skip a publish. Remove the parameter, the two call-site arguments, and the suppressor. A signature must not advertise inputs it does not consume. 
The `Snapshot` import stays — `process_sidecar`, `roll_forward_all`, and `record_audit_recovery_rollforward` still take it. * test(engine): red — open_at_branch wedges a branch on a missing commit-graph ref A v4 graph keeps its graph lineage in `__manifest` (RFC-013 Phase 7); the `_graph_commits.lance` branch ref is a derived artifact. An interrupted fork-reclaim or a `cleanup` race can drop that derived ref while the manifest lineage stays intact. Per invariants 7 + 15 a missing derived ref must not fail a logical read of the lineage. This wedge builds a real v4 `feature` branch (its `graph_head:feature` row in `__manifest`), force-deletes ONLY the `_graph_commits.lance` `feature` ref, then asserts the branch reads (`open_at_branch` / list-commits / `merge_base`) succeed from `__manifest` while a write that needs the derived ref (`create_branch`) fails loudly with the typed actionable error. Red against current code: `open_at_branch`'s hard `checkout_branch(branch)?` on the missing ref errors `OmniError::Lance` (Lance "Not found: _graph_commits.lance/tree/feature/_versions"), wedging the logical read. * fix(engine): read manifest lineage independent of the derived _graph_commits ref `CommitGraph::open_at_branch` did a hard `checkout_branch(branch)?` on the `_graph_commits.lance` branch ref before reading lineage — so a missing derived ref (an interrupted fork-reclaim, or a `cleanup` race) wedged the branch's commit-list / merge-base / snapshot resolution even though the lineage is readable from the authoritative `__manifest` (RFC-013 Phase 7). That is a derived/physical artifact failing a logical read — invariants 7 and 15. Make the held commits handle `Option<Dataset>` (mirroring `actor_dataset`). `open_at_branch` and `refresh` check out the derived ref best-effort: a typed not-found (`RefNotFound`/`NotFound`) yields a `None` handle while the read re-syncs from `__manifest`; any other open error still propagates. 
The manifest existence gate is unchanged — `load_commit_cache_for_branch` keeps its hard `?`, so a truly absent branch still fails loudly at the manifest. `create_branch` (the only writer that forks a ref) and the folded-in version lookup return a loud, actionable error on `None`, deferring repair to `cleanup`'s existing orphan reconciler rather than inlining a write on a read-side refresh. Reads (`head_commit`/`load_commits`/`get_commit`/`merge_base`) never touch the handle. Greens the wedge regression from the preceding commit. * fix(engine): v3→v4 retry loops return retryable contention on exhaustion `commit_v4_stamp_idempotently`'s retry loop used `0..=STAMP_RETRY_BUDGET` (6 iterations) with an `attempt < STAMP_RETRY_BUDGET` guard, so the LAST iteration's `IncompatibleTransaction` fell through to `Err(e) => OmniError::Lance(...)` — stringified, non-retryable — instead of the intended `RowLevelCasContention`, and the post-loop contention return was dead code. The publisher's outer retry only re-runs `is_retryable_publish_conflict`, so under sustained concurrent v3→v4 migration the one-time stamp bump could fail instead of converging, defeating the idempotency the migration is supposed to add. Fix the loop to `0..BUDGET` with an UNGUARDED `IncompatibleTransaction` arm: the retryable variant is always handled inside the loop (re-open + same-value check + retry), so it can never reach the stringifying catch-all, and the post-loop is the SINGLE reachable exhaustion path — the typed `RowLevelCasContention`. The `Err(e)` arm now catches only genuine non-contention errors. Apply the same range alignment to the sibling merge loop in `migrate_v3_to_v4` (behaviorally correct today — its `Err(err)` returns the already-typed contention — but it carried the identical off-by-one structure the stamp loop was copied from; aligning both stops the next copy from re-introducing it). Test-first. 
The exhaustion path is otherwise near-unreachable — a real concurrent winner stamps the same value, so the re-read returns Ok on the first retry — so a new `migration.v4_stamp.force_incompatible` failpoint forces every stamp attempt to lose, driving exhaustion deterministically. Against the pre-fix loop the new `v4_stamp_exhaustion_returns_retryable_contention` test goes red with `Lance("Incompatible transaction: injected failpoint triggered…")`; with the fix it asserts the typed `RowLevelCasContention`. Found by automated review on #299. * feat(engine): minimum-supported internal-schema floor + retirement tripwire The internal-schema migration chain (`migrate_internal_schema`) had a too-new ceiling but no floor, so every old `migrate_vN_…` arm and the v3 legacy readers it needs stay forever — the pile grows by one migration + readers + tests every schema version. Add `MIN_SUPPORTED_INTERNAL_SCHEMA_VERSION` (1 today, a pure no-op: `read_stamp` floors an absent stamp at 1 and no real graph carries 0) as the oldest stamp this binary opens; raising it is how the chain sheds old code. Collapse the one-sided `refuse_if_stamp_too_new` into `refuse_if_stamp_unsupported` checking both bounds, so the floor lands at all three stamp-enforcement sites — the write-path migrate dispatcher, the read-only open guard, and the branch lineage-read path (`commit_graph.rs`) — via one compiler-enforced rename. A hand-wired floor twin would have had to touch each site, and the branch-read path is easy to miss; one combined guard cannot half-enforce. Rename the read-only wrapper `refuse_if_internal_schema_unsupported` to match. A compile-time tripwire (`const _: () = assert!(LOWEST_REGISTERED_MIGRATION_SOURCE == MIN_SUPPORTED…)`) fails the build if a future floor bump forgets to delete the now-dead migration arm (or vice versa) — stronger than a runtime test, impossible to skip, and it doubles as the use that keeps the mirror const live. 
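A rough sketch of the two-sided guard and the compile-time tripwire, assuming a floor of 1 and a current stamp of 4 as stated above. The `StampError` enum and the `CURRENT` constant name are hypothetical; the real guard returns the engine's own upgrade error.

```rust
// Values follow the text above (floor 1, current stamp 4).
const MIN_SUPPORTED_INTERNAL_SCHEMA_VERSION: u32 = 1;
const CURRENT: u32 = 4; // hypothetical name for the current stamp
const LOWEST_REGISTERED_MIGRATION_SOURCE: u32 = 1;

// Compile-time tripwire: the build fails if the floor is raised without
// deleting the now-dead migration arm, or the other way around.
const _: () =
    assert!(LOWEST_REGISTERED_MIGRATION_SOURCE == MIN_SUPPORTED_INTERNAL_SCHEMA_VERSION);

/// Hypothetical error type, for illustration only.
#[derive(Debug, PartialEq)]
enum StampError {
    TooOld(u32),
    TooNew(u32),
}

/// One combined guard that checks both bounds, so no call site can
/// enforce the ceiling while forgetting the floor.
fn refuse_if_stamp_unsupported(stamp: u32) -> Result<(), StampError> {
    if stamp < MIN_SUPPORTED_INTERNAL_SCHEMA_VERSION {
        Err(StampError::TooOld(stamp))
    } else if stamp > CURRENT {
        Err(StampError::TooNew(stamp))
    } else {
        Ok(())
    }
}
```

Because the `const _` assertion is evaluated during compilation, it cannot be skipped the way a runtime test can.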
Tests: a sub-floor graph is refused in both open modes (twin of `future_stamp_is_refused_in_both_open_modes`); the guard accepts exactly [MIN, CURRENT]. No behavior change for any real graph. The retirement runbook lives on the `MIN_SUPPORTED` doc-comment + invariants.md. * fix(engine): compose migration contention with publisher retry; precise recovery-converge audit commit Three review-surfaced fixes on the RFC-013 Phase 7 path. Publisher retry vs migration contention: `publish()` propagated a `load_publish_state` error fatally via `?`, so a `RowLevelCasContention` surfaced by the v3->v4 migration's exhausted merge/stamp budgets aborted the publish instead of being retried — only `merge_rows` conflicts hit the retry. This contradicted the migration's own design, which returns that typed error EXPECTING the publisher to re-run the load (by which point a concurrent winner has usually finished the migration, so the next scan is a no-op). Route a retryable load error through the same retry path as a retryable `merge_rows` conflict. Regression test (failpoints): a one-shot retryable contention injected into `load_publish_state` now commits via the retry; red without the fix (the write fails with the injected contention). Recovery-converge audit commit id: `converge_or_defer_roll_forward` recorded the branch HEAD as the audit row's `graph_commit_id`, but a concurrent user write can advance `graph_head` past the recovery commit between the winner's publish and this read — attributing the audit to a later, wrong commit. Use the latest `RECOVERY_ACTOR`-authored commit (what `publish_recovery_commit` mints), which is the recovery commit by construction. The audit's actor was already correct (it comes from `sidecar.actor_id`, not the commit). Dead param: drop the unused `snapshot` from `record_audit_recovery_rollforward` (removing the `let _ = snapshot;` suppressor). `storage` stays — it is used to delete the sidecar.
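The publisher-retry fix above can be pictured as a loop in which one attempt covers both the load and the merge, so a retryable error from either step re-runs the whole attempt. This is a simplified sketch under assumptions: `PublishError`, the closure-based `load`/`merge` parameters, and the `u64` state are hypothetical and do not mirror the real `publish()` signature.

```rust
/// Hypothetical error type with one retryable variant.
#[derive(Debug, PartialEq)]
enum PublishError {
    RowLevelCasContention,
    Fatal(String),
}

fn is_retryable_publish_conflict(e: &PublishError) -> bool {
    matches!(e, PublishError::RowLevelCasContention)
}

/// Sketch of a bounded retry loop. `load` stands in for the per-attempt
/// state load and `merge` for the CAS merge-insert.
fn publish<L, M>(budget: usize, mut load: L, mut merge: M) -> Result<u64, PublishError>
where
    L: FnMut() -> Result<u64, PublishError>,
    M: FnMut(u64) -> Result<u64, PublishError>,
{
    for _ in 0..budget {
        // A retryable error from EITHER step takes the same retry path.
        match load().and_then(|state| merge(state)) {
            Ok(version) => return Ok(version),
            Err(e) if is_retryable_publish_conflict(&e) => {
                // Fall through to the next attempt, which re-runs the load.
            }
            Err(e) => return Err(e),
        }
    }
    // Single exhaustion path: still typed as retryable contention.
    Err(PublishError::RowLevelCasContention)
}
```

With this shape, a one-shot contention surfaced by the load step commits on the second attempt, while a non-retryable error is returned immediately.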
1727 lines
61 KiB
Rust
//! Tests for the direct-publish write path: mutations and loads write
//! directly to target tables and commit once via the publisher's
//! `expected_table_versions` CAS. (History: this replaced the removed Run
//! state machine / `__run__` staging branches / RunRecord — MR-771.)
//!
//! What this file covers:
//! - No `__run__*` branches are created by load or mutate.
//! - Cancellation of a mutation future leaves no graph-level state.
//! - Concurrent non-strict inserts/merges rebase under the per-table queue;
//!   strict updates/deletes surface `ExpectedVersionMismatch` on stale state.
//! - Failed mutations and loads leave the target unchanged.
//! - Multi-statement mutations are atomic (one commit per query).
//! - actor_id propagates through to the commit graph.

mod helpers;

use arrow_array::Array;
use omnigraph::db::commit_graph::CommitGraph;
use omnigraph::db::{Omnigraph, ReadTarget};
use omnigraph::error::OmniError;
use omnigraph::loader::{LoadMode, load_jsonl};

use helpers::*;

/// `omnigraph load` (no `--branch`) writes directly to the target — no
/// `__run__*` staging branch is created on success.
#[tokio::test]
async fn load_does_not_create_run_branch() {
    let dir = tempfile::tempdir().unwrap();
    let uri = dir.path().to_str().unwrap();
    let mut db = Omnigraph::init(uri, TEST_SCHEMA).await.unwrap();

    load_jsonl(&mut db, TEST_DATA, LoadMode::Overwrite)
        .await
        .unwrap();

    assert_eq!(db.branch_list().await.unwrap(), vec!["main".to_string()]);
    assert!(
        !std::path::Path::new(&format!("{}/_graph_runs.lance", uri)).exists(),
        "run state machine should not write _graph_runs.lance",
    );

    let qr = db
        .query(
            ReadTarget::branch("main"),
            TEST_QUERIES,
            "get_person",
            &params(&[("$name", "Alice")]),
        )
        .await
        .unwrap();
    assert_eq!(qr.num_rows(), 1);
}

/// `omnigraph change` writes directly to the target. After the call,
/// `branch_list()` shows only `main`; no run record exists.
#[tokio::test]
async fn mutation_does_not_create_run_branch() {
    let dir = tempfile::tempdir().unwrap();
    let uri = dir.path().to_str().unwrap();
    let mut db = init_and_load(&dir).await;

    let result = db
        .mutate(
            "main",
            MUTATION_QUERIES,
            "insert_person",
            &mixed_params(&[("$name", "Eve")], &[("$age", 22)]),
        )
        .await
        .unwrap();
    assert_eq!(result.affected_nodes, 1);

    assert_eq!(db.branch_list().await.unwrap(), vec!["main".to_string()]);
    assert!(
        !std::path::Path::new(&format!("{}/_graph_runs.lance", uri)).exists(),
        "run state machine should not write _graph_runs.lance",
    );
}

/// A failed mutation (validation error mid-query) leaves the target branch's
|
|
/// observable state unchanged. There is nothing for cleanup to delete.
|
|
#[tokio::test]
|
|
async fn failed_mutation_leaves_target_unchanged() {
|
|
let dir = tempfile::tempdir().unwrap();
|
|
let mut db = init_and_load(&dir).await;
|
|
|
|
let err = db
|
|
.mutate(
|
|
"main",
|
|
MUTATION_QUERIES,
|
|
"add_friend",
|
|
&params(&[("$from", "Alice"), ("$to", "Missing")]),
|
|
)
|
|
.await
|
|
.unwrap_err();
|
|
match err {
|
|
OmniError::Manifest(message) => assert!(message.message.contains("not found")),
|
|
other => panic!("unexpected error: {}", other),
|
|
}
|
|
|
|
let qr = db
|
|
.query(
|
|
ReadTarget::branch("main"),
|
|
TEST_QUERIES,
|
|
"friends_of",
|
|
&params(&[("$name", "Alice")]),
|
|
)
|
|
.await
|
|
.unwrap();
|
|
assert_eq!(qr.num_rows(), 2);
|
|
|
|
assert_eq!(db.branch_list().await.unwrap(), vec!["main".to_string()]);
|
|
}
|
|
|
|
/// Multi-statement mutations are atomic at the query boundary. The
|
|
/// `insert_person_and_friend` query inserts a person and an edge that
|
|
/// references it; both must land together (read-your-writes within the
|
|
/// query, single publish at the end).
|
|
#[tokio::test]
|
|
async fn multi_statement_mutation_is_atomic_with_read_your_writes() {
|
|
let dir = tempfile::tempdir().unwrap();
|
|
let mut db = init_and_load(&dir).await;
|
|
|
|
let result = db
|
|
.mutate(
|
|
"main",
|
|
MUTATION_QUERIES,
|
|
"insert_person_and_friend",
|
|
&mixed_params(&[("$name", "Eve"), ("$friend", "Alice")], &[("$age", 22)]),
|
|
)
|
|
.await
|
|
.unwrap();
|
|
assert_eq!(result.affected_nodes, 1);
|
|
assert_eq!(result.affected_edges, 1);
|
|
|
|
// Both writes are visible after one publish.
|
|
let person = db
|
|
.query(
|
|
ReadTarget::branch("main"),
|
|
TEST_QUERIES,
|
|
"get_person",
|
|
&params(&[("$name", "Eve")]),
|
|
)
|
|
.await
|
|
.unwrap();
|
|
assert_eq!(person.num_rows(), 1);
|
|
|
|
let friends = db
|
|
.query(
|
|
ReadTarget::branch("main"),
|
|
TEST_QUERIES,
|
|
"friends_of",
|
|
&params(&[("$name", "Eve")]),
|
|
)
|
|
.await
|
|
.unwrap();
|
|
assert_eq!(friends.num_rows(), 1);
|
|
}
|
|
|
|
/// Mid-query partial failure: op-1 stages a Person insert, op-2 fails
/// on referential integrity (`validate_edge_insert_endpoints`). Under
/// the staged-write writer, op-1's batch lives in the in-memory
/// accumulator and never reaches Lance, so Lance HEAD on `node:Person`
/// stays at the pre-mutation version. The publisher never publishes,
/// the manifest never advances, and the next mutation against the same
/// table proceeds normally (no `ExpectedVersionMismatch`).
///
/// Pins the staged-write contract:
/// - Failed multi-statement mutation surfaces a clear error, no
///   manifest commit, no observable state change.
/// - The touched tables stay queryable and writable from the next
///   query: Lance HEAD has not drifted.
|
|
#[tokio::test]
|
|
async fn partial_failure_leaves_target_queryable_and_unblocks_next_mutation() {
|
|
let dir = tempfile::tempdir().unwrap();
|
|
let mut db = init_and_load(&dir).await;
|
|
|
|
// Op-1 stages a Person 'Eve' insert. Op-2 attempts an edge to
|
|
// 'Missing' — fails at validate_edge_insert_endpoints because
|
|
// 'Missing' doesn't exist (and isn't pending).
|
|
let err = db
|
|
.mutate(
|
|
"main",
|
|
MUTATION_QUERIES,
|
|
"insert_person_and_friend",
|
|
&mixed_params(&[("$name", "Eve"), ("$friend", "Missing")], &[("$age", 22)]),
|
|
)
|
|
.await
|
|
.expect_err("op-2 must fail");
|
|
let OmniError::Manifest(manifest_err) = err else {
|
|
panic!("expected Manifest error, got {err:?}");
|
|
};
|
|
assert!(
|
|
manifest_err.message.contains("not found"),
|
|
"unexpected error: {}",
|
|
manifest_err.message,
|
|
);
|
|
|
|
// Atomicity at the manifest level: Eve is *not* observable. The
|
|
// staged batch never reached Lance, so neither the Lance HEAD nor
|
|
// the manifest moved.
|
|
let eve = db
|
|
.query(
|
|
ReadTarget::branch("main"),
|
|
TEST_QUERIES,
|
|
"get_person",
|
|
&params(&[("$name", "Eve")]),
|
|
)
|
|
.await
|
|
.unwrap();
|
|
assert_eq!(eve.num_rows(), 0, "partial mutation must not be visible");
|
|
|
|
// The next mutation against the same table SUCCEEDS — staged writes
|
|
// never advance Lance HEAD on a failed query, so there is no drift
|
|
// to trip the publisher's CAS.
|
|
let result = db
|
|
.mutate(
|
|
"main",
|
|
MUTATION_QUERIES,
|
|
"insert_person",
|
|
&mixed_params(&[("$name", "Frank")], &[("$age", 33)]),
|
|
)
|
|
.await
|
|
.expect("next mutation on the touched table must succeed under the staged-write writer");
|
|
assert_eq!(
|
|
result.affected_nodes, 1,
|
|
"follow-up insert should report 1 affected node"
|
|
);
|
|
|
|
// And Frank is observable.
|
|
let frank = db
|
|
.query(
|
|
ReadTarget::branch("main"),
|
|
TEST_QUERIES,
|
|
"get_person",
|
|
&params(&[("$name", "Frank")]),
|
|
)
|
|
.await
|
|
.unwrap();
|
|
assert_eq!(frank.num_rows(), 1, "Frank must be visible after publish");
|
|
}
|
|
|
|
/// Stale non-strict writers rebase to the live manifest pin under the
|
|
/// per-table queue instead of folding raw drift or returning a false 409.
|
|
/// Strict update/delete semantics are covered by the consistency/server tests.
|
|
#[tokio::test]
|
|
async fn stale_non_strict_insert_rebases_to_live_manifest_pin() {
|
|
let dir = tempfile::tempdir().unwrap();
|
|
let uri = dir.path().to_string_lossy().into_owned();
|
|
|
|
{
|
|
let mut db = Omnigraph::init(&uri, TEST_SCHEMA).await.unwrap();
|
|
load_jsonl(&mut db, TEST_DATA, LoadMode::Overwrite)
|
|
.await
|
|
.unwrap();
|
|
}
|
|
|
|
// Open handle B first — it captures the pre-write snapshot. We don't
|
|
// actually mutate yet; we just want B's coordinator to be at the
|
|
// pre-A-commit state when we eventually call mutate.
|
|
let mut db_b = Omnigraph::open(&uri).await.unwrap();
|
|
|
|
// Writer A advances the manifest by inserting a new Person.
|
|
{
|
|
let mut db_a = Omnigraph::open(&uri).await.unwrap();
|
|
db_a.mutate(
|
|
"main",
|
|
MUTATION_QUERIES,
|
|
"insert_person",
|
|
&mixed_params(&[("$name", "WriterA")], &[("$age", 41)]),
|
|
)
|
|
.await
|
|
.unwrap();
|
|
}
|
|
|
|
// Writer B's coordinator is still at the pre-A snapshot, but Insert is
|
|
// non-strict: commit_all re-reads the live manifest pin under the queue,
|
|
// verifies Lance HEAD equals that pin, and then lets Lance rebase the
|
|
// staged append.
|
|
db_b.mutate(
|
|
"main",
|
|
MUTATION_QUERIES,
|
|
"insert_person",
|
|
&mixed_params(&[("$name", "WriterB")], &[("$age", 42)]),
|
|
)
|
|
.await
|
|
.unwrap();
|
|
|
|
for name in ["WriterA", "WriterB"] {
|
|
let person = query_main(
|
|
&mut db_b,
|
|
TEST_QUERIES,
|
|
"get_person",
|
|
&params(&[("$name", name)]),
|
|
)
|
|
.await
|
|
.unwrap();
|
|
assert_eq!(person.num_rows(), 1, "{name} should be visible");
|
|
}
|
|
}
|
|
|
|
/// The cancellation hole that motivated removing the Run state machine: dropping a mutation future
|
|
/// mid-flight must not leave any graph-level state behind. With the run
|
|
/// state machine gone, only orphaned Lance fragments can remain — and those
|
|
/// are reclaimed by `omnigraph cleanup`.
|
|
///
|
|
/// The test deliberately does NOT assert that the manifest version is
|
|
/// unchanged: `handle.abort()` is racing the spawned task, and on a fast
|
|
/// machine the mutation may complete before cancellation. That is acceptable
|
|
/// — what matters for cancel safety is that no `__run__*` staging branches
|
|
/// are ever created, that `_graph_runs.lance` is never written, and that
|
|
/// any partial state on disk is reachable through the regular manifest /
|
|
/// commit graph pipes (so `omnigraph cleanup` can reclaim it). Asserting
|
|
/// version equality would just be a flake on hosts where the abort lands
|
|
/// late.
|
|
#[tokio::test]
|
|
async fn cancelled_mutation_future_leaves_no_state() {
|
|
let dir = tempfile::tempdir().unwrap();
|
|
let uri = dir.path().to_string_lossy().into_owned();
|
|
|
|
{
|
|
let mut db = Omnigraph::init(&uri, TEST_SCHEMA).await.unwrap();
|
|
load_jsonl(&mut db, TEST_DATA, LoadMode::Overwrite)
|
|
.await
|
|
.unwrap();
|
|
}
|
|
|
|
let branches_before = {
|
|
let db = Omnigraph::open(&uri).await.unwrap();
|
|
db.branch_list().await.unwrap()
|
|
};
|
|
|
|
let uri_handle = uri.clone();
|
|
let handle = tokio::spawn(async move {
|
|
let mut db = Omnigraph::open(&uri_handle).await.unwrap();
|
|
db.mutate(
|
|
"main",
|
|
MUTATION_QUERIES,
|
|
"insert_person",
|
|
&mixed_params(&[("$name", "Eve")], &[("$age", 22)]),
|
|
)
|
|
.await
|
|
});
|
|
|
|
// Cancel the future. Whether the in-flight write managed to land a
|
|
// fragment (or even fully publish) is timing-dependent and irrelevant —
|
|
// see the doc comment on this test for why.
|
|
handle.abort();
|
|
let _ = handle.await;
|
|
|
|
let db = Omnigraph::open(&uri).await.unwrap();
|
|
let branches_after = db.branch_list().await.unwrap();
|
|
|
|
// Cancel-safety property: no graph-level run/staging state remains.
|
|
//
|
|
// No `__run__` branches can ever be created: the Run state machine
|
|
// (`begin_run` etc.) was deleted in MR-771 — verified by the build itself,
|
|
// those symbols no longer exist. Any legacy `__run__*` branch on an
|
|
// upgraded graph is swept by the v2→v3 manifest migration.
|
|
//
|
|
// (1) The branch list is unchanged: cancellation/completion cannot
|
|
// synthesize new public branches.
|
|
assert_eq!(
|
|
branches_after, branches_before,
|
|
"cancelled mutation must not synthesize new public branches",
|
|
);
|
|
// (2) The legacy run-state machine table never reappears on disk.
|
|
assert!(
|
|
!std::path::Path::new(&format!("{}/_graph_runs.lance", uri)).exists(),
|
|
"no _graph_runs.lance after cancel — state machine is gone",
|
|
);
|
|
}
|
|
|
|
/// `actor_id` provided to `mutate_as` reaches the commit graph so audit can
|
|
/// reconstruct who published which commit. This used to be plumbed via the
|
|
/// run record; now it goes directly through the publisher and
|
|
/// `record_graph_commit`.
|
|
#[tokio::test]
|
|
async fn mutation_actor_id_lands_in_commit_graph() {
|
|
let dir = tempfile::tempdir().unwrap();
|
|
let uri = dir.path().to_str().unwrap();
|
|
let mut db = init_and_load(&dir).await;
|
|
|
|
db.mutate_as(
|
|
"main",
|
|
MUTATION_QUERIES,
|
|
"set_age",
|
|
&mixed_params(&[("$name", "Alice")], &[("$age", 31)]),
|
|
Some("act-andrew"),
|
|
)
|
|
.await
|
|
.unwrap();
|
|
|
|
let head = CommitGraph::open(uri)
|
|
.await
|
|
.unwrap()
|
|
.head_commit()
|
|
.await
|
|
.unwrap()
|
|
.unwrap();
|
|
assert_eq!(head.actor_id.as_deref(), Some("act-andrew"));
|
|
}
|
|
|
|
/// Repeated loads must not accumulate `__run__*` branches across calls. In
|
|
/// the post-demotion world there are no run branches at all — verify that
|
|
/// 10 sequential loads end with `branch_list() == ["main"]`.
|
|
#[tokio::test]
|
|
async fn repeated_loads_do_not_accumulate_branches() {
|
|
let dir = tempfile::tempdir().unwrap();
|
|
let uri = dir.path().to_str().unwrap();
|
|
let mut db = Omnigraph::init(uri, TEST_SCHEMA).await.unwrap();
|
|
|
|
for i in 0..10 {
|
|
let payload = format!(
|
|
r#"{{"type":"Person","data":{{"name":"p{}","age":{}}}}}"#,
|
|
i, i
|
|
);
|
|
load_jsonl(&mut db, &payload, LoadMode::Append)
|
|
.await
|
|
.unwrap();
|
|
}
|
|
|
|
assert_eq!(db.branch_list().await.unwrap(), vec!["main".to_string()]);
|
|
}
|
|
|
|
/// After MR-770, `__run__*` is an ordinary branch name — the Run state machine
|
|
/// and its `is_internal_run_branch` guard are gone. The surviving internal-ref
|
|
/// guard still rejects the active `__schema_apply_lock__` branch on the public
|
|
/// create/merge APIs.
|
|
#[tokio::test]
|
|
async fn public_branch_apis_reject_internal_system_refs() {
|
|
let dir = tempfile::tempdir().unwrap();
|
|
let mut db = init_and_load(&dir).await;
|
|
|
|
// `__run__*` is no longer reserved — creating it now succeeds.
|
|
db.branch_create("__run__formerly_reserved")
|
|
.await
|
|
.expect("__run__ prefix is a normal branch name post-MR-770");
|
|
|
|
// The schema-apply lock branch is still rejected on public branch APIs.
|
|
let create_err = db.branch_create("__schema_apply_lock__").await.unwrap_err();
|
|
let OmniError::Manifest(err) = create_err else {
|
|
panic!("expected Manifest error");
|
|
};
|
|
assert!(
|
|
err.message.contains("internal system ref"),
|
|
"unexpected error: {}",
|
|
err.message
|
|
);
|
|
|
|
let merge_err = db
|
|
.branch_merge("__schema_apply_lock__", "main")
|
|
.await
|
|
.unwrap_err();
|
|
let OmniError::Manifest(err) = merge_err else {
|
|
panic!("expected Manifest error");
|
|
};
|
|
assert!(
|
|
err.message.contains("internal system refs"),
|
|
"unexpected error: {}",
|
|
err.message
|
|
);
|
|
}
|
|
|
|
// ─── Staged-write rewire — additional contract tests ───────────────────────
|
|
|
|
/// Mutation queries used only by the staged-write tests below. Kept in
|
|
/// the test file (not in helpers' shared `MUTATION_QUERIES`) to keep
|
|
/// their scope local to the staged-write coverage.
|
|
const STAGED_QUERIES: &str = r#"
|
|
query insert_two_persons($a_name: String, $a_age: I32, $b_name: String, $b_age: I32) {
|
|
insert Person { name: $a_name, age: $a_age }
|
|
insert Person { name: $b_name, age: $b_age }
|
|
}
|
|
|
|
query insert_then_update_same_person(
|
|
$name: String, $insert_age: I32, $update_age: I32
|
|
) {
|
|
insert Person { name: $name, age: $insert_age }
|
|
update Person set { age: $update_age } where name = $name
|
|
}
|
|
|
|
query insert_two_friends($from: String, $a: String, $b: String) {
|
|
insert Knows { from: $from, to: $a }
|
|
insert Knows { from: $from, to: $b }
|
|
}
|
|
|
|
query mixed_insert_and_delete($name: String, $age: I32, $victim: String) {
|
|
insert Person { name: $name, age: $age }
|
|
delete Person where name = $victim
|
|
}
|
|
|
|
query update_then_filter_by_old_value(
|
|
$first_name: String, $first_new_age: I32,
|
|
$second_threshold: I32, $second_new_age: I32
|
|
) {
|
|
update Person set { age: $first_new_age } where name = $first_name
|
|
update Person set { age: $second_new_age } where age > $second_threshold
|
|
}
|
|
|
|
query delete_two_persons($first: String, $second: String) {
|
|
delete Person where name = $first
|
|
delete Person where name = $second
|
|
}
|
|
|
|
query update_age_by_name($name: String, $age: I32) {
|
|
update Person set { age: $age } where name = $name
|
|
}
|
|
"#;
|
|
|
|
/// D₂: a query mixing inserts/updates with deletes is rejected at parse
|
|
/// time, BEFORE any I/O. The error shape directs the user to split the
|
|
/// query into two mutations.
|
|
#[tokio::test]
|
|
async fn mutation_rejects_mixed_insert_and_delete_at_parse_time() {
|
|
let dir = tempfile::tempdir().unwrap();
|
|
let mut db = init_and_load(&dir).await;
|
|
|
|
// Capture pre-mutation state on touched tables to confirm no I/O.
|
|
let persons_before = count_rows(&db, "node:Person").await;
|
|
|
|
let err = db
|
|
.mutate(
|
|
"main",
|
|
STAGED_QUERIES,
|
|
"mixed_insert_and_delete",
|
|
&mixed_params(&[("$name", "Eve"), ("$victim", "Alice")], &[("$age", 22)]),
|
|
)
|
|
.await
|
|
.expect_err("D₂ must reject mixed insert+delete");
|
|
let OmniError::Manifest(manifest_err) = err else {
|
|
panic!("expected Manifest error, got {err:?}");
|
|
};
|
|
assert!(
|
|
manifest_err.message.contains("inserts/updates and deletes"),
|
|
"unexpected error message: {}",
|
|
manifest_err.message,
|
|
);
|
|
assert!(
|
|
manifest_err
|
|
.message
|
|
.contains("split into separate mutations"),
|
|
"error message should direct user to split: {}",
|
|
manifest_err.message,
|
|
);
|
|
|
|
// No I/O — counts unchanged, branches unchanged.
|
|
let persons_after = count_rows(&db, "node:Person").await;
|
|
assert_eq!(
|
|
persons_before, persons_after,
|
|
"D₂ rejection must fire before any write",
|
|
);
|
|
assert_eq!(db.branch_list().await.unwrap(), vec!["main".to_string()]);
|
|
}
|
|
|
|
/// `insert Person 'X'; update Person where name='X' set age=...` — both
|
|
/// ops produce content on `node:Person` and coalesce into one
|
|
/// `stage_merge_insert` at end-of-query. The accumulator's last-write-wins
|
|
/// dedupe (in `MutationStaging::finalize`) ensures the update's value
|
|
/// wins. Single Lance commit per table per query.
|
|
#[tokio::test]
|
|
async fn mixed_insert_and_update_on_same_person_coalesces_to_one_merge() {
|
|
let dir = tempfile::tempdir().unwrap();
|
|
let mut db = init_and_load(&dir).await;
|
|
|
|
let pre_version = version_main(&db).await.unwrap();
|
|
|
|
let result = db
|
|
.mutate(
|
|
"main",
|
|
STAGED_QUERIES,
|
|
"insert_then_update_same_person",
|
|
&mixed_params(
|
|
&[("$name", "Yves")],
|
|
&[("$insert_age", 10), ("$update_age", 99)],
|
|
),
|
|
)
|
|
.await
|
|
.unwrap();
|
|
assert_eq!(result.affected_nodes, 2, "1 insert + 1 update reported");
|
|
|
|
// The end-state row carries the update value (last-write-wins via
|
|
// dedupe in finalize), proving the staged merge_insert ran with the
|
|
// correct source dedupe. Read the underlying Person table directly
|
|
// and assert age=99 for the row we just inserted+updated.
|
|
let batches = read_table(&db, "node:Person").await;
|
|
let mut found_age: Option<i32> = None;
|
|
for batch in &batches {
|
|
let names = batch
|
|
.column_by_name("name")
|
|
.expect("Person table missing 'name' column")
|
|
.as_any()
|
|
.downcast_ref::<arrow_array::StringArray>()
|
|
.expect("'name' should be Utf8");
|
|
let ages = batch
|
|
.column_by_name("age")
|
|
.expect("Person table missing 'age' column")
|
|
.as_any()
|
|
.downcast_ref::<arrow_array::Int32Array>()
|
|
.expect("'age' should be I32");
|
|
for i in 0..batch.num_rows() {
|
|
if names.is_valid(i) && names.value(i) == "Yves" {
|
|
if ages.is_valid(i) {
|
|
found_age = Some(ages.value(i));
|
|
}
|
|
}
|
|
}
|
|
}
|
|
assert_eq!(
|
|
found_age,
|
|
Some(99),
|
|
"dedupe must keep the update's age value, not the insert's",
|
|
);
|
|
|
|
// One-publish guarantee: manifest version advanced by exactly 1. The graph
|
|
// commit (`graph_commit` + `graph_head` rows) rides the SAME publish CAS as
|
|
// the table-version rows (RFC-013 Phase 7), so one graph commit is exactly
|
|
// one manifest version bump.
|
|
let post_version = version_main(&db).await.unwrap();
|
|
assert_eq!(
|
|
post_version,
|
|
pre_version + 1,
|
|
"insert+update query must publish exactly once",
|
|
);
|
|
}
|
|
|
|
/// `insert Knows from='Alice' to='Bob'; insert Knows from='Alice' to='Eve'`
|
|
/// — both append to `edge:Knows`. The accumulator coalesces them into one
|
|
/// `stage_append` at end-of-query. Edge IDs are ULID-generated so no
|
|
/// dedupe is needed (Append mode).
|
|
#[tokio::test]
|
|
async fn multiple_appends_to_same_edge_coalesce_to_one_append() {
|
|
let dir = tempfile::tempdir().unwrap();
|
|
let mut db = init_and_load(&dir).await;
|
|
|
|
// Add Eve so the second edge has a valid endpoint.
|
|
db.mutate(
|
|
"main",
|
|
MUTATION_QUERIES,
|
|
"insert_person",
|
|
&mixed_params(&[("$name", "Eve")], &[("$age", 22)]),
|
|
)
|
|
.await
|
|
.unwrap();
|
|
|
|
let edges_before = count_rows(&db, "edge:Knows").await;
|
|
let pre_version = version_main(&db).await.unwrap();
|
|
|
|
let result = db
|
|
.mutate(
|
|
"main",
|
|
STAGED_QUERIES,
|
|
"insert_two_friends",
|
|
&params(&[("$from", "Alice"), ("$a", "Bob"), ("$b", "Eve")]),
|
|
)
|
|
.await
|
|
.unwrap();
|
|
assert_eq!(result.affected_edges, 2);
|
|
|
|
// Both edges visible.
|
|
let edges_after = count_rows(&db, "edge:Knows").await;
|
|
assert_eq!(edges_after, edges_before + 2);
|
|
|
|
// One manifest version bump for the two-edge query (atomic publish): the
|
|
// graph commit rides the same publish CAS as the table-version rows
|
|
// (RFC-013 Phase 7).
|
|
let post_version = version_main(&db).await.unwrap();
|
|
assert_eq!(
|
|
post_version,
|
|
pre_version + 1,
|
|
"two-statement edge insert must publish exactly once",
|
|
);
|
|
}
|
|
|
|
/// A multi-statement insert query touching two Person rows produces a
|
|
/// single `stage_*` + `commit_staged` per table — verified by checking
|
|
/// that the manifest version advances exactly once across the query.
|
|
#[tokio::test]
|
|
async fn multi_statement_inserts_publish_exactly_once() {
|
|
let dir = tempfile::tempdir().unwrap();
|
|
let mut db = init_and_load(&dir).await;
|
|
|
|
let pre_version = version_main(&db).await.unwrap();
|
|
|
|
db.mutate(
|
|
"main",
|
|
STAGED_QUERIES,
|
|
"insert_two_persons",
|
|
&mixed_params(
|
|
&[("$a_name", "Owen"), ("$b_name", "Pat")],
|
|
&[("$a_age", 50), ("$b_age", 51)],
|
|
),
|
|
)
|
|
.await
|
|
.unwrap();
|
|
|
|
// One manifest version bump: the graph commit rides the same publish CAS
|
|
// as the table-version rows (RFC-013 Phase 7).
|
|
let post_version = version_main(&db).await.unwrap();
|
|
assert_eq!(
|
|
post_version,
|
|
pre_version + 1,
|
|
"two-statement insert query must publish exactly once",
|
|
);
|
|
|
|
// Both rows visible.
|
|
let owen = db
|
|
.query(
|
|
ReadTarget::branch("main"),
|
|
TEST_QUERIES,
|
|
"get_person",
|
|
&params(&[("$name", "Owen")]),
|
|
)
|
|
.await
|
|
.unwrap();
|
|
assert_eq!(owen.num_rows(), 1);
|
|
let pat = db
|
|
.query(
|
|
ReadTarget::branch("main"),
|
|
TEST_QUERIES,
|
|
"get_person",
|
|
&params(&[("$name", "Pat")]),
|
|
)
|
|
.await
|
|
.unwrap();
|
|
assert_eq!(pat.num_rows(), 1);
|
|
}
|
|
|
|
/// A load with a mid-input edge RI violation must leave Lance HEAD on
|
|
/// the touched node tables untouched (staged loader never commits any
|
|
/// fragment when the load fails). The next load on the same tables
|
|
/// succeeds — no `ExpectedVersionMismatch` from drift.
|
|
#[tokio::test]
|
|
async fn load_with_bad_edge_reference_unblocks_next_load() {
|
|
let dir = tempfile::tempdir().unwrap();
|
|
let uri = dir.path().to_str().unwrap();
|
|
let mut db = Omnigraph::init(uri, TEST_SCHEMA).await.unwrap();
|
|
// Seed with the standard fixture so we're working from a non-empty
|
|
// baseline.
|
|
load_jsonl(&mut db, TEST_DATA, LoadMode::Overwrite)
|
|
.await
|
|
.unwrap();
|
|
|
|
let pre_persons = count_rows(&db, "node:Person").await;
|
|
let pre_edges = count_rows(&db, "edge:Knows").await;
|
|
|
|
// First load: append a Person + an edge whose `to` points to a
|
|
// non-existent Person. RI fails AFTER the staged Person is in the
|
|
// accumulator but BEFORE the publish.
|
|
let bad = r#"{"type": "Person", "data": {"name": "Mallory", "age": 5}}
|
|
{"edge": "Knows", "from": "Mallory", "to": "Ghost"}
|
|
"#;
|
|
let err = load_jsonl(&mut db, bad, LoadMode::Append)
|
|
.await
|
|
.expect_err("RI violation must fail the load");
|
|
let OmniError::Manifest(manifest_err) = err else {
|
|
panic!("expected Manifest error, got {err:?}");
|
|
};
|
|
assert!(
|
|
manifest_err.message.contains("not found"),
|
|
"unexpected error: {}",
|
|
manifest_err.message,
|
|
);
|
|
|
|
// No write made it to disk: counts unchanged.
|
|
let mid_persons = count_rows(&db, "node:Person").await;
|
|
let mid_edges = count_rows(&db, "edge:Knows").await;
|
|
assert_eq!(
|
|
mid_persons, pre_persons,
|
|
"failed load must not advance Person count"
|
|
);
|
|
assert_eq!(
|
|
mid_edges, pre_edges,
|
|
"failed load must not advance Knows count"
|
|
);
|
|
|
|
// Second load against the same tables — succeeds (no HEAD drift).
|
|
let good = r#"{"type": "Person", "data": {"name": "Pat", "age": 55}}"#;
|
|
load_jsonl(&mut db, good, LoadMode::Append).await.unwrap();
|
|
assert_eq!(
|
|
count_rows(&db, "node:Person").await,
|
|
pre_persons + 1,
|
|
"follow-up load must succeed (no drift)",
|
|
);
|
|
}
|
|
|
|
#[tokio::test]
|
|
async fn load_overwrite_with_bad_edge_reference_unblocks_next_load() {
|
|
let dir = tempfile::tempdir().unwrap();
|
|
let uri = dir.path().to_str().unwrap();
|
|
let mut db = Omnigraph::init(uri, TEST_SCHEMA).await.unwrap();
|
|
load_jsonl(&mut db, TEST_DATA, LoadMode::Overwrite)
|
|
.await
|
|
.unwrap();
|
|
|
|
let pre_persons = count_rows(&db, "node:Person").await;
|
|
let pre_edges = count_rows(&db, "edge:Knows").await;
|
|
|
|
let bad = r#"{"type": "Person", "data": {"name": "Mallory", "age": 5}}
|
|
{"edge": "Knows", "from": "Mallory", "to": "Ghost"}
|
|
"#;
|
|
let err = load_jsonl(&mut db, bad, LoadMode::Overwrite)
|
|
.await
|
|
.expect_err("RI violation must fail overwrite before commit_staged");
|
|
let OmniError::Manifest(manifest_err) = err else {
|
|
panic!("expected Manifest error, got {err:?}");
|
|
};
|
|
assert!(
|
|
manifest_err.message.contains("not found"),
|
|
"unexpected error: {}",
|
|
manifest_err.message,
|
|
);
|
|
|
|
assert_eq!(count_rows(&db, "node:Person").await, pre_persons);
|
|
assert_eq!(count_rows(&db, "edge:Knows").await, pre_edges);
|
|
|
|
let good = r#"{"type": "Person", "data": {"name": "Pat", "age": 55}}
|
|
{"type": "Person", "data": {"name": "Quinn", "age": 56}}
|
|
{"edge": "Knows", "from": "Pat", "to": "Quinn"}
|
|
"#;
|
|
load_jsonl(&mut db, good, LoadMode::Overwrite)
|
|
.await
|
|
.unwrap();
|
|
assert_eq!(count_rows(&db, "node:Person").await, 2);
|
|
assert_eq!(count_rows(&db, "edge:Knows").await, 1);
|
|
}
|
|
|
|
/// Same shape as the RI test above, but driven by a cardinality
|
|
/// violation (`@card(0..1)` on `WorksAt`). The staged loader's pending
|
|
/// edge accumulator drives the cardinality scan; a violation aborts
|
|
/// the load before publish; the next load on the same tables succeeds.
|
|
#[tokio::test]
|
|
async fn load_with_cardinality_violation_unblocks_next_load() {
|
|
// Use a custom schema where WorksAt has a strict 0..1 cardinality
|
|
// bound — the default test schema leaves WorksAt unbounded. Seed
|
|
// Alice + two companies, then attempt two WorksAt edges from Alice,
|
|
// which violates the bound.
|
|
const CARD_SCHEMA: &str = r#"
|
|
node Person {
|
|
name: String @key
|
|
age: I32?
|
|
}
|
|
node Company {
|
|
name: String @key
|
|
}
|
|
edge WorksAt: Person -> Company @card(0..1)
|
|
"#;
|
|
|
|
let dir = tempfile::tempdir().unwrap();
|
|
let uri = dir.path().to_str().unwrap();
|
|
let mut db = Omnigraph::init(uri, CARD_SCHEMA).await.unwrap();
|
|
|
|
let seed = r#"{"type": "Person", "data": {"name": "Alice", "age": 30}}
|
|
{"type": "Company", "data": {"name": "Acme"}}
|
|
{"type": "Company", "data": {"name": "Bigco"}}
|
|
"#;
|
|
load_jsonl(&mut db, seed, LoadMode::Overwrite)
|
|
.await
|
|
.unwrap();
|
|
|
|
let pre_works = count_rows(&db, "edge:WorksAt").await;
|
|
|
|
// Two WorksAt edges from Alice — exceeds @card(0..1).
|
|
let bad = r#"{"edge": "WorksAt", "from": "Alice", "to": "Acme"}
|
|
{"edge": "WorksAt", "from": "Alice", "to": "Bigco"}
|
|
"#;
|
|
let err = load_jsonl(&mut db, bad, LoadMode::Append)
|
|
.await
|
|
.expect_err("cardinality violation must fail the load");
|
|
let OmniError::Manifest(manifest_err) = err else {
|
|
panic!("expected Manifest error, got {err:?}");
|
|
};
|
|
assert!(
|
|
manifest_err.message.contains("@card violation"),
|
|
"unexpected error: {}",
|
|
manifest_err.message,
|
|
);
|
|
|
|
// No edges added; next load on the same edge table succeeds.
|
|
let mid_works = count_rows(&db, "edge:WorksAt").await;
|
|
assert_eq!(mid_works, pre_works);
|
|
|
|
let good = r#"{"edge": "WorksAt", "from": "Alice", "to": "Acme"}"#;
|
|
load_jsonl(&mut db, good, LoadMode::Append).await.unwrap();
|
|
assert_eq!(
|
|
count_rows(&db, "edge:WorksAt").await,
|
|
pre_works + 1,
|
|
"follow-up load must succeed (no drift on edge table)",
|
|
);
|
|
}
|
|
|
|
#[tokio::test]
|
|
async fn load_overwrite_with_cardinality_violation_unblocks_next_load() {
|
|
const CARD_SCHEMA: &str = r#"
|
|
node Person {
|
|
name: String @key
|
|
age: I32?
|
|
}
|
|
node Company {
|
|
name: String @key
|
|
}
|
|
edge WorksAt: Person -> Company @card(0..1)
|
|
"#;
|
|
|
|
let dir = tempfile::tempdir().unwrap();
|
|
let uri = dir.path().to_str().unwrap();
|
|
let mut db = Omnigraph::init(uri, CARD_SCHEMA).await.unwrap();
|
|
|
|
let seed = r#"{"type": "Person", "data": {"name": "Alice", "age": 30}}
|
|
{"type": "Company", "data": {"name": "Acme"}}
|
|
{"type": "Company", "data": {"name": "Bigco"}}
|
|
"#;
|
|
load_jsonl(&mut db, seed, LoadMode::Overwrite)
|
|
.await
|
|
.unwrap();
|
|
|
|
let pre_works = count_rows(&db, "edge:WorksAt").await;
|
|
|
|
let bad = r#"{"edge": "WorksAt", "from": "Alice", "to": "Acme"}
|
|
{"edge": "WorksAt", "from": "Alice", "to": "Bigco"}
|
|
"#;
|
|
let err = load_jsonl(&mut db, bad, LoadMode::Overwrite)
|
|
.await
|
|
.expect_err("cardinality violation must fail overwrite before commit_staged");
|
|
let OmniError::Manifest(manifest_err) = err else {
|
|
panic!("expected Manifest error, got {err:?}");
|
|
};
|
|
assert!(
|
|
manifest_err.message.contains("@card violation"),
|
|
"unexpected error: {}",
|
|
manifest_err.message,
|
|
);
|
|
assert_eq!(count_rows(&db, "edge:WorksAt").await, pre_works);
|
|
|
|
let good = r#"{"edge": "WorksAt", "from": "Alice", "to": "Acme"}"#;
|
|
load_jsonl(&mut db, good, LoadMode::Overwrite)
|
|
.await
|
|
.unwrap();
|
|
assert_eq!(count_rows(&db, "edge:WorksAt").await, 1);
|
|
}
|
|
|
|
// ─── Chained-mutation correctness — pinned coverage ─────────────────────────
|
|
|
|
/// Chained `update` ops in one query must respect each previous op's
/// view of the rows. Without merge-shadow semantics on
/// `scan_with_pending`, the second update sees the stale committed value
/// (the first update's row still appears in the Lance scan because the
/// pending side hasn't committed), the predicate is evaluated against
/// that stale value, and the dedupe-last-wins step at finalize ends up
/// applying the second update to a row whose pending value should have
/// shielded it.
///
/// Concretely: Alice starts at age=30 in TEST_DATA. Op-1 sets Alice to
/// age=99. Op-2 sets everyone with age > 50 to age=10. After op-1,
/// Alice's logical value is age=99, which satisfies op-2's predicate, so
/// op-2 SHOULD update Alice to age=10. For that to happen, op-2 must see
/// Alice at age=99 (op-1's pending value), not age=30 (committed). If the
/// helper unioned pending and committed rows without shadowing, Alice
/// would appear twice (the stale committed row at age=30, which does not
/// match the predicate, and the pending row at age=99, which does) and
/// dedupe could pick either. The test asserts both ends: Alice ends at
/// age=10, and the publisher publishes exactly once.
|
|
#[tokio::test]
|
|
async fn chained_updates_with_overlapping_predicate_respects_intermediate_value() {
|
|
let dir = tempfile::tempdir().unwrap();
|
|
let mut db = init_and_load(&dir).await;
|
|
|
|
let pre_version = version_main(&db).await.unwrap();
|
|
|
|
db.mutate(
|
|
"main",
|
|
STAGED_QUERIES,
|
|
"update_then_filter_by_old_value",
|
|
&mixed_params(
|
|
&[("$first_name", "Alice")],
|
|
&[
|
|
("$first_new_age", 99),
|
|
("$second_threshold", 50),
|
|
("$second_new_age", 10),
|
|
],
|
|
),
|
|
)
|
|
.await
|
|
.unwrap();
|
|
|
|
// After op-1: Alice = 99. After op-2 (where age > 50): Alice
|
|
// matches (99 > 50) → set to 10. End state: Alice = 10.
|
|
let batches = read_table(&db, "node:Person").await;
|
|
let mut alice_age: Option<i32> = None;
|
|
for batch in &batches {
|
|
let names = batch
|
|
.column_by_name("name")
|
|
.unwrap()
|
|
.as_any()
|
|
.downcast_ref::<arrow_array::StringArray>()
|
|
.unwrap();
|
|
let ages = batch
|
|
.column_by_name("age")
|
|
.unwrap()
|
|
.as_any()
|
|
.downcast_ref::<arrow_array::Int32Array>()
|
|
.unwrap();
|
|
for i in 0..batch.num_rows() {
|
|
if names.is_valid(i) && names.value(i) == "Alice" && ages.is_valid(i) {
|
|
alice_age = Some(ages.value(i));
|
|
}
|
|
}
|
|
}
|
|
assert_eq!(
|
|
alice_age,
|
|
Some(10),
|
|
"chained-update final value must reflect the second update applied to op-1's pending value"
|
|
);
|
|
|
|
// One manifest version bump: the graph commit rides the same publish CAS
|
|
// as the table-version rows (RFC-013 Phase 7).
|
|
let post_version = version_main(&db).await.unwrap();
|
|
assert_eq!(
|
|
post_version,
|
|
pre_version + 1,
|
|
"chained update must publish exactly once",
|
|
);
|
|
}
|
|
|
|
/// Two `delete` ops on the same node table in one query. Pre-fix,
/// op-2's `open_table_for_mutation` went through
/// `open_for_mutation_on_branch`, which tripped `ensure_expected_version`
/// (Lance HEAD had advanced past the manifest's pinned version after
/// op-1's inline commit, but the manifest had not moved). Post-fix,
/// `open_table_for_mutation` reopens via `inline_committed[table_key]`
/// at the post-delete Lance version. The test asserts that both deletes
/// succeed in one query, both rows are gone, and the manifest version
/// advances by 1.
|
|
#[tokio::test]
|
|
async fn multi_statement_delete_on_same_node_table() {
|
|
let dir = tempfile::tempdir().unwrap();
|
|
let mut db = init_and_load(&dir).await;
|
|
|
|
let pre_persons = count_rows(&db, "node:Person").await;
|
|
let pre_version = version_main(&db).await.unwrap();
|
|
|
|
db.mutate(
|
|
"main",
|
|
STAGED_QUERIES,
|
|
"delete_two_persons",
|
|
¶ms(&[("$first", "Alice"), ("$second", "Bob")]),
|
|
)
|
|
.await
|
|
.expect("multi-delete on same table must succeed");
|
|
|
|
assert_eq!(
|
|
count_rows(&db, "node:Person").await,
|
|
pre_persons - 2,
|
|
"both deletes must land",
|
|
);
|
|
// One manifest version bump: the graph commit (delete-only queries record
|
|
// one too) rides the same publish CAS as the table-version rows
|
|
// (RFC-013 Phase 7).
|
|
let post_version = version_main(&db).await.unwrap();
|
|
assert_eq!(
|
|
post_version,
|
|
pre_version + 1,
|
|
"multi-delete query publishes exactly once at end",
|
|
);
|
|
|
|
// Both rows actually gone:
|
|
for name in ["Alice", "Bob"] {
|
|
let qr = db
|
|
.query(
|
|
ReadTarget::branch("main"),
|
|
TEST_QUERIES,
|
|
"get_person",
|
|
¶ms(&[("$name", name)]),
|
|
)
|
|
.await
|
|
.unwrap();
|
|
assert_eq!(qr.num_rows(), 0, "{name} should be deleted");
|
|
}
|
|
}
|
|
|
|
/// Cascade-then-explicit variant: deleting a node cascades to its
/// edges, advancing Lance HEAD on the edge table. A subsequent
/// `delete <Edge>` op in the same query must reopen at the
/// post-cascade-commit version of the edge table — not trip
/// `ensure_expected_version` against the manifest's pinned version.
#[tokio::test]
async fn cascade_delete_node_then_explicit_delete_edge_on_same_table() {
    const QUERY: &str = r#"
query cascade_then_explicit($name: String, $other: String) {
    delete Person where name = $name
    delete Knows where from = $other
}
"#;

    let dir = tempfile::tempdir().unwrap();
    let mut db = init_and_load(&dir).await;

    // TEST_DATA seeds three Knows edges:
    //   Alice → Bob, Alice → Charlie (cascade target — should be deleted by op-1)
    //   Bob → Diana (explicit target — should be deleted by op-2)
    // After both ops, all three edges must be gone. A weaker assertion
    // (just "count decreased") would pass even if op-2 silently no-op'd
    // — Bob→Diana would survive. The exact-count check makes both ops
    // independently observable.
    let pre_knows = count_rows(&db, "edge:Knows").await;
    assert_eq!(
        pre_knows, 3,
        "fixture invariant: TEST_DATA seeds 3 Knows edges"
    );

    db.mutate(
        "main",
        QUERY,
        "cascade_then_explicit",
        &params(&[("$name", "Alice"), ("$other", "Bob")]),
    )
    .await
    .expect("cascade-then-explicit-delete on same edge table must succeed");

    // Both ops landed: cascade removed Alice→Bob and Alice→Charlie;
    // explicit removed Bob→Diana. Anything > 0 means one op silently
    // did nothing (the bug we're guarding against).
    let post_knows = count_rows(&db, "edge:Knows").await;
    assert_eq!(
        post_knows, 0,
        "both cascade + explicit delete must complete (Bob→Diana would survive if op-2 no-op'd)",
    );
}

/// The engine cardinality path must enforce `min` bounds. Pre-fix the
/// engine path silently dropped the min check (a `let _ = card.min;`
/// line). The loader path always enforced both. Post-fix, both paths
/// route through `enforce_cardinality_bounds` which checks both bounds.
///
/// Build a custom schema with `Knows: Person -> Person @card(2..)`.
/// Inserting a single Knows edge violates min=2. The mutation path must
/// reject.
#[tokio::test]
async fn mutation_insert_edge_enforces_min_cardinality() {
    use omnigraph::loader::{LoadMode, load_jsonl};

    const MIN_CARD_SCHEMA: &str = r#"
node Person {
    name: String @key
}
edge Knows: Person -> Person @card(2..)
"#;
    const MIN_CARD_QUERY: &str = r#"
query add_friend($from: String, $to: String) {
    insert Knows { from: $from, to: $to }
}
"#;

    let dir = tempfile::tempdir().unwrap();
    let uri = dir.path().to_str().unwrap();
    let mut db = Omnigraph::init(uri, MIN_CARD_SCHEMA).await.unwrap();

    let seed = r#"{"type": "Person", "data": {"name": "Alice"}}
{"type": "Person", "data": {"name": "Bob"}}
"#;
    load_jsonl(&mut db, seed, LoadMode::Overwrite)
        .await
        .unwrap();

    // Single insert: count=1 < min=2 → reject with clear message.
    let err = db
        .mutate(
            "main",
            MIN_CARD_QUERY,
            "add_friend",
            &params(&[("$from", "Alice"), ("$to", "Bob")]),
        )
        .await
        .expect_err("min cardinality must reject the engine path");
    let OmniError::Manifest(manifest_err) = err else {
        panic!("expected Manifest error, got {err:?}");
    };
    assert!(
        manifest_err.message.contains("@card violation") && manifest_err.message.contains("min 2"),
        "unexpected error: {}",
        manifest_err.message,
    );
}

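// Illustrative sketch (not the engine's code): a two-sided bounds check of
// the kind the doc comment above attributes to `enforce_cardinality_bounds`.
// `Cardinality`, `check_bounds`, and the exact message wording are
// hypothetical stand-ins; only the "@card violation" / "min 2" fragments
// are taken from the assertion in the test above.

```rust
/// Hypothetical stand-in for an edge's `@card(min..max)` bounds.
#[derive(Debug, Clone, Copy)]
pub struct Cardinality {
    pub min: u32,
    /// `None` models an open upper bound, as in `@card(2..)`.
    pub max: Option<u32>,
}

/// Checks one source node's outgoing edge count against BOTH bounds.
/// Dropping the first branch reproduces the "min silently ignored" bug.
pub fn check_bounds(
    edge: &str,
    src: &str,
    count: u32,
    card: Cardinality,
) -> Result<(), String> {
    if count < card.min {
        return Err(format!(
            "@card violation on {edge}: {src} has {count} edge(s), min {}",
            card.min
        ));
    }
    if let Some(max) = card.max {
        if count > max {
            return Err(format!(
                "@card violation on {edge}: {src} has {count} edge(s), max {max}"
            ));
        }
    }
    Ok(())
}
```

// With `Cardinality { min: 2, max: None }`, a count of 1 is rejected and a
// count of 2 passes, which is the shape of the single-insert case above.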
/// `LoadMode::Merge` on edges must NOT double-count the committed
/// edge AND its updated pending replacement. Build a custom
/// schema where WorksAt has @card(0..1). Seed Alice with one WorksAt to
/// Acme. Then Merge-load the SAME edge id (so it's an update, not an
/// insert) pointing Alice's WorksAt at Bigco. Cardinality must count
/// Alice's edges as 1 (the post-merge count), not 2 (committed + pending).
#[tokio::test]
async fn load_merge_mode_dedupes_edge_for_cardinality_count() {
    use omnigraph::loader::{LoadMode, load_jsonl};

    const CARD_SCHEMA: &str = r#"
node Person {
    name: String @key
}
node Company {
    name: String @key
}
edge WorksAt: Person -> Company @card(0..1)
"#;

    let dir = tempfile::tempdir().unwrap();
    let uri = dir.path().to_str().unwrap();
    let mut db = Omnigraph::init(uri, CARD_SCHEMA).await.unwrap();

    // Seed: Alice + Acme + Bigco + WorksAt(id=w1, Alice→Acme). Note the
    // loader reads edge ids from the `data.id` field (not top-level), so
    // we place the id inside `data` for both the seed and the update.
    let seed = r#"{"type": "Person", "data": {"name": "Alice"}}
{"type": "Company", "data": {"name": "Acme"}}
{"type": "Company", "data": {"name": "Bigco"}}
{"edge": "WorksAt", "from": "Alice", "to": "Acme", "data": {"id": "w1"}}
"#;
    load_jsonl(&mut db, seed, LoadMode::Overwrite)
        .await
        .unwrap();

    // Merge-update the same edge id w1 to point at Bigco. Counted naively
    // as union, Alice has 2 WorksAt (committed Acme + pending Bigco) which
    // would trip @card(0..1). With merge dedupe, Alice has 1 WorksAt.
    let merge_data = r#"{"edge": "WorksAt", "from": "Alice", "to": "Bigco", "data": {"id": "w1"}}
"#;
    load_jsonl(&mut db, merge_data, LoadMode::Merge)
        .await
        .expect("Merge update must dedupe the committed edge by id");

    // Confirm there's exactly 1 WorksAt edge after merge.
    assert_eq!(count_rows(&db, "edge:WorksAt").await, 1);
}

/// A Merge load whose input has TWO rows with the same edge id must be
/// deduped at cardinality-count time, not just at finalize. Without
/// dedup, two pending rows count twice → spurious `@card` violation.
/// With dedup (last-occurrence-wins, mirroring
/// `dedupe_merge_batches_by_id`), the pending side counts once.
///
/// This is a separate path from `load_merge_mode_dedupes_edge_for_cardinality_count`
/// (which dedupes committed-vs-pending). Here we verify pending-vs-pending
/// dedup.
#[tokio::test]
async fn load_merge_mode_dedupes_within_pending_for_cardinality_count() {
    use omnigraph::loader::{LoadMode, load_jsonl};

    const CARD_SCHEMA: &str = r#"
node Person {
    name: String @key
}
node Company {
    name: String @key
}
edge WorksAt: Person -> Company @card(0..1)
"#;

    let dir = tempfile::tempdir().unwrap();
    let uri = dir.path().to_str().unwrap();
    let mut db = Omnigraph::init(uri, CARD_SCHEMA).await.unwrap();

    let seed = r#"{"type": "Person", "data": {"name": "Alice"}}
{"type": "Company", "data": {"name": "Acme"}}
{"type": "Company", "data": {"name": "Bigco"}}
"#;
    load_jsonl(&mut db, seed, LoadMode::Overwrite)
        .await
        .unwrap();

    // Merge load with the SAME edge id twice — the second row supersedes
    // the first in the finalize-time dedupe. If pending-counting doesn't
    // dedupe, Alice has 2 pending edges → @card(0..1) trips → load
    // fails. With dedupe, Alice has 1 → load succeeds.
    let dup_data = r#"{"edge": "WorksAt", "from": "Alice", "to": "Acme", "data": {"id": "w1"}}
{"edge": "WorksAt", "from": "Alice", "to": "Bigco", "data": {"id": "w1"}}
"#;
    load_jsonl(&mut db, dup_data, LoadMode::Merge)
        .await
        .expect("Merge load with within-input dup ids must dedupe pending count");

    // Exactly one WorksAt edge after the dedup; the second row wins
    // (last-occurrence) so dst should be Bigco.
    assert_eq!(count_rows(&db, "edge:WorksAt").await, 1);
}

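// Illustrative sketch (not the loader's code): last-occurrence-wins dedupe
// by edge id, followed by a per-source count, as the two Merge tests above
// describe. `Edge`, `dedupe_last_wins`, and `count_per_source` are
// hypothetical names; the real `dedupe_merge_batches_by_id` operates on
// Arrow batches and may differ in detail.

```rust
use std::collections::HashMap;

/// Hypothetical in-memory edge row.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct Edge {
    pub id: String,
    pub from: String,
    pub to: String,
}

/// Keeps one row per id. A later row with the same id overwrites the
/// earlier one in place, so the last occurrence wins.
pub fn dedupe_last_wins(rows: Vec<Edge>) -> Vec<Edge> {
    let mut slot_of: HashMap<String, usize> = HashMap::new();
    let mut out: Vec<Edge> = Vec::new();
    for row in rows {
        match slot_of.get(&row.id).copied() {
            Some(i) => out[i] = row,
            None => {
                slot_of.insert(row.id.clone(), out.len());
                out.push(row);
            }
        }
    }
    out
}

/// Counts outgoing edges per source node; this is the number a
/// cardinality check would compare against the `@card` bounds.
pub fn count_per_source(edges: &[Edge]) -> HashMap<String, usize> {
    let mut counts = HashMap::new();
    for e in edges {
        *counts.entry(e.from.clone()).or_insert(0) += 1;
    }
    counts
}
```

// Counting before deduping would report two edges for a source that sends
// the same id twice; counting after deduping reports one.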
/// `scan_with_pending` must reject a call where `key_column` is
/// requested but the projection omits that column. Without the
/// up-front check, the helper silently degraded to union semantics —
/// letting a chained-update bug slip through unnoticed. This test
/// verifies the contract is enforced at the API boundary.
#[tokio::test]
async fn scan_with_pending_rejects_key_column_missing_from_projection() {
    use arrow_array::{RecordBatch, StringArray};
    use arrow_schema::{DataType, Field, Schema};
    use omnigraph::table_store::TableStore;
    use std::sync::Arc;

    let dir = tempfile::tempdir().unwrap();
    let uri = format!("{}/people.lance", dir.path().to_str().unwrap());
    let store = TableStore::new(dir.path().to_str().unwrap());

    let schema = Arc::new(Schema::new(vec![
        Field::new("id", DataType::Utf8, false),
        Field::new("note", DataType::Utf8, true),
    ]));
    let seed = RecordBatch::try_new(
        schema.clone(),
        vec![
            Arc::new(StringArray::from(vec!["a", "b"])) as _,
            Arc::new(StringArray::from(vec![Some("seed-a"), Some("seed-b")])) as _,
        ],
    )
    .unwrap();
    let ds = TableStore::write_dataset(&uri, seed).await.unwrap();

    let pending = RecordBatch::try_new(
        schema.clone(),
        vec![
            Arc::new(StringArray::from(vec!["a"])) as _,
            Arc::new(StringArray::from(vec![Some("pending-a")])) as _,
        ],
    )
    .unwrap();

    // Bad call: key_column = "id" but projection doesn't include "id".
    // Pre-fix this silently disabled merge-shadowing and returned both
    // committed "a" and pending "a" rows. Now it must error.
    let err = store
        .scan_with_pending(
            &ds,
            std::slice::from_ref(&pending),
            None,
            Some(&["note"]),
            None,
            Some("id"),
        )
        .await
        .expect_err("scan_with_pending must reject merge-shadow with missing key in projection");
    let msg = err.to_string();
    assert!(
        msg.contains("key_column 'id'") && msg.contains("must appear in projection"),
        "unexpected error: {msg}"
    );

    // Good call: projection includes the key column. Shadow works:
    // pending row 'a' shadows committed 'a', so the result has only
    // committed 'b' + pending 'a'.
    let batches = store
        .scan_with_pending(
            &ds,
            std::slice::from_ref(&pending),
            None,
            Some(&["id", "note"]),
            None,
            Some("id"),
        )
        .await
        .expect("projection containing key_column must succeed");
    let mut ids: Vec<String> = Vec::new();
    for b in &batches {
        let arr = b
            .column_by_name("id")
            .unwrap()
            .as_any()
            .downcast_ref::<arrow_array::StringArray>()
            .unwrap();
        for i in 0..arr.len() {
            ids.push(arr.value(i).to_string());
        }
    }
    ids.sort();
    assert_eq!(
        ids,
        vec!["a", "b"],
        "merge-shadow should drop committed 'a' and surface pending 'a' + committed 'b'"
    );
}

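// Illustrative sketch (not `TableStore::scan_with_pending` itself): the
// merge-shadow contract the test above checks, modelled on plain maps.
// `Row`, `row`, and `scan_with_shadow` are hypothetical names, and the
// error text only mirrors the two fragments the test asserts on.

```rust
use std::collections::{BTreeMap, HashSet};

/// Hypothetical row: column name -> value.
pub type Row = BTreeMap<String, String>;

/// Convenience constructor for a `Row`.
pub fn row(pairs: &[(&str, &str)]) -> Row {
    pairs.iter().map(|(k, v)| (k.to_string(), v.to_string())).collect()
}

/// Returns committed rows not shadowed by a pending row with the same key,
/// followed by all pending rows, each restricted to `projection`.
pub fn scan_with_shadow(
    committed: &[Row],
    pending: &[Row],
    projection: &[&str],
    key_column: Option<&str>,
) -> Result<Vec<Row>, String> {
    // Up-front contract check: shadowing needs the key in the output.
    if let Some(key) = key_column {
        if !projection.contains(&key) {
            return Err(format!("key_column '{key}' must appear in projection"));
        }
    }
    let project = |r: &Row| -> Row {
        projection
            .iter()
            .filter_map(|c| r.get(*c).map(|v| (c.to_string(), v.clone())))
            .collect()
    };
    // Keys present on the pending side hide their committed counterparts.
    let shadowed: HashSet<&str> = match key_column {
        Some(key) => pending
            .iter()
            .filter_map(|r| r.get(key).map(String::as_str))
            .collect(),
        None => HashSet::new(),
    };
    let mut out = Vec::new();
    for r in committed {
        let hidden = key_column
            .and_then(|k| r.get(k))
            .map_or(false, |v| shadowed.contains(v.as_str()));
        if !hidden {
            out.push(project(r));
        }
    }
    out.extend(pending.iter().map(project));
    Ok(out)
}
```

// Without the up-front check, a projection of only `note` would leave no key
// to compare on, and the scan would fall back to a plain union.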
/// `PendingTable.schema` is captured from the first `append_batch` call
/// and never updated. On a blob-bearing table, an `insert` produces a
/// full-schema batch (blob columns included) and an `update` that
/// doesn't assign every blob produces a subset-schema batch. Mixed in
/// one query, the second `append_batch` would silently push an
/// incompatible batch — the mismatch surfaced eventually at
/// `concat_batches`/MemTable construction inside finalize, but the
/// failure point was distant from the offending op.
///
/// `append_batch` validates the new batch's schema against the existing
/// accumulator's schema and returns a typed error directing the caller
/// to split the mutation. The error fires at the second op (the
/// update), not at end-of-query.
#[tokio::test]
async fn append_batch_rejects_mismatched_schema_in_blob_table_at_offending_op() {
    use omnigraph::loader::{LoadMode, load_jsonl};

    const BLOB_SCHEMA: &str = r#"
node Document {
    title: String @key
    content: Blob?
    note: String?
}
"#;
    const BLOB_QUERIES: &str = r#"
query insert_then_update_note(
    $title: String, $blob: String, $note: String
) {
    insert Document { title: $title, content: $blob }
    update Document set { note: $note } where title = $title
}
"#;

    let dir = tempfile::tempdir().unwrap();
    let uri = dir.path().to_str().unwrap();
    let mut db = Omnigraph::init(uri, BLOB_SCHEMA).await.unwrap();

    // Seed with a Document so the update has something to match (the
    // mid-query case is the chained-update scenario where the update's
    // predicate matches the just-inserted row, exercising the in-memory
    // pending union).
    load_jsonl(
        &mut db,
        r#"{"type":"Document","data":{"title":"seed","content":"base64:AQID"}}"#,
        LoadMode::Overwrite,
    )
    .await
    .unwrap();

    let err = db
        .mutate(
            "main",
            BLOB_QUERIES,
            "insert_then_update_note",
            &params(&[
                ("$title", "letter"),
                ("$blob", "base64:BAUG"),
                ("$note", "draft 1"),
            ]),
        )
        .await
        .expect_err("blob-table mixed insert+update with non-fully-assigned blob must error early");
    let OmniError::Manifest(manifest_err) = err else {
        panic!("expected Manifest error, got {err:?}");
    };
    assert!(
        manifest_err.message.contains("mismatched schemas")
            && manifest_err.message.contains("Split the mutation"),
        "error must direct user to split: {}",
        manifest_err.message,
    );

    // Confirm the manifest didn't advance — early error must be
    // before any commit.
    let qr = db
        .query(
            ReadTarget::branch("main"),
            r#"query get_doc($title: String) {
    match { $d: Document { title: $title } }
    return { $d.title }
}"#,
            "get_doc",
            &params(&[("$title", "letter")]),
        )
        .await
        .unwrap();
    assert_eq!(
        qr.num_rows(),
        0,
        "letter must not be visible after early error"
    );
}

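// Illustrative sketch (not `PendingTable` itself): an accumulator that pins
// its schema on the first batch and rejects a later batch whose column set
// differs, so the error fires at the offending op. `Batch`,
// `PendingAccumulator`, and the message wording are hypothetical; only the
// "mismatched schemas" / "Split the mutation" fragments come from the test.

```rust
/// Hypothetical batch: just its column names and a row count.
pub struct Batch {
    pub columns: Vec<&'static str>,
    pub rows: usize,
}

/// Hypothetical per-table staging area for one query.
#[derive(Default)]
pub struct PendingAccumulator {
    schema: Option<Vec<&'static str>>,
    rows: usize,
}

impl PendingAccumulator {
    /// Validates the batch against the pinned schema before staging it.
    pub fn append_batch(&mut self, batch: Batch) -> Result<(), String> {
        match self.schema.as_ref() {
            Some(existing) if existing != &batch.columns => {
                return Err(format!(
                    "mismatched schemas: accumulator has {existing:?}, batch has {:?}. \
                     Split the mutation into separate queries.",
                    batch.columns
                ));
            }
            _ => {}
        }
        self.rows += batch.rows;
        // The first batch pins the schema; later matching batches leave it as-is.
        self.schema.get_or_insert(batch.columns);
        Ok(())
    }

    /// Number of rows staged so far.
    pub fn staged_rows(&self) -> usize {
        self.rows
    }
}
```

// Because the check runs inside `append_batch`, the rejected batch is never
// staged and nothing downstream has to discover the mismatch later.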
/// MR-920 regression: two sequential `update T set {f:v} where x=y`
/// invocations against the same row must both succeed. Pre-fix, the
/// second one failed with `Ambiguous merge inserts are prohibited:
/// multiple source rows match the same target row on (id = "Alice")`
/// even though the scan returned exactly one row.
///
/// Root cause hypothesis (per MR-920): Lance's
/// `processed_row_ids: Mutex<HashSet<u64>>`
/// (`src/dataset/write/merge_insert.rs:2099`) double-processes the
/// same target row_id against datasets previously rewritten by
/// merge_insert. `SourceDedupeBehavior::FirstSeen` makes Lance skip
/// rather than error.
///
/// Companion to `consistency.rs::load_merge_repeated_against_overlapping_keys_succeeds`
/// (PR #98 / Window 1 of the bug class via the load surface).
#[tokio::test]
async fn second_sequential_update_on_same_row_succeeds() {
    let dir = tempfile::tempdir().unwrap();
    let mut db = init_and_load(&dir).await;

    db.mutate(
        "main",
        STAGED_QUERIES,
        "update_age_by_name",
        &mixed_params(&[("$name", "Alice")], &[("$age", 99)]),
    )
    .await
    .expect("first sequential update on Alice must succeed");

    let batches = read_table(&db, "node:Person").await;
    let alice_count: usize = batches
        .iter()
        .map(|b| {
            let names = b
                .column_by_name("name")
                .unwrap()
                .as_any()
                .downcast_ref::<arrow_array::StringArray>()
                .unwrap();
            (0..b.num_rows())
                .filter(|i| names.is_valid(*i) && names.value(*i) == "Alice")
                .count()
        })
        .sum();
    assert_eq!(
        alice_count, 1,
        "after first update, exactly one Alice row should be visible"
    );

    db.mutate(
        "main",
        STAGED_QUERIES,
        "update_age_by_name",
        &mixed_params(&[("$name", "Alice")], &[("$age", 42)]),
    )
    .await
    .expect("second sequential update on Alice must succeed");

    let batches = read_table(&db, "node:Person").await;
    let mut alice_age: Option<i32> = None;
    for batch in &batches {
        let names = batch
            .column_by_name("name")
            .unwrap()
            .as_any()
            .downcast_ref::<arrow_array::StringArray>()
            .unwrap();
        let ages = batch
            .column_by_name("age")
            .unwrap()
            .as_any()
            .downcast_ref::<arrow_array::Int32Array>()
            .unwrap();
        for i in 0..batch.num_rows() {
            if names.is_valid(i) && names.value(i) == "Alice" && ages.is_valid(i) {
                alice_age = Some(ages.value(i));
            }
        }
    }
    assert_eq!(
        alice_age,
        Some(42),
        "Alice's age must reflect the second update"
    );
}

// An interrupted first-write fork (create_branch succeeded, the manifest
// publish did not) leaves a fully-formed Lance branch ref on the table that
// the manifest never references — a "manifest-unreferenced fork". The branch
// itself stays a valid manifest branch, so `cleanup`'s reconciler (keyed on
// the manifest branch list) never reclaims it. Today the next write to that
// table on that branch re-enters the fork path, `create_branch` collides, and
// the engine wedges with "incomplete prior delete; run `omnigraph cleanup`".
//
// We forge that exact residue (a live `feature` branch + a directly-created
// `feature` ref on the Person table the manifest doesn't reference) and assert
// the next write — via both `load` and `mutate` — self-heals by reclaiming the
// orphan fork and re-forking, rather than wedging. No process death / timing
// needed: the forge is the post-crash state.
#[tokio::test]
async fn first_write_self_heals_manifest_unreferenced_fork_on_live_branch() {
    let dir = tempfile::tempdir().unwrap();
    let uri = dir.path().to_str().unwrap().to_string();
    let mut db = init_and_load(&dir).await;
    db.branch_create("feature").await.unwrap();

    // Forge the manifest-unreferenced fork directly at the Lance layer.
    let person_uri = node_table_uri(&uri, "Person");
    {
        let mut ds = lance::Dataset::open(&person_uri).await.unwrap();
        let base = ds.version().version;
        ds.create_branch("feature", base, None).await.unwrap();
        assert!(
            ds.list_branches().await.unwrap().contains_key("feature"),
            "precondition: forged orphan fork present on Person"
        );
    }

    // load → must self-heal, not wedge with "incomplete prior delete".
    let row = r#"{"type":"Person","data":{"name":"Zoe","age":30}}"#;
    db.load_as("feature", None, row, LoadMode::Merge, None)
        .await
        .expect("load onto a manifest-unreferenced fork must self-heal, not wedge");

    // mutate → same path, must also self-heal.
    mutate_branch(
        &mut db,
        "feature",
        MUTATION_QUERIES,
        "insert_person",
        &mixed_params(&[("$name", "Yan")], &[("$age", 41)]),
    )
    .await
    .expect("mutate onto a manifest-unreferenced fork must self-heal");

    // The healed branch holds the new rows; main is untouched (still no Zoe/Yan).
    let feature_people = count_rows_branch(&db, "feature", "node:Person").await;
    let main_people = count_rows(&db, "node:Person").await;
    assert!(
        feature_people >= main_people + 2,
        "feature must contain the two new rows on top of the inherited set \
         (feature={feature_people}, main={main_people})"
    );
}

// A node delete cascades to every edge table touching that node, forking those
// edge tables during execution. The up-front fork-queue acquisition must cover
// those cascade-forked edges, not just the node table named in the IR — else
// commit_all's held-guard coverage check fails the write (and, before the
// coverage check was promoted out of debug-only, edge commits would slip
// through unserialized). This drives the new code via a DELETE (the only
// cascading op), on a branch, as the FIRST write (so it actually forks).
#[tokio::test]
async fn branch_cascade_delete_forks_node_and_edges_under_held_queues() {
    let dir = tempfile::tempdir().unwrap();
    let mut db = init_and_load(&dir).await;
    db.branch_create("feature").await.unwrap();

    // Baseline inherited from main (Alice has 2 Knows + 1 WorksAt edge).
    let main_people = count_rows(&db, "node:Person").await;
    let main_knows = count_rows(&db, "edge:Knows").await;

    // First write to `feature` is `delete Person Alice`, whose cascade forks
    // node:Person AND edge:Knows + edge:WorksAt. Pre-fix the up-front set held
    // only node:Person, so commit_all's coverage check rejected the write.
    mutate_branch(
        &mut db,
        "feature",
        MUTATION_QUERIES,
        "remove_person",
        &mixed_params(&[("$name", "Alice")], &[]),
    )
    .await
    .expect("branch cascade-delete must hold queues for cascade-forked edge tables");

    // Alice and her edges are gone on feature; main is untouched.
    assert_eq!(
        count_rows_branch(&db, "feature", "node:Person").await,
        main_people - 1,
        "feature should have Alice removed from the inherited set"
    );
    assert!(
        count_rows_branch(&db, "feature", "edge:Knows").await < main_knows,
        "feature should have Alice's cascade-deleted Knows edges removed"
    );
    assert_eq!(
        count_rows(&db, "node:Person").await,
        main_people,
        "main must be untouched by the branch delete"
    );
}

// #283: a mutation predicate (`where camelField = ...`) on a camelCase column
// must execute, not fail at the Lance scan with "No field named ...". Covers
// both `update` (committed scan via scan_with_pending) and `delete`
// (delete_where), which share the same emitted SQL filter string.
const CC_SCHEMA: &str = r#"
node Doc {
    slug: String @key
    repoName: String @index
    status: String?
}
"#;
const CC_DATA: &str = r#"{"type":"Doc","data":{"slug":"d1","repoName":"acme","status":"open"}}
{"type":"Doc","data":{"slug":"d2","repoName":"globex","status":"open"}}"#;

#[tokio::test]
async fn camelcase_mutation_predicate_updates_and_deletes() {
    let dir = tempfile::tempdir().unwrap();
    let uri = dir.path().to_str().unwrap();
    let mut db = Omnigraph::init(uri, CC_SCHEMA).await.unwrap();
    load_jsonl(&mut db, CC_DATA, LoadMode::Overwrite).await.unwrap();

    let m = r#"
query set_status($repo: String, $st: String) { update Doc set { status: $st } where repoName = $repo }
query del($repo: String) { delete Doc where repoName = $repo }
"#;

    let upd = db
        .mutate("main", m, "set_status", &params(&[("$repo", "acme"), ("$st", "closed")]))
        .await
        .expect("update with a camelCase predicate must execute");
    assert_eq!(upd.affected_nodes, 1, "exactly the acme Doc should update");

    let del = db
        .mutate("main", m, "del", &params(&[("$repo", "globex")]))
        .await
        .expect("delete with a camelCase predicate must execute");
    assert_eq!(del.affected_nodes, 1, "exactly the globex Doc should delete");

    assert_eq!(count_rows(&db, "node:Doc").await, 1, "one Doc (acme) should remain");
}

// #283 (pending side): a chained mutation whose 2nd op filters a camelCase
// column must read op-1's staged rows through the pending DataFusion `MemTable`
// (`SELECT … WHERE {filter}` via ctx.sql), which lowercases unquoted idents.
// This is the path the single update/delete above does NOT exercise.
#[tokio::test]
async fn camelcase_chained_mutation_reads_pending_by_camelcase() {
    let dir = tempfile::tempdir().unwrap();
    let uri = dir.path().to_str().unwrap();
    let mut db = Omnigraph::init(uri, CC_SCHEMA).await.unwrap();
    load_jsonl(&mut db, CC_DATA, LoadMode::Overwrite).await.unwrap();

    // op-1 stages a status change to the acme Doc; op-2 re-filters the same
    // camelCase column, so it must match op-1's pending row.
    let m = r#"
query chain($repo: String) {
    update Doc set { status: "stage1" } where repoName = $repo
    update Doc set { status: "stage2" } where repoName = $repo
}
"#;
    let r = db
        .mutate("main", m, "chain", &params(&[("$repo", "acme")]))
        .await
        .expect("chained camelCase mutation must read the pending row, not fail at the MemTable SELECT");
    assert_eq!(r.affected_nodes, 2, "both ops should touch the acme Doc (read-your-writes)");
}
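
// Illustrative sketch (an assumption, not the engine's emitter): one common
// way to keep a camelCase column intact in a SQL filter string is to
// double-quote the identifier, since unquoted identifiers are lowercased by
// the SQL front end mentioned above. `quote_ident` and `eq_filter` are
// hypothetical helper names; whether the #283 fix works this way is not
// stated in this file.

```rust
/// Wraps a column name in double quotes, doubling any embedded quote, so
/// the SQL parser treats it as a case-sensitive identifier.
pub fn quote_ident(name: &str) -> String {
    format!("\"{}\"", name.replace('"', "\"\""))
}

/// Builds a `column = 'literal'` filter with a quoted identifier and an
/// escaped string literal.
pub fn eq_filter(column: &str, literal: &str) -> String {
    format!("{} = '{}'", quote_ident(column), literal.replace('\'', "''"))
}
```

// For the `repoName = $repo` predicate in the tests above, this yields
// `"repoName" = 'acme'` rather than the unquoted `repoName = 'acme'`.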