From 7d3a52d6744320a0ee9de38eecf084154764a55a Mon Sep 17 00:00:00 2001 From: Ragnor Comerford Date: Tue, 23 Jun 2026 21:27:31 +0200 Subject: [PATCH] feat(engine): `WriteTxn` - validate schema + open each data table once per write (#298) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * docs(rfc-013): step-3b handoff + §4.1 corrections (validated) Add the RFC-013 write-path handoff doc, and correct §4.1's WriteTxn sketch from the 4-subagent validation against current code: - HandleCache → handle-threading (forward the commit-return handle; a version-keyed cache misses because HEAD walks N→N+1→N+2 across staging + index-build commits). - "re-resolution unrepresentable" softened to "pinned base for the pre-commit phase + named fresh re-reads at the commit/fork boundary" — three reads (commit-time OCC, the live-HEAD drift probe, fork authority) are irreducible correctness machinery. - WriteParams DOES carry a session field; the real constraint is "stage off an open Dataset," so attach the Session by opening read-style then staging off it. * test(engine): RED step-3b capture-once fitness asserts + open_count probe Two write-path cost gates, RED today, GREEN after the WriteTxn lands: - write_validates_schema_contract_once: a write must validate the schema contract once (3 read_text + 2 exists). Today re-validates at every resolve point — measured 12 read_text / 9 exists (~4 validations) via CountingStorageAdapter (zero production change; the write twin of the read-path schema-once test). - keyed_insert_opens_table_at_most_once: a keyed single-table write must open its table <=1x. Today measured 10 opens. Adds an exact open-CALL probe: open_count + record_open() on QueryIoProbes (mirroring probe_count/record_probe), called at both open chokepoints; surfaced as IoCounts.open_count. forbidden_apis guarantees every write open routes through them. * feat(engine): WriteTxn carrier + open_write_txn (3b scaffolding) The capture-once write transaction (RFC-013 step 3b): WriteTxn{branch, base: Snapshot, session} + Omnigraph::open_write_txn, which validates the schema contract once and pins the base snapshot + the shared per-graph Session. Landed as reviewed scaffolding (gated #[allow(dead_code)]); the next pass threads Option<&WriteTxn> through open_for_mutation_on_branch / staging on the non-strict bound-branch path — opening the base once from the pinned entry with the warm session (a session-aware pinned opener returning a SnapshotHandle) and skipping the per-table schema re-validation — to turn the two RED cost gates green. Strict ops / fork / the commit-time OCC re-read keep their fresh reads. * test(engine): scope write-path open_count to data tables (RFC-013 step 3b) The keyed_insert_opens_table_at_most_once gate asserted open_count <= 1, but open_count was a single unclassified counter: record_open() fires in both open chokepoints, and open_dataset_tracked also opens the internal/system tables (__manifest via layout.rs, _graph_commits/_graph_commit_actors via commit_graph.rs). So the count conflated data-table opens with the publisher CAS + commit-graph append opens — making the gate measure the wrong quantity and unreachable by threading alone (the manifest publish keeps it >1 regardless). Scope it by table class, mirroring the read-side counters (which already split by URI prefix via separate wrappers): record_open(uri) classifies the open's last path segment and feeds data_open_count vs internal_open_count. IoCounts exposes both; the gate now asserts data_open_count <= 1. Re-baselined: a single keyed insert is data_open_count=4 / internal_open_count=6 (sum 10, the old conflated value). The RED target for the WriteTxn threading is now the real data-table-open count (4 -> 1), with internal opens correctly out of scope. Pure test-harness/instrumentation; no production behavior change (classification runs only inside the probe closure, skipped when no probes are installed). Also marks #297 (optimize-vs-write race) as landed in the step-3b handoff — this branch is already stacked on origin/main after it merged. * feat(engine): validate the schema contract once per write (RFC-013 step 3b) A single mutate/load re-validated the schema contract ~4 times: at the entry (ensure_schema_state_valid), per-table in open_for_mutation_on_branch (resolved_branch_target), at the commit-time OCC re-read (fresh_snapshot_for_branch), and in the publisher's index-build snapshot (snapshot_for_branch). Each validation is 3 read_text + 2 exists on the storage adapter — O(touched resolve-points) of redundant contract I/O on every write. Thread the already-landed WriteTxn carrier through the write path: capture `txn = open_write_txn(branch)` once at the mutate/load entry (the single validation), then source the per-table entry and the commit/publish snapshots from `txn.base` instead of re-resolving. When `txn` is None (branch merge, schema apply, tests) every function is byte-identical to before. - mutate_with_current_actor / load_jsonl_reader capture txn once (replacing the entry-point ensure_schema_state_valid) and thread Some(&txn) through execute_*/open_table_for_mutation, commit_all, and commit_updates_on_branch_with_expected. - open_for_mutation_on_branch sources (snapshot, branch) from txn.base/txn.branch when present — skipping resolved_branch_target's re-validation. The OPEN itself is unchanged (still HEAD via open_dataset_head_for_write), and strict ops keep ensure_expected_version. Schema-once applies to strict and non-strict alike; the data-open collapse is a separate change. - commit_all uses fresh_snapshot_for_branch_unchecked (the OCC manifest re-read minus the schema re-validation) when txn is present; the drift guard is unchanged. - prepare_updates_for_commit uses txn.base for the publisher index-build snapshot. fresh_snapshot_for_branch{,_unchecked} now read the manifest directly via ManifestCoordinator instead of resolve_target. The OCC re-read consumes only the Snapshot (per-table location + version), which ManifestCoordinator::open().snapshot() produces identically — but resolve_target additionally opened the commit graph (a spurious _graph_commits.lance exists probe the OCC read never consults). Dropping that load is a pure read-cost reduction for every fresh-snapshot caller (commit_all's None arm, optimize, repair, fork reclaim); the returned Snapshot is unchanged and the read is a fresher cold manifest re-read, so the OCC freshness guarantee is preserved. Greens write_validates_schema_contract_once (3 read_text / 2 exists, was 12/9). keyed_insert_opens_table_at_most_once stays red (data_open_count=4) — the open collapse lands next. Full engine suite green otherwise. * feat(engine): open each data table once per write (RFC-013 step 3b) A single keyed-node mutate opened its data table 4 times: accumulation (to read .version()), staging (the real write base), the commit-time drift guard (to read live HEAD), and the publisher's index build (reopen at the just-committed version). Collapse three of the four — using the WriteTxn carrier threaded for schema-once — so a write opens each touched data table at most once. - #1 accumulation: open_for_mutation_on_branch now returns (Option, expected_version, full_path, table_branch). On the txn's own branch, a non-strict (Insert/Merge) op needs no open — the only thing the caller reads is .version() (the CAS fence), which is exactly the pinned base version (entry.table_version). So skip open_dataset_head_for_write and source the version from txn.base. The node insert path already discarded that handle; the edge path resolves a pinned read only when non-default cardinality needs it. STRICT ops and any write that must fork still open live HEAD + ensure_expected_version. - #3 commit drift guard: commit_all reads live HEAD via entry.dataset.dataset().latest_version_id() — a cheap manifest-pointer probe off the already-open staging handle (the same primitive ManifestCoordinator:: probe_latest_version uses) instead of a fresh open_dataset_head_for_write. The headcurrent drift classification is byte-identical. - #4 index build: commit_all now returns the per-table post-commit_staged SnapshotHandle map; commit_updates_on_branch_with_expected threads it into prepare_updates_for_commit, which builds indices on the threaded handle instead of reopening at the same just-committed version. Absent a handle (other writers, inline/delete tables) the reopen path is byte-identical. When txn is None (branch merge, schema apply, tests) every function opens and checks exactly as before. Greens keyed_insert_opens_table_at_most_once (data_open_count 4->1). Schema-once gate stays 3/2. Full engine suite + failpoints (recovery sidecar lifecycle) green. * refactor(engine): name the write-path open/commit returns (RFC-013 step 3b) The open collapse left two positional returns that are easy to mis-thread and carry an unwritten contract: open_for_mutation_on_branch's (Option, u64, String, Option) and commit_all's 5-tuple (updates, expected_versions, sidecar_handle, guards, committed_handles). Replace both with named structs so each field reads at the call site and the Option's contract is documented, not folklore. - OpenedForMutation { handle, expected_version, full_path, table_branch } with a require_handle(ctx) helper for the callers that must have a handle (strict ops, the fork path, every no-txn caller — branch merge, the seed test). The handle is None only on the non-strict-txn open-skip path (collapse #1); require_handle panics with a named context if that contract is ever broken. - CommittedMutation { updates, expected_versions, sidecar_handle, guards, committed_handles } for commit_all; consumers destructure into the same local bindings they already used, so the publish/sidecar/guard-hold logic is unchanged. - A debug_assert in open_table_for_mutation pins the skip contract: a missing handle is legal only on the non-strict txn path, so a future strict arm returning None trips in debug builds instead of handing None to a require_handle consumer. Pure refactor — no behavior change. Both cost gates stay green (schema 3/2, data_open_count=1), full engine suite + lib (162) green. * refactor(engine): drop the unearned session field from WriteTxn (RFC-013 step 3b) The open collapse greens data_open_count<=1 by SKIPPING the accumulation open, PROBING live HEAD with latest_version_id, and REUSING the commit_staged handle — none of which consume a session. The captured WriteTxn.session was therefore dead (`#[allow(dead_code)]`): unearned surface a reviewer rightly flags. Remove it. The carrier is now {branch, base} — exactly what schema-once + the open collapse use. Step 5 (PublishPlan unification) makes WriteTxn the non-optional publish carrier and is the right home for session-aware base opens, where the warm-session benefit on the single remaining open — an object-store (S3) phenomenon, invisible on local FS — can be earned by its own cost gate rather than carried dead through this PR. No behavior change; both cost gates stay green (schema 3/2, data_open_count=1). * docs(rfc-013): mark step 3b DONE — schema-once + open-collapse shipped, session deferred to step 5 * docs(rfc-013): capture the write-base-staleness convergence (§1d) Three findings this cycle share one root — the write base is a stale, un-probed, un-classified pin (the read path probes; the write path returns the warm coordinator snapshot): - #298 edge-@card stale-read regression (cursor High / codex P1, VALID): collapse #1 made the cardinality scan read txn.base instead of live HEAD, so a concurrent edge is uncounted and a max can be exceeded. Fix on #298: restore the live-HEAD read + deterministic test + correct the single-writer doc comment. - The structural liability underneath: no unified write-validation read-set — endpoint/cardinality/uniqueness each pick freshness ad hoc (warm/pinned/live), the same cardinality check forks mutation-vs-loader, none re-validated at commit. - The served-strict-write stale-view false-fail (validated on prod + a #[ignore] repro): a strict update/delete false-fails ExpectedVersionMismatch after an external optimize advance — the write-side mirror of #297/§6.6. The naive blanket probe is proven wrong (breaks the cross-process lost-update OCC contract). All three converge on Design A (step 5): open_txn's warm probe makes the base fresh, the op-class-aware precondition (derive maintenance vs logical from Lance per-version transaction metadata — no parallel marker) fast-forwards maintenance and fails logical, and §7.1's read-set-in-CAS unifies + re-validates the validation read-set. §8 records the #298 follow-up, the widened §7.1 scope, and the step-5 two-test acceptance contract. * test(engine): RED — edge @card must scan live HEAD, not stale txn.base (#298) Regression guard for the cursor-High/codex-P1 finding on #298: 3b's collapse #1 made the non-strict edge-insert cardinality scan read the pinned txn.base instead of live HEAD (edge_cardinality_read_handle), so a concurrent edge committed after txn capture is uncounted and a @card max is silently exceeded (invariant 9). Deterministic two-handle test (no failpoint): handle A commits WorksAt(Alice->Acme) to the @card(0..1) max; stale handle B (never read since) inserts a second WorksAt for Alice. B's coordinator is stale by construction (the write path doesn't probe), so B scans txn.base (Alice has 0) and wrongly commits the 2nd edge. RED: the insert that must be rejected currently succeeds (panics at unwrap_err). Goes green when the scan reads live HEAD. * fix(engine): scan live HEAD for edge @card, not the pinned txn.base (#298) 3b's collapse #1 skips the non-strict edge accumulation open, so edge_cardinality_ read_handle reopened the edge table at the pinned txn.base for the @card scan. Since cardinality is validated once (never rechecked at commit), a concurrent edge committed after txn capture was uncounted and a @card max could be silently exceeded (invariant 9) — the cursor-High/codex-P1 regression on #298. Pre-3b the scan read live HEAD (the mutation's own open_dataset_head_for_write handle). Restore the live-HEAD read: take the table LOCATION from the pinned entry (stable across versions) and open the dataset at its current HEAD via open_dataset_head_for_ write. Gate-safe — the data_open_count / merge-insert-only gates are node inserts; the edge cardinality path (non-default @card only) is untouched by them, and the extra live-HEAD open is exactly the pre-3b shape. Also drops the dead None-fallback's schema re-validation (greptile P2, auto-resolved). The residual validate->commit TOCTOU is the pre-existing §7.1 gap (RFC-013 step 4), recorded in handoff §1d/§8. Turns cardinality_rejected_for_stale_handle_after_concurrent_edge_commit green; validators / write_cost / writes / consistency / end_to_end / branching all green. * docs(dev): link handoff docs from index * docs(engine): tighten 3b claims to match the code (#298 review) Review caught several comments/docs overclaiming what the code does (the session drop + the #298 cardinality fix left stale/too-strong wording). No logic change. - open_write_txn doc: drop the stale "shared per-graph Session" (WriteTxn no longer carries one); scope "once" to the table-touch hot path and note edge/load RI validation still re-resolves (→ step 4 §7.1) + the session-aware open is step 5. - edge cardinality call-site comment: it said the scan uses a "pinned txn.base" — it now opens LIVE HEAD (#298); corrected. - write_cost.rs: "opens the base once (with the shared Session)" → session-aware base open is deferred to step 5. - data_open_count completeness (instrumentation.rs + write_cost.rs): forbidden_apis only keeps engine code OUTSIDE the storage layer on the chokepoints; table_store.rs is allow-listed and holds direct Dataset::opens for branch-management ops (not the keyed-write hot path the gate measures). Narrowed the claim accordingly. - handoff §4: "schema once / open once" is the node hot path (the two gates); edge endpoint + loader RI/cardinality still re-validate and read warm — #298 un-regresses cardinality only, it does NOT close write-validation freshness (that's step 4 §1d/§7.1). build clean; write_cost / validators / forbidden_apis green. --- crates/omnigraph/src/db/mod.rs | 1 + crates/omnigraph/src/db/omnigraph.rs | 108 ++++- .../omnigraph/src/db/omnigraph/table_ops.rs | 185 +++++++- crates/omnigraph/src/exec/merge.rs | 18 +- crates/omnigraph/src/exec/mutation.rs | 181 ++++++-- crates/omnigraph/src/exec/staging.rs | 81 +++- crates/omnigraph/src/instrumentation.rs | 52 +++ crates/omnigraph/src/loader/mod.rs | 64 ++- crates/omnigraph/tests/helpers/cost.rs | 16 + crates/omnigraph/tests/validators.rs | 52 +++ crates/omnigraph/tests/write_cost.rs | 87 +++- docs/dev/handoff-rfc-013-write-path.md | 430 ++++++++++++++++++ .../handoff-schema-apply-recovery-flake.md | 216 +++++++++ docs/dev/index.md | 2 + docs/dev/rfc-013-write-path-latency.md | 18 +- 15 files changed, 1405 insertions(+), 106 deletions(-) create mode 100644 docs/dev/handoff-rfc-013-write-path.md create mode 100644 docs/dev/handoff-schema-apply-recovery-flake.md diff --git a/crates/omnigraph/src/db/mod.rs b/crates/omnigraph/src/db/mod.rs index f382908..587c980 100644 --- a/crates/omnigraph/src/db/mod.rs +++ b/crates/omnigraph/src/db/mod.rs @@ -10,6 +10,7 @@ pub use commit_graph::GraphCommit; pub use graph_coordinator::{GraphCoordinator, ReadTarget, ResolvedTarget, SnapshotId}; pub use manifest::{Snapshot, SubTableEntry, SubTableUpdate}; pub(crate) use omnigraph::ensure_public_branch_ref; +pub(crate) use omnigraph::WriteTxn; pub use omnigraph::{ CleanupPolicyOptions, InitOptions, MergeOutcome, Omnigraph, OpenMode, PendingIndex, RepairAction, RepairClassification, RepairOptions, RepairStats, SchemaApplyOptions, diff --git a/crates/omnigraph/src/db/omnigraph.rs b/crates/omnigraph/src/db/omnigraph.rs index e1d7acf..4824b78 100644 --- a/crates/omnigraph/src/db/omnigraph.rs +++ b/crates/omnigraph/src/db/omnigraph.rs @@ -41,6 +41,7 @@ pub use repair::{ }; pub use schema_apply::SchemaApplyOptions; pub use table_ops::PendingIndex; +pub(crate) use table_ops::OpenedForMutation; use super::commit_graph::GraphCommit; use super::manifest::{ @@ -79,6 +80,35 @@ pub struct SchemaApplyPreview { pub catalog: Catalog, } +/// A capture-once write transaction (RFC-013 step 3b). Pins the operation's read +/// base ONCE so the per-table opens reuse the pinned version instead of +/// re-resolving / re-validating per table. The schema contract is validated once +/// (when `base` is captured). NOT a general "no re-resolution" handle — the +/// commit-time OCC re-read, the live-HEAD drift probe, and the fork-authority reads +/// stay fresh (correctness machinery). Step 5 (PublishPlan unification) makes this +/// the non-optional publish carrier and adds session-aware base opens there, gated +/// by an S3 cost test — the warm-session benefit on the single remaining open is an +/// object-store phenomenon, so it earns its own gate rather than riding this PR. +/// +/// Threaded as `Option<&WriteTxn>` through the mutate/load write chain +/// (`open_for_mutation_on_branch`, `commit_all`, `commit_updates_on_branch_with_expected`) +/// so a single write validates the schema contract EXACTLY ONCE — at capture. When +/// present, the per-table resolves source the pinned `base` entry instead of calling +/// `resolved_branch_target` / `snapshot_for_branch` / `fresh_snapshot_for_branch` +/// (each of which re-runs `ensure_schema_state_valid`). When absent (`None` — every +/// non-mutate/load caller), every threaded function behaves byte-identically to +/// before. The carrier never removes a version guard or changes which dataset version +/// the per-table open targets: strict ops keep `open_dataset_head_for_write` + +/// `ensure_expected_version`, and the commit-time OCC re-read still opens a fresh +/// manifest snapshot (via `fresh_snapshot_for_branch_unchecked`) — only the redundant +/// schema re-validation is dropped. +pub(crate) struct WriteTxn { + /// The resolved branch (`None` = main). + pub(crate) branch: Option, + /// The pinned base snapshot (per-table location + version + e_tag), captured once. + pub(crate) base: Snapshot, +} + /// Top-level handle to an Omnigraph database. /// /// An Omnigraph is a Lance-native graph database with git-style branching. @@ -736,6 +766,29 @@ impl Omnigraph { *self.coordinator.write().await = coordinator; } + /// Open a capture-once write transaction (RFC-013 step 3b): validate the schema + /// contract ONCE and pin the base snapshot. The per-table opens take + /// `Option<&WriteTxn>` and, on the bound branch for the non-strict (Insert/Merge) + /// path, source the pinned base entry — instead of re-resolving (re-validating the + /// schema) per table. Strict ops, the fork path, and the commit-time OCC re-read + /// keep their fresh reads (those are correctness machinery — see the handoff doc). + /// + /// "Once" covers the table-touch hot path captured here (proven by the node-insert + /// gate `write_validates_schema_contract_once`); it does NOT yet cover edge endpoint + /// / cardinality RI validation (`ensure_node_id_exists`, the loader's RI/cardinality), + /// which still resolve through `snapshot_for_branch` and re-validate. Those reads must + /// observe LIVE committed state, so unifying them (validate-once + pinned + re-checked + /// read-set) is step 4's §7.1 work — threading `txn.base` there would re-introduce the + /// stale-read class the #298 cardinality fix removed. A session-aware base open is + /// likewise deferred to step 5 (handoff §1d). + pub(crate) async fn open_write_txn(&self, branch: Option<&str>) -> Result { + let resolved = self.resolved_branch_target(branch).await?; + Ok(WriteTxn { + branch: resolved.branch, + base: resolved.snapshot, + }) + } + pub(crate) async fn resolved_branch_target( &self, branch: Option<&str>, @@ -770,12 +823,39 @@ impl Omnigraph { pub(crate) async fn fresh_snapshot_for_branch(&self, branch: Option<&str>) -> Result { self.ensure_schema_state_valid().await?; - let requested = ReadTarget::Branch(branch.unwrap_or("main").to_string()); - let coord = self.coordinator.read().await; - coord - .resolve_target(&requested) - .await - .map(|resolved| resolved.snapshot) + self.fresh_snapshot_for_branch_unchecked(branch).await + } + + /// Fresh per-branch manifest snapshot WITHOUT the schema-contract + /// re-validation. Identical OCC freshness to [`fresh_snapshot_for_branch`] + /// — a fresh manifest re-read from storage, never the warm cache — only the + /// redundant `ensure_schema_state_valid` is dropped. Used inside a single + /// write once a `WriteTxn` has already validated the contract at capture: the + /// commit-time drift re-read needs the live manifest, not a second contract + /// read. Callers with no `WriteTxn` MUST use the checked variant. + /// + /// Reads the manifest directly via `ManifestCoordinator` rather than + /// `resolve_target`. The OCC re-read uses only the returned `Snapshot` + /// (per-table location + version), which `ManifestCoordinator::open().snapshot()` + /// produces identically to `GraphCoordinator::open(...).snapshot()` — but + /// `resolve_target` additionally opens the commit graph (an extra + /// `_graph_commits.lance` probe) the OCC read never consults. Skipping that + /// load is a pure read-cost reduction, not a freshness change. The checked + /// `fresh_snapshot_for_branch` delegates here, so its no-`txn` callers + /// (commit_all's None arm, optimize, repair, fork reclaim) get the same + /// identical `Snapshot` via this lighter manifest-only read; they consume + /// only the snapshot and never relied on the commit-graph side load. + pub(crate) async fn fresh_snapshot_for_branch_unchecked( + &self, + branch: Option<&str>, + ) -> Result { + let manifest = match branch { + Some(branch) => { + crate::db::manifest::ManifestCoordinator::open_at_branch(self.uri(), branch).await? + } + None => crate::db::manifest::ManifestCoordinator::open(self.uri()).await?, + }; + Ok(manifest.snapshot()) } pub(crate) async fn version(&self) -> u64 { @@ -1599,7 +1679,7 @@ impl Omnigraph { &self, table_key: &str, op_kind: crate::db::MutationOpKind, - ) -> Result<(SnapshotHandle, String, Option)> { + ) -> Result { table_ops::open_for_mutation(self, table_key, op_kind).await } @@ -1608,8 +1688,9 @@ impl Omnigraph { branch: Option<&str>, table_key: &str, op_kind: crate::db::MutationOpKind, - ) -> Result<(SnapshotHandle, String, Option)> { - table_ops::open_for_mutation_on_branch(self, branch, table_key, op_kind).await + txn: Option<&crate::db::WriteTxn>, + ) -> Result { + table_ops::open_for_mutation_on_branch(self, branch, table_key, op_kind, txn).await } /// Fork `table_key` onto `active_branch` from the given source state, @@ -1728,6 +1809,8 @@ impl Omnigraph { updates: &[crate::db::SubTableUpdate], expected_table_versions: &std::collections::HashMap, actor_id: Option<&str>, + txn: Option<&crate::db::WriteTxn>, + committed_handles: std::collections::HashMap, ) -> Result { table_ops::commit_updates_on_branch_with_expected( self, @@ -1735,6 +1818,8 @@ impl Omnigraph { updates, expected_table_versions, actor_id, + txn, + committed_handles, ) .await } @@ -2466,10 +2551,13 @@ edge WorksAt: Person -> Company } async fn seed_person_row(db: &mut Omnigraph, name: &str, age: Option) { + // No-txn entry, so the handle is always `Some` (collapse #1's skip is + // gated on `txn.is_some()`). let (ds, full_path, table_branch) = db .open_for_mutation("node:Person", crate::db::MutationOpKind::Insert) .await - .unwrap(); + .unwrap() + .require_handle("seed_person_row test"); let schema: Arc = Arc::new(ds.dataset().schema().into()); let columns: Vec> = schema .fields() diff --git a/crates/omnigraph/src/db/omnigraph/table_ops.rs b/crates/omnigraph/src/db/omnigraph/table_ops.rs index ed5d082..93fdddf 100644 --- a/crates/omnigraph/src/db/omnigraph/table_ops.rs +++ b/crates/omnigraph/src/db/omnigraph/table_ops.rs @@ -488,18 +488,52 @@ pub(super) async fn needs_index_work_edge( || !db.storage().has_btree_index(&ds, "dst").await?) } +/// Result of opening a sub-table for mutation. `handle` is `None` only when a +/// non-strict (Insert/Merge) op on the WriteTxn's own branch skipped the +/// accumulation open (RFC-013 step 3b collapse #1) — there the caller needs just +/// `expected_version`. It is ALWAYS `Some` for strict ops, the fork path, and +/// every no-`txn` caller (branch merge), which use [`Self::require_handle`]. +#[derive(Debug)] +pub(crate) struct OpenedForMutation { + /// The opened dataset, or `None` on the non-strict-txn open-skip path. + pub(crate) handle: Option, + /// The publisher's CAS fence: the opened handle's version, or — when the open + /// was skipped — the pinned base entry's version (equal absent uncovered drift). + pub(crate) expected_version: u64, + pub(crate) full_path: String, + pub(crate) table_branch: Option, +} + +impl OpenedForMutation { + /// Destructure for a caller that REQUIRES the handle (strict ops, the fork + /// path, every no-`txn` caller). The `None` skip fires solely on the + /// non-strict `txn` path, which these callers are not — so a panic here means + /// a future change broke that contract, named by `ctx`. + pub(crate) fn require_handle(self, ctx: &str) -> (SnapshotHandle, String, Option) { + let handle = self.handle.unwrap_or_else(|| { + panic!("{ctx}: open_for_mutation returned no handle on a path that requires one") + }); + (handle, self.full_path, self.table_branch) + } +} + pub(super) async fn open_for_mutation( db: &Omnigraph, table_key: &str, op_kind: crate::db::MutationOpKind, -) -> Result<(SnapshotHandle, String, Option)> { +) -> Result { let current_branch = db .coordinator .read() .await .current_branch() .map(str::to_string); - open_for_mutation_on_branch(db, current_branch.as_deref(), table_key, op_kind).await + // `open_for_mutation` is the no-txn entry (branch merge). Passing `None` + // keeps the exact pre-WriteTxn code path (a fresh `resolved_branch_target` + // that re-validates the schema). With `txn = None` the non-strict early-skip + // in `open_for_mutation_on_branch` never fires, so this always returns a + // `Some(handle)` for its callers. + open_for_mutation_on_branch(db, current_branch.as_deref(), table_key, op_kind, None).await } /// Open a sub-table for mutation. The `op_kind` selects the strict-vs-relaxed @@ -513,15 +547,69 @@ pub(super) async fn open_for_mutation_on_branch( branch: Option<&str>, table_key: &str, op_kind: crate::db::MutationOpKind, -) -> Result<(SnapshotHandle, String, Option)> { + txn: Option<&crate::db::WriteTxn>, +) -> Result { db.ensure_schema_apply_not_locked("write").await?; - let resolved = db.resolved_branch_target(branch).await?; - let entry = resolved - .snapshot + // Source the resolved (snapshot, branch). With a `WriteTxn` the contract was + // validated once at capture, so use the pinned base + resolved branch instead + // of `resolved_branch_target` (which re-runs `ensure_schema_state_valid`). The + // base is the same fresh per-branch manifest read the no-txn path would have + // resolved — only the redundant schema re-validation is dropped. Without a txn + // this is byte-identical to the prior `resolved_branch_target` call. + let (snapshot, resolved_branch) = match txn { + Some(txn) => (txn.base.clone(), txn.branch.clone()), + None => { + let resolved = db.resolved_branch_target(branch).await?; + (resolved.snapshot, resolved.branch) + } + }; + let entry = snapshot .entry(table_key) .ok_or_else(|| OmniError::manifest(format!("no manifest entry for {}", table_key)))?; let full_path = format!("{}/{}", db.root_uri, entry.table_path); - match resolved.branch.as_deref() { + + // Collapse #1 (RFC-013 step 3b): a non-strict op (Insert/Merge) on the txn's + // own branch needs no dataset open for ACCUMULATION — the only thing the + // caller reads from this handle on the non-strict path is `.version()` (the + // publisher's CAS fence), which is exactly the pinned base version. The base + // already validated the schema contract once, and the staging reopen + // (`reopen_for_mutation`) plus the publisher CAS in `commit_all` are the real + // drift guards. So skip `open_dataset_head_for_write` entirely and source the + // expected version from the pinned entry. + // + // Gated on `txn.is_some()`: without a txn (branch merge's `open_for_mutation`) + // every arm below is byte-identical to before. STRICT ops (Update/Delete/ + // SchemaRewrite) always open live HEAD + run `ensure_expected_version` + // (read-modify-write SI), and any write that must FORK (the table isn't yet on + // the resolved branch) opens too (the fork is a real Lance state advance the + // manifest snapshot can't substitute for). + if txn.is_some() && !op_kind.strict_pre_stage_version_check() { + match resolved_branch.as_deref() { + // Non-strict, table already on the active branch → no open, no fork. + Some(active_branch) if entry.table_branch.as_deref() == Some(active_branch) => { + return Ok(OpenedForMutation { + handle: None, + expected_version: entry.table_version, + full_path, + table_branch: Some(active_branch.to_string()), + }); + } + // Main branch, non-strict → no open. (Main never forks.) + None => { + return Ok(OpenedForMutation { + handle: None, + expected_version: entry.table_version, + full_path, + table_branch: None, + }); + } + // Non-strict but the table isn't on the active branch yet — falls + // through to fork below. + Some(_) => {} + } + } + + match resolved_branch.as_deref() { None => { let ds = db .storage() @@ -531,7 +619,13 @@ pub(super) async fn open_for_mutation_on_branch( db.storage() .ensure_expected_version(&ds, table_key, entry.table_version)?; } - Ok((ds, full_path, None)) + let version = ds.version(); + Ok(OpenedForMutation { + handle: Some(ds), + expected_version: version, + full_path, + table_branch: None, + }) } Some(active_branch) => { let (ds, table_branch) = open_owned_dataset_for_branch_write( @@ -544,7 +638,13 @@ pub(super) async fn open_for_mutation_on_branch( op_kind, ) .await?; - Ok((ds, full_path, table_branch)) + let version = ds.version(); + Ok(OpenedForMutation { + handle: Some(ds), + expected_version: version, + full_path, + table_branch, + }) } } } @@ -1065,12 +1165,30 @@ async fn prepare_updates_for_commit( db: &Omnigraph, branch: Option<&str>, updates: &[crate::db::SubTableUpdate], + txn: Option<&crate::db::WriteTxn>, + // Post-`commit_staged` handles handed out by `StagedMutation::commit_all` + // (RFC-013 step 3b, collapse #4): table_key → the handle already open at + // its just-committed version. When a table's handle is present, the index + // build below reuses it and SKIPS the `reopen_for_mutation` open. Absent + // entries (other writers — schema apply, merge, ensure_indices, tests — + // pass `HashMap::new()`; inline-committed/delete tables are never staged) + // keep the byte-identical `reopen_for_mutation` path. + mut committed_handles: std::collections::HashMap, ) -> Result> { if updates.is_empty() { return Ok(Vec::new()); } - let snapshot = db.snapshot_for_branch(branch).await?; + // With a `WriteTxn` the schema contract was validated once at capture, so + // reuse the pinned base entries (same per-branch manifest snapshot) instead + // of `snapshot_for_branch` (which re-runs `ensure_schema_state_valid`). Only + // the `entry(table_key).table_path` is read out of it here, identical to the + // no-txn path; the post-`commit_staged` index build below still reopens the + // dataset at its just-committed version. Without a txn, byte-identical. + let snapshot = match txn { + Some(txn) => txn.base.clone(), + None => db.snapshot_for_branch(branch).await?, + }; let mut prepared = Vec::with_capacity(updates.len()); for update in updates { @@ -1084,21 +1202,34 @@ async fn prepare_updates_for_commit( let mut prepared_update = update.clone(); if prepared_update.row_count > 0 { let full_path = format!("{}/{}", db.root_uri, entry.table_path); - // Strict version check is correct here: this runs INSIDE + // Reuse the post-`commit_staged` handle when the caller handed one + // out (collapse #4): it is already open at exactly + // `prepared_update.table_version`, so the defense-in-depth strict + // re-check `reopen_for_mutation` would run is trivially satisfied + // and the open is redundant. When no handle is present (other + // writers, or any non-staged table), fall back to the byte-identical + // `reopen_for_mutation` path. + // + // Strict version check is correct on the fallback: this runs INSIDE // the publisher commit path, after `commit_staged` already // advanced Lance HEAD to `prepared_update.table_version`. // The check is a defense-in-depth assertion that the // dataset state matches what we just committed; not the // pre-stage race the op-kind policy targets. - let mut ds = reopen_for_mutation( - db, - &prepared_update.table_key, - &full_path, - prepared_update.table_branch.as_deref(), - prepared_update.table_version, - crate::db::MutationOpKind::SchemaRewrite, - ) - .await?; + let mut ds = match committed_handles.remove(&prepared_update.table_key) { + Some(ds) => ds, + None => { + reopen_for_mutation( + db, + &prepared_update.table_key, + &full_path, + prepared_update.table_branch.as_deref(), + prepared_update.table_version, + crate::db::MutationOpKind::SchemaRewrite, + ) + .await? + } + }; // Any column not yet buildable (e.g. a vector column whose rows // have null embeddings) is deferred and logged inside // build_indices; a later ensure_indices/optimize materializes it. @@ -1237,7 +1368,14 @@ pub(super) async fn commit_updates( .await .current_branch() .map(str::to_string); - let prepared = prepare_updates_for_commit(db, current_branch.as_deref(), updates).await?; + let prepared = prepare_updates_for_commit( + db, + current_branch.as_deref(), + updates, + None, + std::collections::HashMap::new(), + ) + .await?; commit_prepared_updates(db, &prepared, None).await } @@ -1281,9 +1419,12 @@ pub(super) async fn commit_updates_on_branch_with_expected( updates: &[crate::db::SubTableUpdate], expected_table_versions: &std::collections::HashMap, actor_id: Option<&str>, + txn: Option<&crate::db::WriteTxn>, + committed_handles: std::collections::HashMap, ) -> Result { db.ensure_schema_apply_not_locked("write commit").await?; - let prepared = prepare_updates_for_commit(db, branch, updates).await?; + let prepared = + prepare_updates_for_commit(db, branch, updates, txn, committed_handles).await?; commit_prepared_updates_on_branch_with_expected( db, branch, diff --git a/crates/omnigraph/src/exec/merge.rs b/crates/omnigraph/src/exec/merge.rs index 600fdf1..0a1f9e0 100644 --- a/crates/omnigraph/src/exec/merge.rs +++ b/crates/omnigraph/src/exec/merge.rs @@ -1068,10 +1068,13 @@ async fn publish_rewritten_merge_table( // source onto target). The inline `delete_where` later in this // function operates on rows the rewrite chose to remove, not // user-facing predicates, so Merge is the correct policy here. - let (ds, full_path, table_branch) = target_db + // `open_for_mutation` is the no-txn entry, so collapse #1's non-strict + // open-skip (gated on `txn.is_some()`) never fires here — the handle is + // always `Some`. + let (mut current_ds, full_path, table_branch) = target_db .open_for_mutation(table_key, crate::db::MutationOpKind::Merge) - .await?; - let mut current_ds = ds; + .await? + .require_handle("branch merge"); // Phase 1: merge_insert changed/new rows (preserves _row_created_at_version for // existing rows, bumps _row_last_updated_at_version only for actually-changed rows). @@ -1237,10 +1240,13 @@ async fn publish_adopted_delta( table_key: &str, delta: &AdoptDelta, ) -> Result { - let (ds, full_path, table_branch) = target_db + // `open_for_mutation` is the no-txn entry, so collapse #1's non-strict + // open-skip (gated on `txn.is_some()`) never fires here — the handle is + // always `Some`. + let (mut current_ds, full_path, table_branch) = target_db .open_for_mutation(table_key, crate::db::MutationOpKind::Merge) - .await?; - let mut current_ds = ds; + .await? + .require_handle("branch merge"); // Phase 1a: append the NEW rows. `stage_append_stream` is a streaming // `Operation::Append` — no hash join — so it never buffers the delta and diff --git a/crates/omnigraph/src/exec/mutation.rs b/crates/omnigraph/src/exec/mutation.rs index fbd0751..6411522 100644 --- a/crates/omnigraph/src/exec/mutation.rs +++ b/crates/omnigraph/src/exec/mutation.rs @@ -601,13 +601,51 @@ use super::staging::{MutationStaging, PendingMode}; /// away once Lance exposes a two-phase delete API /// ([lance-format/lance#6658](https://github.com/lance-format/lance/issues/6658)) /// and we can stage deletes on the same path as inserts/updates. +impl Omnigraph { + /// Resolve a LIVE-HEAD read handle for an edge table's committed-state `@card` + /// scan when collapse #1 skipped the accumulation open. The edge-insert path no + /// longer opens the edge dataset (non-strict op + txn), but cardinality is + /// validated ONCE (never rechecked at commit), so the scan must observe the + /// freshest committed edges — NOT the pinned `txn.base`. A concurrent writer can + /// commit edges to this table after `txn` capture; counting against the stale + /// base undercounts and lets a violating insert through (invariant 9). The table + /// LOCATION is read from the pinned entry (stable across versions); the dataset is + /// opened at live HEAD via `open_dataset_head_for_write` (a read here despite the + /// name — no lock/stage), restoring the pre-3b image (the mutation's own open). + /// The residual validate→commit race (a writer committing between this scan and + /// the end-of-query commit) is the §7.1 gap, closed by RFC-013 step 4. + async fn edge_cardinality_read_handle( + &self, + txn: Option<&crate::db::WriteTxn>, + table_key: &str, + ) -> Result { + let branch = txn.and_then(|t| t.branch.as_deref()); + match txn.and_then(|t| t.base.entry(table_key)) { + Some(entry) => { + let full_path = self.storage().dataset_uri(&entry.table_path); + self.storage() + .open_dataset_head_for_write(table_key, &full_path, branch) + .await + } + // Unreachable today (the `None` handle only reaches here under a txn whose + // base contains the table). Defensive: resolve the table fresh (live) + // without the schema re-validation `snapshot_for_branch` would re-run. + None => { + let snapshot = self.fresh_snapshot_for_branch_unchecked(branch).await?; + self.storage().open_snapshot_at_table(&snapshot, table_key).await + } + } + } +} + async fn open_table_for_mutation( db: &Omnigraph, staging: &mut MutationStaging, branch: Option<&str>, table_key: &str, op_kind: crate::db::MutationOpKind, -) -> Result<(SnapshotHandle, String, Option)> { + txn: Option<&crate::db::WriteTxn>, +) -> Result<(Option, String, Option)> { if let Some(prior) = staging.inline_committed.get(table_key) { let path = staging.paths.get(table_key).ok_or_else(|| { OmniError::manifest_internal(format!( @@ -615,6 +653,10 @@ async fn open_table_for_mutation( table_key )) })?; + // The inline-committed reopen does NOT validate the schema contract + // (it reopens at the post-inline-commit Lance version directly), so it + // takes no `txn` — threading it here would change nothing. Deletes are + // strict ops, so this always opens (returns `Some`). let ds = db .reopen_for_mutation( table_key, @@ -624,20 +666,32 @@ async fn open_table_for_mutation( op_kind, ) .await?; - return Ok((ds, path.full_path.clone(), path.table_branch.clone())); + return Ok((Some(ds), path.full_path.clone(), path.table_branch.clone())); } - let (ds, full_path, table_branch) = db - .open_for_mutation_on_branch(branch, table_key, op_kind) + // `open_for_mutation_on_branch` returns the expected version even when it + // skips the open (collapse #1, the non-strict insert/merge path): the version + // is the pinned base's, identical to the opened handle's `.version()`. Use it + // directly for `ensure_path` so the no-open path still captures the publisher + // CAS fence. + let opened = db + .open_for_mutation_on_branch(branch, table_key, op_kind, txn) .await?; - let expected_version = ds.version(); + // Pin the open-skip contract (collapse #1): a missing handle is legal ONLY on + // the non-strict `txn` path. A future change that returns `None` elsewhere + // (e.g. a new strict arm) trips this in debug builds rather than silently + // handing a `None` to a `require_handle` consumer. + debug_assert!( + opened.handle.is_some() || (txn.is_some() && !op_kind.strict_pre_stage_version_check()), + "open_for_mutation_on_branch returned no handle outside the non-strict txn open-skip path", + ); staging.ensure_path( table_key, - full_path.clone(), - table_branch.clone(), - expected_version, + opened.full_path.clone(), + opened.table_branch.clone(), + opened.expected_version, op_kind, ); - Ok((ds, full_path, table_branch)) + Ok((opened.handle, opened.full_path, opened.table_branch)) } /// D₂ parse-time check: a single mutation query is either insert/update-only @@ -720,14 +774,14 @@ impl Omnigraph { params: &ParamMap, actor_id: Option<&str>, ) -> Result { - self.ensure_schema_state_valid().await?; // Converge any pending recovery sidecar (a previously failed // writer's Phase B → Phase C residual) before executing: the // inline delete path advances Lance HEAD during execution and // the staged path's commit-time drift guard refuses // sidecar-covered drift, so a long-lived handle must heal here // — not at restart. One `list_dir` when no sidecars exist (the - // steady state). + // steady state). MUST run before `open_write_txn` below — the heal + // may advance the manifest, so the pinned base must be captured after. self.heal_pending_recovery_sidecars().await?; let requested = Self::normalize_branch_name(branch)?; // Reject internal `__run__*` / system-prefixed branches at the @@ -737,6 +791,16 @@ impl Omnigraph { if let Some(name) = requested.as_deref() { crate::db::ensure_public_branch_ref(name, "mutate")?; } + // Capture-once write transaction (RFC-013 step 3b). `open_write_txn` + // validates the schema contract ONCE (it resolves the branch target, + // whose first line is `ensure_schema_state_valid`) and pins the base + // snapshot for this write. Threaded as `Some(&txn)` through execution, + // staging commit, and the manifest publish so the per-table opens and + // the commit-time OCC re-read reuse the pinned base instead of + // re-validating the contract at every resolve point. Captured AFTER the + // recovery heal (which may advance the manifest) and AFTER `requested` + // is known so it pins the post-heal snapshot for the correct branch. + let txn = self.open_write_txn(requested.as_deref()).await?; let resolved_params = enrich_mutation_params(params)?; // Per-query staging accumulator. Inserts and updates push batches @@ -785,7 +849,13 @@ impl Omnigraph { }; let exec_result = self - .execute_named_mutation(&ir, &resolved_params, requested.as_deref(), &mut staging) + .execute_named_mutation( + &ir, + &resolved_params, + requested.as_deref(), + &mut staging, + Some(&txn), + ) .await; match exec_result { @@ -799,13 +869,20 @@ impl Omnigraph { // interleave between our commit_staged and our publish // (which would correctly fail our CAS but leave Lance // HEAD advanced — the residual class MR-870 recovers). - let (updates, expected_versions, sidecar_handle, _queue_guards) = staged + let super::staging::CommittedMutation { + updates, + expected_versions, + sidecar_handle, + guards: _queue_guards, + committed_handles, + } = staged .commit_all( self, requested.as_deref(), crate::db::manifest::SidecarKind::Mutation, actor_id, fork_queue_guards, + Some(&txn), ) .await?; // Failpoint that wedges the documented finalize→publisher @@ -824,6 +901,8 @@ impl Omnigraph { &updates, &expected_versions, actor_id, + Some(&txn), + committed_handles, ) .await?; // Phase C succeeded — sidecar can be deleted. If this @@ -938,6 +1017,7 @@ impl Omnigraph { params: &ParamMap, branch: Option<&str>, staging: &mut MutationStaging, + txn: Option<&crate::db::WriteTxn>, ) -> Result { let mut total = MutationResult::default(); for op in &ir.ops { @@ -946,7 +1026,7 @@ impl Omnigraph { type_name, assignments, } => { - self.execute_insert(type_name, assignments, params, branch, staging) + self.execute_insert(type_name, assignments, params, branch, staging, txn) .await? } MutationOpIR::Update { @@ -954,14 +1034,16 @@ impl Omnigraph { assignments, predicate, } => { - self.execute_update(type_name, assignments, predicate, params, branch, staging) - .await? + self.execute_update( + type_name, assignments, predicate, params, branch, staging, txn, + ) + .await? } MutationOpIR::Delete { type_name, predicate, } => { - self.execute_delete(type_name, predicate, params, branch, staging) + self.execute_delete(type_name, predicate, params, branch, staging, txn) .await? } }; @@ -978,6 +1060,7 @@ impl Omnigraph { params: &ParamMap, branch: Option<&str>, staging: &mut MutationStaging, + txn: Option<&crate::db::WriteTxn>, ) -> Result { let mut resolved: HashMap = HashMap::new(); for a in assignments { @@ -1025,8 +1108,12 @@ impl Omnigraph { } else { crate::db::MutationOpKind::Insert }; + // Node inserts are non-strict (Insert/Merge), so with a `WriteTxn` + // this opens NOTHING (collapse #1) — the handle is discarded anyway; + // only `ensure_path`'s captured version (read inside + // `open_table_for_mutation`) is used downstream. let (_ds, _full_path, _table_branch) = - open_table_for_mutation(self, staging, branch, &table_key, insert_kind).await?; + open_table_for_mutation(self, staging, branch, &table_key, insert_kind, txn).await?; // Accumulate. @key inserts go into the Merge stream (so a // later update on the same id coalesces correctly); no-key // inserts go into the Append stream. @@ -1059,13 +1146,16 @@ impl Omnigraph { )?; } let table_key = format!("edge:{}", type_name); - // Capture pre-write metadata on first touch (no Lance write). - let (ds, _full_path, _table_branch) = open_table_for_mutation( + // Capture pre-write metadata on first touch. Edge inserts are + // non-strict, so with a `WriteTxn` this opens NOTHING (collapse #1) + // and returns `None`. + let (handle, _full_path, _table_branch) = open_table_for_mutation( self, staging, branch, &table_key, crate::db::MutationOpKind::Insert, + txn, ) .await?; // Accumulate the new edge row. Edge IDs are ULID-generated so @@ -1075,9 +1165,27 @@ impl Omnigraph { // Edge cardinality validation: scan committed edges via Lance // + iterate pending edges in-memory for the `src` column, // group-by-src. The pending side already includes the row - // we just appended (above). - validate_edge_cardinality_with_pending(self, &ds, staging, &table_key, edge_type) + // we just appended (above). When the open was skipped (collapse + // #1), resolve a read handle for the committed scan at LIVE HEAD + // (`edge_cardinality_read_handle`, #298) — NOT the pinned txn.base, + // which would undercount edges a concurrent writer committed since + // capture. Only when cardinality is non-default, so the common + // default-cardinality edge keeps the open-free path. (The residual + // validate→commit race is the §7.1 gap — step 4.) + if !edge_type.cardinality.is_default() { + let committed_ds = match handle { + Some(h) => h, + None => self.edge_cardinality_read_handle(txn, &table_key).await?, + }; + validate_edge_cardinality_with_pending( + self, + &committed_ds, + staging, + &table_key, + edge_type, + ) .await?; + } self.invalidate_graph_index().await; @@ -1098,6 +1206,7 @@ impl Omnigraph { params: &ParamMap, branch: Option<&str>, staging: &mut MutationStaging, + txn: Option<&crate::db::WriteTxn>, ) -> Result { // Defense in depth: ensure this is a node type if !self.catalog().node_types.contains_key(type_name) { @@ -1122,14 +1231,18 @@ impl Omnigraph { let blob_props = self.catalog().node_types[type_name].blob_properties.clone(); let table_key = format!("node:{}", type_name); - let (ds, _full_path, _table_branch) = open_table_for_mutation( + let (handle, _full_path, _table_branch) = open_table_for_mutation( self, staging, branch, &table_key, crate::db::MutationOpKind::Update, + txn, ) .await?; + // Update is a STRICT op, so collapse #1 never skips its open — the + // handle is always `Some` (and it's needed for the committed scan below). + let ds = handle.expect("strict Update op always opens its dataset"); // Scan committed via Lance + apply the same predicate to pending // batches via DataFusion `MemTable` (read-your-writes for prior @@ -1228,13 +1341,14 @@ impl Omnigraph { params: &ParamMap, branch: Option<&str>, staging: &mut MutationStaging, + txn: Option<&crate::db::WriteTxn>, ) -> Result { let is_node = self.catalog().node_types.contains_key(type_name); if is_node { - self.execute_delete_node(type_name, predicate, params, branch, staging) + self.execute_delete_node(type_name, predicate, params, branch, staging, txn) .await } else { - self.execute_delete_edge(type_name, predicate, params, branch, staging) + self.execute_delete_edge(type_name, predicate, params, branch, staging, txn) .await } } @@ -1246,18 +1360,22 @@ impl Omnigraph { params: &ParamMap, branch: Option<&str>, staging: &mut MutationStaging, + txn: Option<&crate::db::WriteTxn>, ) -> Result { let pred_sql = predicate_to_sql(predicate, params, false)?; let table_key = format!("node:{}", type_name); - let (ds, full_path, table_branch) = open_table_for_mutation( + let (handle, full_path, table_branch) = open_table_for_mutation( self, staging, branch, &table_key, crate::db::MutationOpKind::Delete, + txn, ) .await?; + // Delete is a STRICT op, so collapse #1 never skips its open. + let ds = handle.expect("strict Delete op always opens its dataset"); let initial_version = ds.version(); // Scan matching IDs for cascade. Per D₂ this never overlaps with @@ -1347,14 +1465,17 @@ impl Omnigraph { let edge_table_key = format!("edge:{}", edge_name); let cascade_filter = cascade_filters.join(" OR "); - let (edge_ds, edge_full_path, edge_table_branch) = open_table_for_mutation( + let (edge_handle, edge_full_path, edge_table_branch) = open_table_for_mutation( self, staging, branch, &edge_table_key, crate::db::MutationOpKind::Delete, + txn, ) .await?; + // Delete is a STRICT op, so collapse #1 never skips its open. + let edge_ds = edge_handle.expect("strict Delete op always opens its dataset"); let (_new_edge_ds, edge_delete) = self .storage_inline_residual() @@ -1391,18 +1512,22 @@ impl Omnigraph { params: &ParamMap, branch: Option<&str>, staging: &mut MutationStaging, + txn: Option<&crate::db::WriteTxn>, ) -> Result { let pred_sql = predicate_to_sql(predicate, params, true)?; let table_key = format!("edge:{}", type_name); - let (ds, full_path, table_branch) = open_table_for_mutation( + let (handle, full_path, table_branch) = open_table_for_mutation( self, staging, branch, &table_key, crate::db::MutationOpKind::Delete, + txn, ) .await?; + // Delete is a STRICT op, so collapse #1 never skips its open. + let ds = handle.expect("strict Delete op always opens its dataset"); let (_new_ds, delete_state) = self .storage_inline_residual() diff --git a/crates/omnigraph/src/exec/staging.rs b/crates/omnigraph/src/exec/staging.rs index 31d5ce8..7760c95 100644 --- a/crates/omnigraph/src/exec/staging.rs +++ b/crates/omnigraph/src/exec/staging.rs @@ -440,6 +440,26 @@ struct StagedTableEntry { staged_write: StagedHandle, } +/// Output of [`StagedMutation::commit_all`] (Phase B): the publisher's input plus +/// the queue guards the caller must hold across the manifest publish. +pub(crate) struct CommittedMutation { + /// Per-table updates to publish to the manifest. + pub(crate) updates: Vec, + /// Per-table manifest pins refreshed under the write queue — the publisher's CAS fence. + pub(crate) expected_versions: HashMap, + /// Recovery sidecar to delete after Phase C succeeds (`None` when nothing staged). + pub(crate) sidecar_handle: Option, + /// Per-`(table, branch)` write-queue guards — the caller MUST hold these across + /// the manifest publish (see `commit_all`) so no writer interleaves between + /// `commit_staged` and the publish. + pub(crate) guards: Vec>, + /// Post-`commit_staged` handle per STAGED table (table_key → handle at the + /// just-committed version). Carried out (RFC-013 step 3b, collapse #4) so the + /// publish-prepare index build reuses it instead of a fresh `reopen_for_mutation` + /// at the same version. Inline-committed / delete tables are absent (no staged handle). + pub(crate) committed_handles: HashMap, +} + impl StagedMutation { /// **Phase B** of the two-phase commit: acquire per-`(table_key, /// branch)` queues, revalidate manifest pins, write the recovery @@ -485,12 +505,8 @@ impl StagedMutation { Vec<(String, Option)>, Vec>, )>, - ) -> Result<( - Vec, - HashMap, - Option, - Vec>, - )> { + txn: Option<&crate::db::WriteTxn>, + ) -> Result { let StagedMutation { inline_committed, mut staged, @@ -585,7 +601,18 @@ impl StagedMutation { // Multi-coordinator deployments (§VI.27 aspirational) get // genuine cross-process drift detection from this read for // free. - let snapshot = db.fresh_snapshot_for_branch(branch).await?; + // + // This MUST be a FRESH per-branch manifest read (never the warm + // cache) for the OCC re-capture below — but with a `WriteTxn` the + // schema contract was already validated at capture, so use the + // `_unchecked` variant, which drops the redundant + // `ensure_schema_state_valid` AND the commit-graph load the OCC read + // never consults (a fresh manifest read yields the same `Snapshot`). + // Without a txn this is byte-identical to the prior checked call. + let snapshot = match txn { + Some(_) => db.fresh_snapshot_for_branch_unchecked(branch).await?, + None => db.fresh_snapshot_for_branch(branch).await?, + }; for entry in staged.iter_mut() { let current = snapshot .entry(&entry.table_key) @@ -619,15 +646,20 @@ impl StagedMutation { // live Lance HEAD still equals that manifest pin. If an external // raw Lance write or a pre-fix maintenance path moved HEAD without // publishing `__manifest`, this write must not silently fold it. - let head = db - .storage() - .open_dataset_head_for_write( - &entry.table_key, - &entry.path.full_path, - entry.path.table_branch.as_deref(), - ) - .await? - .version(); + // + // `latest_version_id` reads the latest manifest pointer off the + // already-open staged handle (the #2 staging open) WITHOUT a fresh + // `Dataset::open` — the same cheap live-HEAD probe + // `ManifestCoordinator::probe_latest_version` uses. This replaces a + // redundant `open_dataset_head_for_write` (RFC-013 step 3b, collapse + // #3): the drift comparison below is byte-identical; only how `head` + // is obtained changes (probe vs cold open). + let head = entry + .dataset + .dataset() + .latest_version_id() + .await + .map_err(|e| OmniError::Lance(e.to_string()))?; if head < current { return Err(OmniError::manifest_internal(format!( "table '{}' Lance HEAD version {} is behind manifest version {}", @@ -786,6 +818,12 @@ impl StagedMutation { let mut updates: Vec = inline_committed.into_values().collect(); + // Carry each staged table's post-`commit_staged` handle out so the + // publish-prepare index build reuses it (collapse #4) instead of + // re-opening the dataset at the same just-committed version. + let mut committed_handles: HashMap = + HashMap::with_capacity(staged.len()); + for entry in staged { let StagedTableEntry { table_key, @@ -798,15 +836,22 @@ impl StagedMutation { let new_ds = db.storage().commit_staged(dataset, staged_write).await?; let state = db.storage().table_state(&path.full_path, &new_ds).await?; updates.push(SubTableUpdate { - table_key, + table_key: table_key.clone(), table_version: state.version, table_branch: path.table_branch.clone(), row_count: state.row_count, version_metadata: state.version_metadata, }); + committed_handles.insert(table_key, new_ds); } - Ok((updates, expected_versions, sidecar_handle, guards)) + Ok(CommittedMutation { + updates, + expected_versions, + sidecar_handle, + guards, + committed_handles, + }) } } diff --git a/crates/omnigraph/src/instrumentation.rs b/crates/omnigraph/src/instrumentation.rs index de5b7d3..9718686 100644 --- a/crates/omnigraph/src/instrumentation.rs +++ b/crates/omnigraph/src/instrumentation.rs @@ -43,6 +43,23 @@ pub struct QueryIoProbes { /// handle cache (Fix 3) serves them. pub table_wrapper: Option>, pub probe_count: Arc, + /// Counts DATA-table open CALLS through the two instrumented chokepoints + /// (`open_dataset_tracked` / `open_table_dataset`), classified by URI so the + /// internal/system tables (`__manifest`, `_graph_commits*`) are EXCLUDED — the + /// publisher CAS and commit-graph append open those every write, and counting + /// them would make the `data_open_count <= |touched_tables|` write gate + /// (RFC-013 step 3b) unreachable by threading alone. Unlike the opener-read + /// term (which mixes with the merge-insert/RI scan on the write path), this is + /// an exact open-invocation count. `forbidden_apis` keeps engine code OUTSIDE the + /// storage layer (`exec/`, `db/omnigraph/`, `loader/`, `changes/`) from opening + /// datasets except through these chokepoints, so the count is complete for the + /// keyed-write data path the gate measures. (`table_store.rs` is allow-listed and + /// does hold direct `Dataset::open`s — but only for branch-management ops + /// (`delete_branch`/`list_branches`/`force_delete_branch`), never that hot path.) + pub data_open_count: Arc, + /// Internal/system-table (`__manifest`, `_graph_commits*`) open CALLS — the + /// complement of `data_open_count`, kept for symmetry and debugging. + pub internal_open_count: Arc, } tokio::task_local! { @@ -80,6 +97,39 @@ pub(crate) fn record_probe() { let _ = current(|p| p.probe_count.fetch_add(1, Ordering::Relaxed)); } +/// Internal/system table directory names. An open of one of these is a metadata +/// open (publisher CAS, commit-graph append, recovery audit), NOT a data-table +/// open. Kept in sync with the dir constants in `db/manifest/layout.rs`, +/// `db/commit_graph.rs`, and `db/recovery_audit.rs`. +const INTERNAL_TABLE_DIRS: [&str; 4] = [ + "__manifest", + "_graph_commits.lance", + "_graph_commit_actors.lance", + "_graph_commit_recoveries.lance", +]; + +/// True when `uri`'s last path segment names an internal/system table. +fn open_is_internal(uri: &str) -> bool { + let trimmed = uri.trim_end_matches('/'); + let last = trimmed.rsplit('/').next().unwrap_or(trimmed); + INTERNAL_TABLE_DIRS.contains(&last) +} + +/// Record one table-open call against the active per-query probes, classified by +/// table class (the URI's last segment) so the write gate counts DATA-table opens +/// only and ignores the publisher/commit-graph metadata opens. No-op in production +/// (the classification runs only inside the probe closure, which `current` skips +/// when no probes are installed). Called at both open chokepoints. +pub(crate) fn record_open(uri: &str) { + let _ = current(|p| { + if open_is_internal(uri) { + p.internal_open_count.fetch_add(1, Ordering::Relaxed); + } else { + p.data_open_count.fetch_add(1, Ordering::Relaxed); + } + }); +} + /// Per-operation staged-write counts, installed for a task via /// [`with_merge_write_probes`]. Lets a cost-budget test assert WHICH staged-write /// primitive an operation invokes — e.g. that an append-only fast-forward merge @@ -177,6 +227,7 @@ pub(crate) async fn open_dataset_tracked( uri: &str, wrapper: Option>, ) -> Result { + record_open(uri); let result = match wrapper { None => Dataset::open(uri).await, Some(wrapper) => { @@ -203,6 +254,7 @@ pub(crate) async fn open_table_dataset( version: u64, session: Option<&Arc>, ) -> Result { + record_open(location); let mut builder = DatasetBuilder::from_uri(location).with_version(version); if let Some(session) = session { builder = builder.with_session(session.clone()); diff --git a/crates/omnigraph/src/loader/mod.rs b/crates/omnigraph/src/loader/mod.rs index 2365243..f19c94d 100644 --- a/crates/omnigraph/src/loader/mod.rs +++ b/crates/omnigraph/src/loader/mod.rs @@ -187,7 +187,10 @@ impl Omnigraph { &omnigraph_policy::ResourceScope::Branch(branch.to_string()), actor_id, )?; - self.ensure_schema_state_valid().await?; + // Schema-contract validation is captured ONCE per write via the + // `WriteTxn` opened in `load_jsonl_reader` (after branch resolution). + // The redundant `ensure_schema_state_valid` that used to run here is + // subsumed by `open_write_txn`'s `resolved_branch_target` call. // Converge any pending recovery sidecar (a previously failed // writer's Phase B → Phase C residual) before staging anything: // without this, sidecar-covered drift wedges every load on the @@ -397,7 +400,16 @@ async fn load_jsonl_reader( // inline path. let mut result = LoadResult::default(); - let snapshot = db.snapshot_for_branch(branch).await?; + // Capture-once write transaction (RFC-013 step 3b). `open_write_txn` + // validates the schema contract ONCE and pins the base snapshot. Threaded + // as `Some(&txn)` through the per-table opens and the manifest publish so + // each resolve point reuses the pinned base instead of re-validating the + // contract. The branch already exists here (fork-if-missing ran in + // `load_as` before this), so this captures the post-fork snapshot. The + // load's own base read (`db.snapshot_for_branch` previously) is the same + // per-branch snapshot, so reuse `txn.base` for it — dropping a validation. + let txn = db.open_write_txn(branch).await?; + let snapshot = txn.base.clone(); let mut staging = MutationStaging::default(); let pending_mode = match mode { LoadMode::Merge => PendingMode::Merge, @@ -481,15 +493,18 @@ async fn load_jsonl_reader( // Phase 2b: accumulate every node type in memory. Fragment writes are // delayed until after all validation succeeds. for (type_name, table_key, batch, loaded_count) in prepared_nodes { - let (ds, full_path, table_branch) = db - .open_for_mutation_on_branch(branch, &table_key, load_op_kind) + // The loader only needs the captured expected version (the publisher's + // CAS fence) for `ensure_path` — it discards the handle. With a + // non-strict load op (Merge/Append) and a `WriteTxn`, collapse #1 skips + // the dataset open and returns the pinned base version directly. + let opened = db + .open_for_mutation_on_branch(branch, &table_key, load_op_kind, Some(&txn)) .await?; - let expected_version = ds.version(); staging.ensure_path( &table_key, - full_path, - table_branch, - expected_version, + opened.full_path, + opened.table_branch, + opened.expected_version, load_op_kind, ); let schema = batch.schema(); @@ -553,15 +568,16 @@ async fn load_jsonl_reader( // Phase 2e: accumulate every edge type. Same dispatch as Phase 2b. for (edge_name, table_key, batch, loaded_count) in prepared_edges { - let (ds, full_path, table_branch) = db - .open_for_mutation_on_branch(branch, &table_key, load_op_kind) + // Same as the node phase: only the captured expected version is used; + // collapse #1 skips the open for a non-strict load op under a `WriteTxn`. + let opened = db + .open_for_mutation_on_branch(branch, &table_key, load_op_kind, Some(&txn)) .await?; - let expected_version = ds.version(); staging.ensure_path( &table_key, - full_path, - table_branch, - expected_version, + opened.full_path, + opened.table_branch, + opened.expected_version, load_op_kind, ); let schema = batch.schema(); @@ -589,13 +605,20 @@ async fn load_jsonl_reader( // `_queue_guards` holds per-(table_key, branch) write queues // across the manifest publish below — see exec/mutation.rs for // the rationale (interleaving prevention). - let (updates, expected_versions, sidecar_handle, _queue_guards) = staged + let crate::exec::staging::CommittedMutation { + updates, + expected_versions, + sidecar_handle, + guards: _queue_guards, + committed_handles, + } = staged .commit_all( db, branch, crate::db::manifest::SidecarKind::Load, actor_id, fork_queue_guards, + Some(&txn), ) .await?; // Same finalize → publisher residual as mutations: per-table @@ -603,8 +626,15 @@ async fn load_jsonl_reader( // publish has not run yet. Reuse the mutation failpoint name so // one failpoint pins the shared `MutationStaging` boundary. crate::failpoints::maybe_fail("mutation.post_finalize_pre_publisher")?; - db.commit_updates_on_branch_with_expected(branch, &updates, &expected_versions, actor_id) - .await?; + db.commit_updates_on_branch_with_expected( + branch, + &updates, + &expected_versions, + actor_id, + Some(&txn), + committed_handles, + ) + .await?; // The recovery sidecar protects the per-table commit_staged → // manifest publish window. Phase C succeeded — clean up // best-effort: failing the user here would error out a write diff --git a/crates/omnigraph/tests/helpers/cost.rs b/crates/omnigraph/tests/helpers/cost.rs index 2114f23..9c82229 100644 --- a/crates/omnigraph/tests/helpers/cost.rs +++ b/crates/omnigraph/tests/helpers/cost.rs @@ -58,6 +58,14 @@ pub struct IoCounts { pub commit_graph_reads: u64, /// Version-probe invocations (the cheap freshness check). pub version_probes: u64, + /// DATA-table open CALL count through the two instrumented chokepoints — an + /// exact open-invocation count (not the opener-read term), classified by URI so + /// internal/system-table opens are excluded. Step-3b target: + /// `data_open_count <= |touched_tables|` for a write. + pub data_open_count: u64, + /// Internal/system-table (`__manifest`, `_graph_commits*`) open CALL count — + /// the complement of `data_open_count` (publisher CAS + commit-graph append). + pub internal_open_count: u64, } impl IoCounts { @@ -225,6 +233,8 @@ struct ProbeHandles { commit_graph: IOTracker, table: PrefixCounter, probe_count: Arc, + data_open_count: Arc, + internal_open_count: Arc, } impl ProbeHandles { @@ -234,6 +244,8 @@ impl ProbeHandles { commit_graph: IOTracker::default(), table: PrefixCounter::default(), probe_count: Arc::new(AtomicU64::new(0)), + data_open_count: Arc::new(AtomicU64::new(0)), + internal_open_count: Arc::new(AtomicU64::new(0)), }; let probes = QueryIoProbes { manifest_wrapper: Some(Arc::new(h.manifest.clone()) as Arc), @@ -242,6 +254,8 @@ impl ProbeHandles { ), table_wrapper: Some(Arc::new(h.table.clone()) as Arc), probe_count: Arc::clone(&h.probe_count), + data_open_count: Arc::clone(&h.data_open_count), + internal_open_count: Arc::clone(&h.internal_open_count), }; (probes, h) } @@ -256,6 +270,8 @@ impl ProbeHandles { manifest_reads: self.manifest.stats().read_iops, commit_graph_reads: self.commit_graph.stats().read_iops, version_probes: self.probe_count.load(Ordering::Relaxed), + data_open_count: self.data_open_count.load(Ordering::Relaxed), + internal_open_count: self.internal_open_count.load(Ordering::Relaxed), } } } diff --git a/crates/omnigraph/tests/validators.rs b/crates/omnigraph/tests/validators.rs index 4c7a2f3..ce8525d 100644 --- a/crates/omnigraph/tests/validators.rs +++ b/crates/omnigraph/tests/validators.rs @@ -237,6 +237,58 @@ async fn cardinality_rejected_on_mutation_insert_edge() { ); } +/// RFC-013 step 3b regression guard (cursor High / codex P1 on #298): edge `@card` +/// validation must scan LIVE committed HEAD, not the pinned `txn.base`. Collapse #1 +/// skips the edge accumulation open, so a non-strict edge insert under a `WriteTxn` +/// reopens for the cardinality scan — and that scan must observe edges a concurrent +/// writer committed after this mutation captured its base, or a `@card` max is +/// silently exceeded (invariant 9). The residual validate→commit TOCTOU is the §7.1 +/// gap (step 4); this only un-widens what 3b widened (live HEAD vs mutation-start base). +/// +/// Deterministic — no failpoint: handle B's coordinator is stale by construction +/// (the write path does not probe the manifest version, unlike the read path). B MUST +/// NOT read between A's commit and B's insert — a read refreshes B's coordinator and +/// masks the bug (the same caveat as the served stale-view repro in `writes.rs`). +#[tokio::test] +async fn cardinality_rejected_for_stale_handle_after_concurrent_edge_commit() { + let (dir, mut db_a) = init_with(CARDINALITY_SCHEMA, CARDINALITY_SEED).await; + let uri = dir.path().to_str().unwrap(); + + // Handle B opens the same graph at the seed version (no edges yet); it then + // never reads again, so its in-memory coordinator stays pinned at the seed. + let mut db_b = Omnigraph::open(uri).await.unwrap(); + + // Handle A commits WorksAt(Alice -> Acme): Alice is now at the @card(0..1) max. + // This advances the on-disk manifest; B's coordinator is now stale. + mutate_main( + &mut db_a, + CARDINALITY_MUTATIONS, + "add_employment", + ¶ms(&[("$person", "Alice"), ("$company", "Acme")]), + ) + .await + .unwrap(); + + // Handle B (stale, never read since A committed) inserts a second WorksAt for + // Alice. B is non-strict + under a WriteTxn, so collapse #1 skips the open and the + // cardinality scan reopens: it MUST read live HEAD (Alice has 1) → reject (1+1 > 1), + // not the stale base (Alice has 0) → which would wrongly pass and commit a 2nd edge. + let err = mutate_main( + &mut db_b, + CARDINALITY_MUTATIONS, + "add_employment", + ¶ms(&[("$person", "Alice"), ("$company", "Beta")]), + ) + .await + .unwrap_err(); + assert!( + err.to_string().to_lowercase().contains("cardinality") + || err.to_string().to_lowercase().contains("@card"), + "a stale-handle edge insert must be rejected by @card against live HEAD, got: {}", + err + ); +} + #[tokio::test] async fn cardinality_rejected_on_jsonl_load() { // Already covered by existing loader Phase 3 logic but assert the diff --git a/crates/omnigraph/tests/write_cost.rs b/crates/omnigraph/tests/write_cost.rs index c7e8528..9420173 100644 --- a/crates/omnigraph/tests/write_cost.rs +++ b/crates/omnigraph/tests/write_cost.rs @@ -24,10 +24,10 @@ mod helpers; use helpers::cost::{ - IoCounts, assert_flat, assert_grows, local_graph, measure_insert, measure_insert_as, + IoCounts, assert_flat, assert_grows, local_graph, measure, measure_insert, measure_insert_as, measure_with_staged, }; -use helpers::{MUTATION_QUERIES, commit_many, commit_many_as, mixed_params}; +use helpers::{MUTATION_QUERIES, commit_many, commit_many_as, init_and_load, mixed_params}; // ── (A) The internal-table LOCK — the acceptance test for step 2 (compaction) ── // @@ -169,3 +169,86 @@ async fn keyed_insert_routes_through_merge_insert_only() { assert_eq!(staged.stage_append, 0, "keyed insert must not stage_append"); assert_eq!(staged.create_vector_index, 0, "no inline vector-index build on a plain insert"); } + +// ── (D) Step-3b capture-once fitness asserts (RED today → GREEN after WriteTxn) ── + +/// A write must validate the schema contract EXACTLY ONCE (3 `read_text` + 2 `exists`). +/// Today the write path re-validates at every resolve point (entry, per-table +/// `resolved_branch_target`, commit-time `fresh_snapshot_for_branch`), so the delta is +/// a multiple of that. Step 3b's `WriteTxn` validates once and threads it. The shape is +/// the write twin of `warm_read_cost.rs::warm_query_validates_schema_contract_once`, +/// built with ZERO production change via the counting storage adapter. +#[tokio::test] +async fn write_validates_schema_contract_once() { + use omnigraph::instrumentation::CountingStorageAdapter; + use omnigraph::storage::storage_for_uri; + + let dir = tempfile::tempdir().unwrap(); + let _ = init_and_load(&dir).await; + let uri = dir.path().to_str().unwrap(); + let (adapter, counts) = CountingStorageAdapter::new(storage_for_uri(uri).unwrap()); + let db = omnigraph::db::Omnigraph::open_with_storage(uri, adapter) + .await + .unwrap(); + + let before_read_text = counts.read_text(); + let before_exists = counts.exists(); + db.mutate( + "main", + MUTATION_QUERIES, + "insert_person", + &mixed_params(&[("$name", "schema_once")], &[("$age", 30)]), + ) + .await + .unwrap(); + + let read_text_delta = counts.read_text() - before_read_text; + let exists_delta = counts.exists() - before_exists; + eprintln!("schema-contract reads on one write: read_text={read_text_delta} exists={exists_delta}"); + assert_eq!( + read_text_delta, 3, + "a write must validate the schema contract once (3 reads), not N times", + ); + assert_eq!( + exists_delta, 2, + "a write must probe contract-file existence once (2 probes), not N times", + ); +} + +/// A keyed single-table write must open its DATA table AT MOST ONCE. Today it opens +/// ~4× (accumulation, staging, commit drift-guard, publish-prepare/index-build), each +/// a fresh cold `Dataset::open`. Step 3b opens the base once (a *session-aware* base +/// open is deferred to step 5), threads the commit-return handle, and replaces the +/// drift-guard open with a cheap `latest_version_id` probe — collapsing to 1 open. +/// Counted by `data_open_count`, the +/// table-class-scoped chokepoint probe: the internal-table opens (publisher CAS + +/// commit-graph append) are EXCLUDED, since they are unrelated to data-table reuse and +/// would otherwise keep this count >1 regardless of threading. (`forbidden_apis` keeps +/// engine code outside the storage layer from opening datasets except through the +/// instrumented chokepoints — `table_store.rs`'s own direct opens are branch-management +/// ops, not this keyed-write path.) +#[tokio::test] +async fn keyed_insert_opens_table_at_most_once() { + let dir = tempfile::tempdir().unwrap(); + let mut db = local_graph(&dir).await; + let io = { + let (res, io) = measure(db.mutate( + "main", + MUTATION_QUERIES, + "insert_person", + &mixed_params(&[("$name", "opens")], &[("$age", 30)]), + )) + .await; + res.unwrap(); + io + }; + eprintln!( + "data_open_count={} internal_open_count={} for a single-table keyed insert", + io.data_open_count, io.internal_open_count + ); + assert!( + io.data_open_count <= 1, + "a keyed single-table write must open its data table at most once, got {}", + io.data_open_count, + ); +} diff --git a/docs/dev/handoff-rfc-013-write-path.md b/docs/dev/handoff-rfc-013-write-path.md new file mode 100644 index 0000000..c606047 --- /dev/null +++ b/docs/dev/handoff-rfc-013-write-path.md @@ -0,0 +1,430 @@ +# Handoff: finishing RFC-013 (write-path latency + correctness) + +**Status:** living handoff. **Source of truth is [`rfc-013-write-path-latency.md`](rfc-013-write-path-latency.md)** — +this doc is the *current-state map + the decisions/validation from the latest work cycle ++ the concrete next actions*. When they disagree, the RFC wins (and fix this doc). + +**Audience:** the engineer/agent who picks up RFC-013 next. + +--- + +## 0. TL;DR — where we are and what's next + +RFC-013 makes the write path fast **and** correct on object storage (217 Lance tables +under one `__manifest` catalog, on R2/S3). It is sequenced as steps; read §9 of the RFC +for the canonical list. Current reality: + +**Landed on `main`:** +- **Step 1** — Tier-1 cost gate + the shared `helpers::cost` harness (#288). +- **Step 3a** — opener bypass: write opens go direct (`Dataset::open` by URI + version) + instead of the Lance-namespace builder (#288). **This already banked the dominant + depth win** — see §2 below; it reframes everything. +- **Step 2a** — internal-table compaction: `optimize` now compacts `__manifest` / + `_graph_commits` / `_graph_commit_actors` (#291). Plus the RFC latency-model + correction (#292). +- **Optimize-vs-write race** — optimize survives a cross-process write race on the + same table (#297, **LANDED** — origin/main `6d4606a8`; see §6 for why it's not + redundant with Design A). Step 3b stacks on top of this. + +**Open PRs (land these; relationships in §7):** +- **#296** `correctness-by-design-fix` — recovery roll-forward converges on a concurrent + manifest advance (this is the fix for the flaky `iss-schema-apply-reopen-recovery-race`). +- **#295** `docs/rfc-013-step-3b` — the step-3b RFC doc. +- **#254** `ragnorc/bug-4-schema-apply-occ` — schema-apply vs optimize false-fail + (same op-class family as #297, logical side). + +**Step 3b is DONE** (capture-once `WriteTxn`, schema-once + open-collapse; see §4) on +`rfc-013-step-3b-writetxn-v2`. **Next: Phase 7 (step 4), then the big one — Design A / +`PublishPlan` unification (step 5)** — see §5, the convergent fix for the bug *class* this +area keeps generating, which also absorbs 3b's deferred session-aware write opens. + +--- + +## 1. The corrected mental model (read this before touching anything) + +Three reframes from the latest cycle that the older RFC prose may not fully reflect: + +### 1a. 3a already won the depth fight → the residual is constant-factor + RTT +Before 3a, the write re-opened each table through the lance-namespace builder ~13×, and +that path was **O(depth)** (it re-opened `__manifest` + `list_table_versions` per open — +**not** a Lance back-walk; the root cause was OmniGraph's own namespace round-trips, not +Lance — validated against Lance source). 3a swapped it for the direct opener, which is +**O(1)** (`from_uri(loc).with_version(N)` = arithmetic path + one HEAD). So: + +- The dominant **O(depth) data-table** term is **gone**. +- Step 2a flattened the secondary **internal-table** scan term. +- What remains is the **~110-hop serial backbone × RTT + compute** — a constant in + depth. The latency model is **`wall = (serial_hops + ops/effective_concurrency)·RTT + + compute`**; on a capped store (R2) the op-count term re-enters wall-clock, on an + unlimited store it parallelizes away. Measured: prod one-row write 27→15.76s after + 2a; the remaining 15.76s is the serial backbone — **step 3b's target**, not step 2's. +- Step 3b's win is therefore the **call-count/RTT collapse** (redundant opens, the + flat-46 schema reads), NOT a depth slope. Don't expect a depth-slope improvement from + 3b; gate it on the constant-factor (S3 round-trips), not a curve. + +### 1b. Two op classes, two commit models (the §6.6 principle) +Every concurrency bug in this area is **one op class using the other's commit model**: + +| class | examples | commutes? | correct commit model | +|---|---|---|---| +| **maintenance** | compaction (`Rewrite`), `optimize_indices` | yes (content-preserving) | Lance native rebase + app reopen/replan on real overlap + **monotonic manifest fast-forward** — no epoch, no read-set | +| **logical mutation** | load / mutate / merge / delete | no (lost-update, write-skew) | strict cross-process OCC: read-set + write-set CAS under the `writer_epoch` fence | + +Applying strict OCC + equality-CAS uniformly is the mistake: too strong for maintenance +(false conflicts — #297's bug), too weak for logical cross-process (§6.5 corruption). + +### 1c. The root liability (what keeps generating these bugs) +Lance gives **per-table atomic commits** but **no cross-table/cross-step atomicity**, so +every multi-commit op advances per-table Lance HEAD **before** the manifest references it +(the "A-before-B window"). The resulting `HEAD vs manifest` delta is **ambiguous** +(external drift? my own in-flight work? a crashed writer?), and **many uncoordinated code +paths each re-interpret it** (4 writers + the maintenance path + recovery + the write-path +drift guard). Each interpreter is a fresh chance to misclassify. That is the bug class: +- §6.5 cross-process logical corruption, +- #297's own-HEAD-drift misclassification, +- the flaky write-path "HEAD ahead of manifest, run repair" guard, +- the recovery classifier edges. + +**The convergent fix is Design A (one publish authority — step 5); Lance MTT eventually +retires the window entirely.** See §5. + +### 1d. The second facet: the write base is a stale pin (no probe) +The READ path resolves its base behind a freshness probe (`resolve_target_inner` +omnigraph.rs:~1072 → `probe_latest_incarnation` → `refresh_manifest_only`); the WRITE path +does NOT (`resolved_branch_target` omnigraph.rs:~778 returns the warm `coord.snapshot()` for +the bound branch, no probe). So a long-lived server's write base lags the live manifest. That +single staleness feeds **two distinct failure modes**, both surfaced this cycle: + +1. **Stale validation *reads* → integrity under-enforced.** Write-path RI checks read + committed state off the stale base. 3b's collapse #1 made it worse for edge `@card`: + `edge_cardinality_read_handle` (mutation.rs:~614) scans the pinned `txn.base` instead of + live HEAD (was live HEAD pre-3b), so a concurrent edge committed after `txn` capture is + uncounted → a `@card` max can be exceeded (cursor **High** / codex **P1** on #298, + **VALID**). **#298 fix: restore the live-HEAD read for that scan** (un-regress; gate-safe — + the `data_open_count` gate is a node insert) + a deterministic regression test (commit A's + edge, then B validates → must see A) + correct the wrong "pinned base == live HEAD" doc + comment (mutation.rs:~605-613, which assumes a single writer). The *structural* liability + underneath: there is **no unified write-validation read-set** — endpoint + (`ensure_node_id_exists`, warm `snapshot_for_branch`), cardinality (mutation: pinned + `txn.base`; loader: warm `snapshot_for_branch` — the SAME check forks per write path), + commit drift guard (live `fresh_snapshot_for_branch`), and uniqueness + (`enforce_unique_constraints_intra_batch`, intra-batch only — cross-version uniqueness is a + documented gap). Three freshness levels chosen ad hoc, none re-validated at commit → the + §7.1 TOCTOU class, and each new constraint forks the pattern again. + +2. **Stale OCC *pin* → false-fail on a maintenance advance.** A served strict update/delete + pins the stale base version, then false-fails `ExpectedVersionMismatch` after an external + `optimize` advanced `__manifest` — even though the advance was content-preserving + compaction the logical write should fast-forward past (invariant 7). It's the **write-side + mirror of #297/§6.6** (#297 made optimize fast-forward past a logical write; this is a + logical write that must fast-forward past optimize). A served read clears it (the read + probes the shared coordinator). Validated repro on prod (omnigraph.ragnor.co) + + `writes.rs::served_strict_delete_after_external_optimize_advance_auto_refreshes` + (`#[ignore]` on branch `fix/write-path-stale-view-probe`). **The naive "just probe" fix is + proven wrong** — a blanket probe silently refreshes past *logical* advances too, breaking + `consistency::stale_handle_public_mutation_must_refresh_then_retry` (the deliberate + cross-process lost-update OCC primitive). The fix must **discriminate by op class**. + +**Both fold into Design A (step 5), same as §1c.** `open_txn`'s one warm probe makes the base +fresh (absorbs maintenance advances cheaply); the **op-class-aware strict precondition** — +derive from Lance's per-version transaction metadata (all `Rewrite`/`ReserveFragments` = +maintenance → fast-forward the pin; any `Append`/`Update`/`Delete`/`Merge` = logical → fail +loudly; NO parallel marker, invariant 1/15) — is the correctness fence for anything that lands +after. And the §7.1 read-set-in-CAS unifies the validation read-set + re-validates it under the +`graph_head` contention. So **the stale-view false-fail, the cardinality/validation-read-set +liability, and #297's mirror are one bug** (the write base is a stale, un-probed, un-classified +pin) with **one home: the single PublishPlan delta-interpreter** (§1c + §5). Strong corroboration +of Design A — three symptoms, one fix. + +--- + +## 2. Validated facts — do NOT re-derive these + +Established this cycle against **Lance 7.0.0 source** +(`~/.cargo/registry/src/index.crates.io-*/lance-7.0.0`) and current engine code. Cited so +you can trust them without re-investigating. + +**Lance (upstream):** +- `from_uri(loc).with_version(N).load()` and `checkout_version(N)` are **O(1)** (computed + V2 path `_versions/{u64::MAX-N:020}.manifest` + one HEAD; no listing/back-walk). + (`lance-table/src/io/commit.rs` `default_resolve_version`.) +- A shared `Arc` (`DatasetBuilder::with_session`) warms metadata/index caches + keyed by `(URI, version, e_tag)`. Caveat: the *first* manifest read on open is uncached + — the Session warms the *scan/index* metadata, not the first open. **`WriteParams` *does* + carry a `session` field** (`lance/src/dataset/write.rs`), but it only matters on the + `WriteDestination::Uri` arm; OmniGraph's staged path always drives off an **already-open + `Dataset`**, and Lance takes the store/session from that handle. So to attach the shared + Session to a write base, open read-style (`open_table_dataset` → `from_uri().with_version() + .with_session()`) and drive the staged write off that handle. +- A held `Arc` at a pinned version is `Send + Sync`, immutable, safe to reuse for + many scans/count/staged-write base in one txn (OmniGraph's `TableHandleCache` already + relies on this). +- **No compaction `RetryExecutor`** (only Delete/MergeInsert/Update have one). + `commit_compaction` commits a fixed `Rewrite` via `apply_commit` direct. In + `commit_transaction`, a semantic `RetryableCommitConflict` **escapes the retry loop** + via `?` at `io/commit.rs:979`; the loop only retries the OCC `CommitConflict` + (`:1096`), and even that re-rebases the *same* transaction (never re-plans). ⇒ + **compaction needs app-level reopen+REPLAN; you cannot "set conflict_retries" and let + Lance own it.** +- `check_rewrite_txn`: a `Rewrite` rebases **cleanly** past a concurrent `Append`/disjoint + `Update`/`Delete` (preserving both); only a same-fragment overlap yields a retryable + conflict. ⇒ the common concurrent insert/update/delete is rebased for free; the app + retry fires only on real overlap. + +**Engine (internal):** +- Read path (post-#268) already has the capture-once machinery: `Snapshot` (`db/manifest.rs`), + warm `GraphCoordinator` behind a `latest_version_id`/incarnation probe, a held + `TableHandleCache` keyed `(table,branch,version,e_tag)`, **one shared `Session` per + graph** (`read_caches.session`). **Writes bypass all of it by construction** + (`resolved_branch_target` returns `read_caches: None`; the 3a write opener attaches no + session and opens by latest, not pinned version). +- A single write opens each table **3–4×** (accumulation → staging reopen → commit + drift-guard → publish prepare), each a fresh cold open. `validate_schema_contract` + (`db/schema_state.rs`, via `ensure_schema_state_valid`) runs uncached (~3 `read_text` + + 2 `exists`) at every resolve point (~the flat-46). Both are constant-factor, flat in + depth — 3b's targets. +- Strict-op guards are the lost-update floor (3 layers: pre-stage `ensure_expected_version` + `table_store.rs`; commit-time strict drift `exec/staging.rs`; publisher CAS + `publisher.rs`). Capture-once **supplies** the pinned operand — never remove a guard. +- Fork-on-first-write authority reads (`classify_fork_ref` → `fresh_snapshot_for_branch`) + must stay **fresh** (not served from a pinned base). +- Cost harness: `helpers::cost` (`measure`/`measure_with_staged`/`IoCounts`/`assert_flat`/ + `local_graph`/`s3_graph`). The schema-once assert can reuse `CountingStorageAdapter` + (`warm_read_cost.rs::warm_query_validates_schema_contract_once`) with **zero** prod + change; an open-count assert wants a small `open_count` AtomicU64 in `QueryIoProbes` + (copy the `probe_count`/`record_probe` pattern). The forbidden-API guard + (`tests/forbidden_apis.rs`) makes an instrumentation-level counter complete. + +--- + +## 3. The #297 cycle (this branch) — what it is, and the lesson + +`fix-optimize-concurrency-race` (5 commits): a CLI `optimize` racing a served write on the +same table failed (Lance Rewrite lost, or the equality-CAS publish lost). Fix: unify both +compaction paths on the internal path's **reopen+replan** shape, with a **two-level retry** +— outer loop reopens+replans on a real Lance overlap; inner Phase-C loop makes the manifest +publish a **monotonic fast-forward** (advance to compacted version `N`, or no-op when the +manifest already moved to `≥ N`), never the strict equality CAS. Sidecar written once; +in-process queue kept as a contention reducer (not the cross-process guard); no `writer_epoch`. + +**Two review rounds surfaced two follow-on bugs I introduced with the retry loop** — both +fixed, both regression-tested (own-HEAD-drift via negative control): +1. **Own-HEAD-drift misclassification** (`56d004e0`): the drift guard re-ran every + iteration and, after a partial Phase-B commit (auto_cleanup strip or compact, then a + later op conflicts), saw `HEAD > manifest` from *our own* covered work and deleted the + sidecar + returned `skipped_for_drift` (stranding uncovered drift). Fix: track + `head_advanced`; the drift guard fires only when `!head_advanced`. +2. **Publish exhaustion spurious error** (`e9d16a2c`): the publish loop returned `Err` on + its final retry even if the conflict meant a concurrent writer already published `≥ N` + (postcondition met). Fix: re-check `current >= state.version` on exhaustion. + +**The lesson (write it on the wall):** *wrapping a sequence of side-effecting commits in a +retry silently converts every "checked once, before any side effect" precondition into +"re-checked after partial side effects."* That's a distinct bug class; it needs +fault-injection tests **at each commit boundary**, not just end-to-end concurrency tests. +(The `optimize.before_compact` / `optimize.inject_reindex_conflict` failpoints exist for +exactly this.) + +**Temporary mechanism flag:** `head_advanced` is an in-memory proxy for "is this HEAD +movement mine." Under Design A the authority answers that from the plan/sidecar **identity** +— so `head_advanced` is the part that gets *replaced*, while the monotonic-publish + +reopen/replan **semantics** are permanent. (Noted in RFC §6.6.) + +--- + +## 4. DONE: Step 3b — capture-once `WriteTxn` (shipped on `rfc-013-step-3b-writetxn-v2`) + +**Delivered:** on the **table-touch hot path**, a single `mutate`/`load` validates the schema +contract **once** and opens each touched data table **at most once** — a constant-factor/RTT +win (not a depth-slope win; 1a). Two cost gates in `write_cost.rs` lock it (both on a node +insert): `write_validates_schema_contract_once` (3 `read_text` / 2 `exists`, was 12/9) and +`keyed_insert_opens_table_at_most_once` (`data_open_count <= 1`, was 4). The carrier is the +minimal `WriteTxn { branch, base }`, threaded as `Option<&WriteTxn>` (`Some` on the hot +mutate/load path, `None` byte-identical everywhere else); it **converges into** step 5's +`PublishPlan`. + +**Not "once" everywhere (scope, not regression):** edge endpoint / cardinality RI validation +(`ensure_node_id_exists`, the loader's RI + cardinality) still resolves through +`snapshot_for_branch` and re-validates the schema — and reads **warm**, not live. Threading +`txn.base` there to make it "once" would re-introduce the stale-read class the #298 cardinality +fix removed (it now reads live HEAD). Doing schema-once *and* fresh reads for those validations +needs the unified, re-checked read-set — **step 4 §7.1** (§1d). So #298 **un-regresses +cardinality only; it does not close write-validation freshness.** No edge-insert/load schema-once +gate yet (only the node gates above). + +Commits (off merged-#297 main): +- **Stage 0** — scope `open_count` → `data_open_count`/`internal_open_count` by URI class + (the review fix: `open_dataset_tracked` also opens `__manifest`/`_graph_commits`, so the + raw counter conflated them and the gate was unreachable). Re-baselined RED 4. +- **Commit A (schema-once)** — capture `txn` once at entry (the single validation); the 4 + validation sites collapse: S1 (entry `ensure_schema_state_valid`) removed; S3a + (`open_for_mutation_on_branch`) + S3b (`prepare_updates_for_commit`) source `txn.base`; + S4 (`commit_all`) uses new `fresh_snapshot_for_branch_unchecked` (the OCC manifest re-read + minus the schema re-validation). `fresh_snapshot_for_branch{,_unchecked}` now read the + manifest directly via `ManifestCoordinator` (drops a spurious commit-graph `exists` probe; + same `Snapshot`). +- **Commit B (open collapse 4→1)** — #1 accumulation open ELIMINATED (the node path discarded + the handle; read `txn.base.entry().table_version`); #2 staging open KEPT (the one open); + #3 commit drift-guard reads live HEAD via `entry.dataset.dataset().latest_version_id()` (a + cheap manifest-pointer probe off the staged handle, not a fresh open); #4 index build reuses + the `commit_staged` handle threaded through `CommittedMutation`/`prepare_updates_for_commit`. +- **Commit B.1 + cleanup** — named the two positional returns (`OpenedForMutation`, + `CommittedMutation`) + a `debug_assert` pinning the open-skip contract; **removed the + unearned `WriteTxn.session` field** (the collapse uses skip/probe/reuse, not a session). + +**RFC §4.1 corrections — how they resolved:** +1. *Thread the evolving handle, not a version-keyed cache* → realized as collapse #4 (carry + the `commit_staged` handle forward into the index build). +2. *Don't forbid re-resolution* → honored: the commit-time OCC re-read + (`fresh_snapshot_for_branch_unchecked` — fresh manifest, only schema-revalidation dropped) + and the fork-authority reads stay fresh. +3. *Minimal carrier* → `WriteTxn { branch, base }` (even the `session` from the original + sketch was dropped as unearned). + +**Deferred to step 5 (NOT in this PR):** session-aware write base opens. The one remaining +open (#2) stays a HEAD open; warming the shared `Session` across writes is an object-store +(S3) phenomenon invisible on local FS, so it earns its own `write_cost_s3.rs` gate in step 5, +where `txn` becomes the non-optional publish carrier. No new concurrency test was needed here: +#2 stays a HEAD open (no pinned+session base introduced), so the publisher CAS + #3 live-HEAD +probe fences are unchanged (covered by the green `writes.rs`/`consistency.rs`). + +**Guardrails (don't regress):** schema validation is deliberately uncached for drift +detection — collapse to 1 *per write*, never cache across writes on a long-lived handle +(`lifecycle::long_lived_handle_rejects_schema_*`). The commit-time fresh read is OCC +machinery, not redundancy. Keep all 3 strict-op guards. Keep fork-authority reads fresh. +Pin the *correct* branch (server-bound-to-main writing a feature branch falls to a fresh +open). A branch `rfc-013-step-3b-writetxn` exists off an earlier main; rebase onto the +post-#297 main before starting. + +--- + +## 5. Design A — the `PublishPlan` unification (step 5) = the convergent fix + +**This is the real fix for the bug class in §1c.** Collapse the four hand-rolled writers + +the maintenance path into **one `publish(txn, plan)` authority** where the CAS + bounded +retry is **unconditional and unbypassable** (no caller can "hold the queue → skip the CAS"). +Properties: +- **One interpreter of the `HEAD vs manifest` delta** — and "is this my work?" is answered + by the plan/sidecar **identity**, not a re-derived comparison. The own-HEAD-drift bug, the + §6.5 writers, the write-path guard — all close *by construction*. +- **Recovery = the same `PublishPlan` re-applied** — the crash-recovery interpreter and the + live interpreter become the same code (`iss-merge-recovery-partial-rollforward` gone). +- Each `TableAction` commits by its **class** (§1b): `Rewrite` = maintenance (Lance rebase + + reopen/replan + monotonic fast-forward, **no epoch**); load/mutate = logical (strict OCC + + `writer_epoch`). + +**Why it composes with Lance MTT (don't over-build):** +- The **unification itself is convergent** — when MTT lands, it slots *underneath* the same + authority; nothing wasted. Build this. +- The **`writer_epoch`** is the one MTT-redundant piece (MTT's commit-handler lease subsumes + a cross-process fence). Build it *last and minimally*, gated on actually deploying + multi-writer topologies. Per the deny-list, don't reimplement what the substrate will own. + +**Sequencing judgment (this cycle's strongest signal):** the bug density here (this PR alone += 3 review rounds, all "a writer re-interprets the delta") means the current N-writers interim +is high integrated-over-time liability. **Consider pulling the *convergent half* of step 5 +(the single authority + recovery-as-plan) forward — possibly ahead of 3b** — because it stops +the bug class rather than patching instances. #297 + #254 are the *de-risking inputs*: they +validate the maintenance-class and logical-class commit models in isolation first, so Design +A implements a known spec rather than designing under refactor pressure. Do NOT build more +substrate-shaped scaffolding (custom WAL / job queue / second coordination table) to paper +over the window — strictly higher liability than either Design A or waiting for MTT. + +**Deeper-than-A (post-MTT or as Lance exposes uncommitted variants):** all-uncommitted-fragments ++ one manifest commit would shrink the A-before-B window itself, blocked today by Lance not +exposing uncommitted variants for `compact_files` / `optimize_indices` / vector index (#6666 +open; delete #6658 shipped). Track, don't build yet. + +--- + +## 6. Why #297 is still needed even if you do Design A +- Design A **relocates** #297's maintenance-class commit logic into the authority's + `TableAction::Rewrite` path; it does not eliminate it. #297 is the *validated spec + tests*. +- The two regression tests + §6.6 are the **contract** Design A must keep green. +- The prod bug is **live**; Design A is the largest write-path change in the RFC. Don't hold a + correctness fix hostage to a big refactor, and don't do a big refactor under bug-fix urgency. +- Genuinely throwaway under Design A: only the loop's *location* + the `head_advanced` proxy + (~a dozen lines). Everything else relocates or persists. **#297 LANDED.** + +--- + +## 7. Open PRs and their relationships +- **#297** — maintenance-class fix (optimize vs write). **LANDED** (origin/main `6d4606a8`); + step 3b stacks on it. +- **#254** — logical-class fix (schema-apply vs optimize false-fail). Same op-class family; + both are de-risking inputs for Design A's per-class commit models. +- **#296** — recovery roll-forward converges on concurrent manifest advance. This is the fix + for the flaky `iss-schema-apply-reopen-recovery-race` (the handoff in + `handoff-schema-apply-recovery-flake.md`). It touches `recovery.rs` and is *aligned* with + #297's "postcondition is the state, not winning the CAS" principle — reconcile the monotonic + publish with #296's converge helper if #296 lands first. +- **#295** — the step-3b RFC doc (apply §4's three corrections to it). + +--- + +## 8. Remaining RFC steps after 3b (RFC §9 is canonical) +- **#298 follow-up (do on the 3b PR, before merge): the edge-`@card` stale-read regression** + (§1d.1). Restore the live-HEAD cardinality scan, add the deterministic regression test, fix + the wrong doc comment. Small, gate-safe, un-regresses an integrity check (invariant 9). The + residual concurrent TOCTOU is the §7.1 gap (step 4) — un-widen here, don't over-reach. +- **Step 4 / Phase 7** (`iss-991`): lineage into `__manifest` (publish `graph_commit` + + mutable `graph_head:` in the same merge-insert; `_graph_commits` becomes a + projection). Removes the per-write `commit_graph.refresh`; closes the manifest→commit-graph + atomicity + commit-graph-parent-under-concurrency gaps. **Hard prereq: step 2 (done).** + Carries the §7.1 *concurrent* write-skew fix (needs the `graph_head` contention row) — + **frame §7.1 as "unify the entire write-validation read-set" (endpoint + cardinality + + cross-version uniqueness), not merely "add `graph_head`"** (§1d.1): the bespoke + `edge_cardinality_read_handle` and the mutation-vs-loader freshness fork dissolve into one + pinned read-set re-validated under the `graph_head` contention, or the liability survives as + a second special-case. +- **Step 5 / Design A** — §5 above. **Acceptance item: the served-strict-write stale-view + false-fail** (§1d.2) — the op-class-aware precondition + `open_txn` probe. The contract is + two tests passing *together*: un-ignore + `writes.rs::served_strict_delete_after_external_optimize_advance_auto_refreshes` (goes green) + *while* `consistency::stale_handle_public_mutation_must_refresh_then_retry` stays green + (maintenance fast-forwards; logical fails loudly). Self-contained enough to ship standalone + like #297 if prod pain is acute; otherwise fold into the single PublishPlan delta-interpreter. +- **Step 2b** — internal-table cleanup + the Q8 monotonic watermark (a Lance boundary tag). + Deferred: only the secondary version-count/space term, touches the read/open path, and is + MTT-redundant. Land when version-count cost bites. +- **§7.1 sequential write-skew** (`iss-overwrite-orphans-committed-edges`) — inbound-RI + validation on node removal; independent, ships anytime. +- **#20** — the prod per-write `storage.ops` span metric (RFC §5.3), still owed. +- Branch ops: Lance `Clone` for create (`iss-691`). + +--- + +## 9. Gotchas / traps (learned the hard way) +- **In-process queue ≠ cross-process lock.** Any "I hold the queue → skip the retry/CAS" + reasoning is a bug across processes. This is the recurring trap. +- **Monotonic publish must be `≥`-conditional, never "no assertion."** The `__manifest` + merge-insert is unconditional `UpdateAll` keyed on `object_id` (`publisher.rs:379`), so + the equality (or monotonic) pre-check is the *only* guard — dropping it lets `UpdateAll` + regress a newer version = lost write. +- **The drift guard interprets an ambiguous delta.** Re-evaluating it in a retry over + self-mutated state is how #297's follow-on bug happened. Gate any HEAD-vs-manifest + interpretation on "have *we* committed yet." +- **`compact_files` fires Lance's auto_cleanup GC hook** (commits with + `skip_auto_cleanup=false`, no override) — optimize strips stale `lance.auto_cleanup.*` + config before compacting to stay non-destructive on upgraded graphs. The strip is a + separate commit (relevant to the partial-commit retry trap). +- **Lance rebases the common concurrent case for free** — so the data-table conflict usually + surfaces as the manifest fast-forward, not a Lance error. The Lance-Rewrite-overlap path is + rare and needs failpoint injection to test. + +--- + +## 10. Verification (the gate) +- `cargo test --workspace --locked` — the canonical gate (matches CI). +- `cargo test -p omnigraph-engine --features failpoints --test failpoints optimize` — + the optimize concurrency/recovery tests. +- `cargo test -p omnigraph-engine --test write_cost` / `write_cost_s3` (bucket-gated) — + cost gates (3b adds the schema-once + open-count asserts here). +- `cargo test -p omnigraph-engine --test maintenance` — optimize/repair/cleanup. +- Re-read [`invariants.md`](invariants.md), [`lance.md`](lance.md), [`testing.md`](testing.md) + before each change (always-on requirement). + +Lance source for re-validation: +`/Users/ragnor/.cargo/registry/src/index.crates.io-*/lance-7.0.0` (key files: `io/commit.rs`, +`io/commit/conflict_resolver.rs`, `dataset/optimize.rs`, `dataset/write/retry.rs`, +`dataset/builder.rs`). diff --git a/docs/dev/handoff-schema-apply-recovery-flake.md b/docs/dev/handoff-schema-apply-recovery-flake.md new file mode 100644 index 0000000..e57a745 --- /dev/null +++ b/docs/dev/handoff-schema-apply-recovery-flake.md @@ -0,0 +1,216 @@ +# Handoff: flaky schema-apply → reopen recovery race + +**Type:** bug investigation handoff (not yet fixed) +**Status:** root-caused to a layer + hypothesis; exact mechanism and fix NOT yet validated +**Severity:** medium — flaky CI; a real (rare) schema-apply-then-reopen failure under load +**Scope:** pre-existing on `main`; **independent of** RFC-013 step 2 (internal-table +compaction, PR #291) and step 3a (#288) — those paths never touch schema apply or +the recovery sweep, and the full `--workspace` gate passes clean on a re-run. + +> Do **not** "fix" this by changing the test to use a single handle. That was +> empirically shown to *reduce but not eliminate* the flake (see Experiments), so it +> would mask a real product race. This is a correct-by-design fix in the engine, not +> a test edit. + +--- + +## 1. Symptom + +The test +`crates/omnigraph-server/tests/schema_routes.rs::schema_apply_route_hard_drops_property_with_allow_data_loss` +intermittently fails. The HTTP schema apply **succeeds** (`applied == true`); the +*subsequent* `Omnigraph::open(graph)` (which the test does to verify the catalog) +panics on `.unwrap()` with: + +``` +OmniError::Manifest(Conflict, + "stale view of node:Person: expected manifest version 5 but current is 7", + ExpectedVersionMismatch { expected: 5, actual: 7 }) +``` + +The values (5, 7) vary; the shape is always "recovery roll-forward expected version +N, manifest is at M > N." It is raised from the **open-time recovery sweep**, i.e. +inside `Omnigraph::open`, not from the apply itself. + +--- + +## 2. Reproduction + +- **Needs sibling-test parallelism (CPU contention).** Running the target test + *alone* is rock-solid (0/20 failures). The flake only appears when other tests in + the same binary run concurrently and perturb the timing inside the apply→reopen + sequence. +- Fast repro loop (≈13–40% per run): + ```bash + cargo test -p omnigraph-server --test schema_routes --no-run + for i in $(seq 1 15); do + cargo test -p omnigraph-server --test schema_routes 2>&1 \ + | grep -q "schema_apply_route_hard_drops_property_with_allow_data_loss ... FAILED" \ + && echo "iter $i FAIL" + done + ``` +- It originally surfaced in a full `cargo test --workspace` run (max parallelism). +- Each test uses its own `tempfile::tempdir()`, so this is **not** cross-test shared + state — it's a timing race inside one test's own graph. + +--- + +## 3. Experiments run (the discriminating evidence) + +Each variant was stress-run under the full `schema_routes` suite (parallel siblings): + +| Variant | Flake rate | +|---|---| +| Target test in isolation (no sibling parallelism) | **0/20** | +| **Control** — as written (server handle + out-of-band `Omnigraph::open` load + reopen) | 6/15 ≈ 40% | +| Drop the live server handle (`drop(app)`) before the reopen | 4/15 ≈ 27% | +| Remove the out-of-band separate-handle load | 2/15 ≈ 13% | +| Remove the load **and** drop the server handle (≈ single-handle) | 8/20 ≈ 40% | + +**Interpretation:** +- It is **concurrency-triggered**, not a topology bug: 0% isolated, flaky under + parallel load. +- **No single factor eliminates it.** Removing the out-of-band load roughly halves + the rate (it amplifies the race) but leaves a ~13% base. Dropping the live server + handle does not clearly help. So the "single-handle test" patch is a **band-aid**, + not the fix. +- The residual base rate with the out-of-band load removed means there is a real + race in the **schema-apply → reopen → recovery** path itself. + +Caveat on the experiments: `drop(app)` may not synchronously tear down the server's +engine handle (it can be held by an `Arc`/spawned task), so the "single-handle" +rows are not airtight. This is one of the things to validate (§6). + +--- + +## 4. Root-cause hypothesis (NOT yet proven) + +The failing path is the **open-time recovery sweep's roll-forward** raising +`ExpectedVersionMismatch` from the publisher's `check_expected_table_versions`. + +The hard-drop schema apply (`allow_data_loss=true` → `DropMode::Hard`) is a +**multi-step migration**: it performs several Lance commits + `__manifest` publishes, +advancing `node:Person`'s manifest version across multiple versions (e.g. 5 → … → 7). +To be crash-safe across the Lance-HEAD-before-manifest-publish gap, schema apply +writes a **recovery sidecar** (`__recovery/{ulid}.json`) pinning per-table +`expected_version` / `post_commit_pin` before its Phase B. + +Hypothesis: under CPU contention, the timing of (a) the migration's multi-version +advancement, (b) the sidecar's Phase-D deletion, and (c) a later/over­lapping +`Omnigraph::open` recovery sweep interleaves such that the recovery roll-forward +reads a sidecar whose pinned `expected` is **stale relative to a manifest that +legitimately advanced several versions**, and **re-publishes at the stale `expected` +instead of recognizing the migration already completed** → `expected 5, actual 7`. + +In other words: the recovery classifier / roll-forward likely does not correctly +handle a table whose manifest is **already past `post_commit_pin`** by more than one +step (multi-step migration), or a sidecar whose operation has already fully +committed. The single-step assumption baked into the Optimize-style pin +(`post_commit_pin = expected_version + 1`) may not generalize to multi-commit schema +migrations. + +--- + +## 5. Likely solution (correct-by-design, surgical) + +Make the **open-time recovery classifier idempotent against a manifest that advanced +past the sidecar's pin**: + +- If the table's current manifest/Lance version is already `>= post_commit_pin` + (operation completed, possibly across multiple versions), classify it as + *already-rolled-forward / completed* (the `RolledPastExpected` family) and **delete + the sidecar without republishing** — never attempt a publish at the stale + `expected`. +- Ensure the schema-apply sidecar records a pin that the classifier can interpret for + a **multi-step** migration (a range / "completed at or beyond" semantics), not a + strict single-step `expected + 1`. + +This also hardens *real* crash recovery for multi-step schema apply (not just the +test), and is small + local to `recovery.rs` (+ possibly the schema-apply sidecar +write in `schema_apply.rs`). It does **not** rearchitect recovery. + +Per repo rule 12 (test-first for bug fixes): land a **deterministic** repro first — +ideally a failpoint that forces the interleaving (pause after the migration's commits +but before sidecar delete, then run an open) so the red→green is reliable, not a +stress-loop probability. See the `failpoints.rs` pattern + the schema-apply failpoints +already in the tree. + +--- + +## 6. What MUST be validated before fixing + +1. **Which sidecar is being rolled forward?** Confirm it is the *schema-apply* + sidecar (vs the out-of-band `load`'s sidecar, vs another writer). Instrument / + log the sidecar `operation_id`, `kind`, and `SidecarTablePin` at the point the + recovery sweep raises the error. +2. **The exact classifier path.** Trace which `TableClassification` arm the failing + table hits (`recovery.rs::classify_table`, ~L600) and which roll-forward call + raises `ExpectedVersionMismatch` (`heal_pending_sidecars_roll_forward` ~L761, + `roll_forward_all` ~L1215, `restore`+publish ~L1275). Confirm it is the + multi-step-advanced / already-completed case being mishandled. +3. **Is `post_commit_pin = expected + 1` the bug?** Verify the hard-drop migration + advances `node:Person` by **>1** version, and that the sidecar pins a single-step + `+1`, so the classifier can't recognize completion at +2. +4. **Engine-level reproduction (no server).** Build a deterministic engine-level + repro: persistent handle applies a multi-step hard-drop, then a fresh + `Omnigraph::open` — ideally with a failpoint forcing the interleave — to confirm + the bug is in the engine recovery path and not server-specific (runtime, handle + lifecycle). The current evidence is server-test-only. +5. **Is the out-of-band load *necessary or only amplifying*?** Confirm the ~13% base + rate (load removed) is the same root cause, not a second distinct race. If the + load is required, the bug is specifically about a second writer's version + advancement; if not, it's purely intra-apply. +6. **`drop(app)` cleanliness.** Verify whether the server's engine handle is truly + gone after `drop(app)` (it may be `Arc`-held). If not, the "single-handle" + experiments don't isolate the live-handle factor and should be redone with a + genuinely single-handle setup. + +--- + +## 7. Relationship to Lance MTT + +This bug lives in the **recovery-sidecar roll-forward**, which exists only to bridge +the Lance-HEAD-before-manifest-publish gap in omnigraph's faked multi-table +atomicity. `invariants.md` already calls recovery sidecars "scaffolding to remove +once the substrate closes the gap." Lance **MTT** (native atomic multi-table commits, +RFC §8 / lance#7264) closes that gap → retires the sidecar → **eliminates this bug +class.** + +Implications: +- **Don't wait for MTT** — it is the "strategic exit, not a current dependency," + uncertain and far off, and this bug is live now. +- **Don't over-invest** — keep the fix surgical (classifier idempotency), because the + whole sidecar layer is MTT-disposable. A surgical fix retires cleanly with the + layer; a recovery rearchitecture would be throwaway. + +--- + +## 8. Key pointers + +- Failing test: `crates/omnigraph-server/tests/schema_routes.rs` + → `schema_apply_route_hard_drops_property_with_allow_data_loss` (~L777, + `#[tokio::test(flavor = "multi_thread")]`). +- Error type: `OmniError::Manifest` / `ManifestConflictDetails::ExpectedVersionMismatch` + (`crates/omnigraph/src/error.rs`); raised by `check_expected_table_versions` + (`crates/omnigraph/src/db/manifest/publisher.rs`, ~L356). +- Recovery sweep + classifier: `crates/omnigraph/src/db/manifest/recovery.rs` + — `TableClassification` (~L335), `classify_table` (~L600), roll-forward + (`heal_pending_sidecars_roll_forward` ~L761, `roll_forward_all` ~L1215, restore + + publish ~L1275). +- Schema-apply sidecar write: `crates/omnigraph/src/db/omnigraph/schema_apply.rs` + (the `SidecarKind` schema-apply pins; `db.coordinator.write().refresh()` ~L692). +- Open entry point that runs the sweep: `Omnigraph::open` (read-write mode) → + `db/manifest/recovery.rs` sweep. +- Repro: §2 above. Stress under `schema_routes` suite parallelism; 0% isolated. + +--- + +## 9. Suggested next steps + +1. Add tracing at the recovery roll-forward error site (sidecar kind/id, pins, + observed vs expected) and capture a failing run (§6.1, §6.2). +2. Reproduce deterministically at the engine level with a failpoint (§6.4) — this is + the red test (rule 12). +3. Implement the classifier-idempotency fix (§5) in a separate commit; confirm + red→green and that the stress loop goes to 0 failures over ≥50 iterations. +4. Keep it a standalone PR (not bundled with RFC-013 follow-ons). diff --git a/docs/dev/index.md b/docs/dev/index.md index 23f0610..40aff0b 100644 --- a/docs/dev/index.md +++ b/docs/dev/index.md @@ -93,6 +93,8 @@ Working documents for in-flight feature work. Removed when the work lands. | CLI refactoring — one addressing & config model post-`omnigraph.yaml`: scope + `--graph` + derived access path, served-default / privileged-direct, profiles, named queries, capability classifier (completes RFC-008) | [rfc-011-cli-refactoring.md](rfc-011-cli-refactoring.md) | | Provider-independent embedding configuration — one resolved `EmbeddingConfig` + sealed provider enum (Gemini/OpenAI/Mock), identity recorded in the schema IR, query-time same-space validation, NFR floor | [rfc-012-embedding-provider-config.md](rfc-012-embedding-provider-config.md) | | Write-path latency — capture-once `WriteTxn`, version-pinned opens, one `GraphPublishAuthority` fed declarative `PublishPlan`s, manifest-authoritative lineage, epoch fence, bounded history (compaction + cleanup), and an IO-counted cost contract (`iss-write-s3-roundtrip-amplification`, `iss-991`) | [rfc-013-write-path-latency.md](rfc-013-write-path-latency.md) | +| RFC-013 handoff — current-state map, latest validation, and concrete next actions for finishing write-path latency and correctness work | [handoff-rfc-013-write-path.md](handoff-rfc-013-write-path.md) | +| Schema-apply recovery flake handoff — investigation notes and validation plan for the intermittent schema-apply reopen race | [handoff-schema-apply-recovery-flake.md](handoff-schema-apply-recovery-flake.md) | ## Boundary diff --git a/docs/dev/rfc-013-write-path-latency.md b/docs/dev/rfc-013-write-path-latency.md index 1954b01..53f6430 100644 --- a/docs/dev/rfc-013-write-path-latency.md +++ b/docs/dev/rfc-013-write-path-latency.md @@ -523,7 +523,10 @@ struct WriteTxn { branch: BranchRef, base: PinnedSnapshot, // {manifest_version, per-table (loc,version,e_tag), schema_hash, writer_epoch} session: Arc, // shared per-graph; warms metadata/index caches across opens - handles: HandleCache, // open-by-version; each table opened once, reused across stages + handles: HandleMap, // open the base once WITH session; thread the handle each + // commit RETURNS forward (HEAD walks N→N+1→N+2). NOT a + // version-keyed cache — HEAD moves, so a (table,version) key + // misses; reuse = forward the commit-return handle. [3b-validated] } // A typed, declarative publish plan — the COMPLETE "what", built before any HEAD moves. @@ -546,8 +549,17 @@ impl GraphPublishAuthority { Properties that make it optimal: -- **Stages take `&WriteTxn`/`&PublishPlan`, never storage** — re-resolution and - open-latest are *unrepresentable*. Invariants 2/3/15 hold by construction. +- **Stages take `&WriteTxn`/`&PublishPlan` for the BASE** — re-resolving the pinned + read base / open-latest for the pre-commit phase is unrepresentable; invariants 2/3/15 + hold for the base by construction. **Caveat [3b-validated]:** this is NOT "no + re-resolution anywhere." Three commit-boundary reads are irreducible correctness + machinery and MUST stay fresh: the commit-time `fresh_snapshot_for_branch` (cross-process + OCC), the live-HEAD drift probe (a concurrent writer may have moved HEAD since staging), + and the fork-authority reads (`classify_fork_ref` deliberately bypasses the cached base — + a pinned base there re-opens the "force-delete a live fork" bug). Model "pinned base for + the pre-commit phase + named fresh re-reads at the commit/fork boundary." The achievable + open count is **1 base open (with session) + 1 cheap `latest_version_id` probe + threaded + commit handles**, not literally one open. - **The recovery sidecar *is* the serialized `PublishPlan`.** Phase C and recovery both call `plan.apply()` — a merge that bumps tables A+B can never roll A forward and silently drop B. The