mirror of
https://github.com/ModernRelay/omnigraph.git
synced 2026-06-27 02:39:38 +02:00
feat(engine): WriteTxn - validate schema + open each data table once per write (#298)
Some checks failed
CI / Classify Changes (push) Has been cancelled
CI / Check AGENTS.md Links (push) Has been cancelled
CI / Container Entrypoint (push) Has been cancelled
Release Edge / Prepare edge release (push) Has been cancelled
CI / Test Workspace (push) Has been cancelled
CI / Test omnigraph-server --features aws (push) Has been cancelled
CI / RustFS S3 Integration (push) Has been cancelled
Release Edge / Build edge omnigraph-linux-x86_64 (push) Has been cancelled
Release Edge / Build edge omnigraph-macos-arm64 (push) Has been cancelled
Release Edge / Build edge omnigraph-windows-x86_64 (push) Has been cancelled
Release Edge / Smoke Windows installer (push) Has been cancelled
* docs(rfc-013): step-3b handoff + §4.1 corrections (validated)
Add the RFC-013 write-path handoff doc, and correct §4.1's WriteTxn sketch from the
4-subagent validation against current code:
- HandleCache → handle-threading (forward the commit-return handle; a version-keyed
cache misses because HEAD walks N→N+1→N+2 across staging + index-build commits).
- "re-resolution unrepresentable" softened to "pinned base for the pre-commit phase +
named fresh re-reads at the commit/fork boundary" — three reads (commit-time OCC, the
live-HEAD drift probe, fork authority) are irreducible correctness machinery.
- WriteParams DOES carry a session field; the real constraint is "stage off an open
Dataset," so attach the Session by opening read-style then staging off it.
* test(engine): RED step-3b capture-once fitness asserts + open_count probe
Two write-path cost gates, RED today, GREEN after the WriteTxn lands:
- write_validates_schema_contract_once: a write must validate the schema contract
once (3 read_text + 2 exists). Today re-validates at every resolve point —
measured 12 read_text / 9 exists (~4 validations) via CountingStorageAdapter
(zero production change; the write twin of the read-path schema-once test).
- keyed_insert_opens_table_at_most_once: a keyed single-table write must open its
table <=1x. Today measured 10 opens.
Adds an exact open-CALL probe: open_count + record_open() on QueryIoProbes (mirroring
probe_count/record_probe), called at both open chokepoints; surfaced as
IoCounts.open_count. forbidden_apis guarantees every write open routes through them.
* feat(engine): WriteTxn carrier + open_write_txn (3b scaffolding)
The capture-once write transaction (RFC-013 step 3b): WriteTxn{branch, base:
Snapshot, session} + Omnigraph::open_write_txn, which validates the schema contract
once and pins the base snapshot + the shared per-graph Session.
Landed as reviewed scaffolding (gated #[allow(dead_code)]); the next pass threads
Option<&WriteTxn> through open_for_mutation_on_branch / staging on the non-strict
bound-branch path — opening the base once from the pinned entry with the warm session
(a session-aware pinned opener returning a SnapshotHandle) and skipping the per-table
schema re-validation — to turn the two RED cost gates green. Strict ops / fork / the
commit-time OCC re-read keep their fresh reads.
* test(engine): scope write-path open_count to data tables (RFC-013 step 3b)
The keyed_insert_opens_table_at_most_once gate asserted open_count <= 1, but
open_count was a single unclassified counter: record_open() fires in both
open chokepoints, and open_dataset_tracked also opens the internal/system
tables (__manifest via layout.rs, _graph_commits/_graph_commit_actors via
commit_graph.rs). So the count conflated data-table opens with the publisher
CAS + commit-graph append opens — making the gate measure the wrong quantity
and unreachable by threading alone (the manifest publish keeps it >1 regardless).
Scope it by table class, mirroring the read-side counters (which already split
by URI prefix via separate wrappers): record_open(uri) classifies the open's
last path segment and feeds data_open_count vs internal_open_count. IoCounts
exposes both; the gate now asserts data_open_count <= 1.
Re-baselined: a single keyed insert is data_open_count=4 / internal_open_count=6
(sum 10, the old conflated value). The RED target for the WriteTxn threading is
now the real data-table-open count (4 -> 1), with internal opens correctly out
of scope. Pure test-harness/instrumentation; no production behavior change
(classification runs only inside the probe closure, skipped when no probes are
installed).
Also marks #297 (optimize-vs-write race) as landed in the step-3b handoff —
this branch is already stacked on origin/main after it merged.
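The table-class scoping described above can be sketched roughly as follows. This is a minimal, hypothetical model, not the engine's instrumentation: `TableClass`, `classify_open`, and `OpenCounts` are illustrative names, and treating a `.lance` suffix as optional is an assumption. Only the internal table names (`__manifest`, `_graph_commits`, `_graph_commit_actors`) and the two counter names come from the commit message.

```rust
/// Which class of table an open touched (illustrative type, not the engine's).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum TableClass {
    Data,
    Internal,
}

/// Classify a dataset URI by its last path segment.
/// Assumption: an optional `.lance` suffix is stripped before matching.
fn classify_open(uri: &str) -> TableClass {
    let last = uri.trim_end_matches('/').rsplit('/').next().unwrap_or("");
    let stem = last.strip_suffix(".lance").unwrap_or(last);
    match stem {
        "__manifest" | "_graph_commits" | "_graph_commit_actors" => TableClass::Internal,
        _ => TableClass::Data,
    }
}

/// Split counters, mirroring the data_open_count / internal_open_count pair.
#[derive(Debug, Default, PartialEq, Eq)]
struct OpenCounts {
    data_open_count: u64,
    internal_open_count: u64,
}

impl OpenCounts {
    /// Record one open call against the counter for its table class.
    fn record_open(&mut self, uri: &str) {
        match classify_open(uri) {
            TableClass::Data => self.data_open_count += 1,
            TableClass::Internal => self.internal_open_count += 1,
        }
    }
}

fn main() {
    let mut counts = OpenCounts::default();
    // Hypothetical URIs: one manifest open, one data-table open, one commit-graph open.
    for uri in ["/graph/__manifest", "/graph/nodes/person", "/graph/_graph_commits.lance"] {
        counts.record_open(uri);
    }
    println!("{:?}", counts);
}
```

A gate written against `data_open_count` alone then ignores the publisher and commit-graph opens, which is the point of the re-baselining.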
* feat(engine): validate the schema contract once per write (RFC-013 step 3b)
A single mutate/load re-validated the schema contract ~4 times: at the entry
(ensure_schema_state_valid), per-table in open_for_mutation_on_branch
(resolved_branch_target), at the commit-time OCC re-read (fresh_snapshot_for_branch),
and in the publisher's index-build snapshot (snapshot_for_branch). Each validation
is 3 read_text + 2 exists on the storage adapter — O(touched resolve-points) of
redundant contract I/O on every write.
Thread the already-landed WriteTxn carrier through the write path: capture
`txn = open_write_txn(branch)` once at the mutate/load entry (the single validation),
then source the per-table entry and the commit/publish snapshots from `txn.base`
instead of re-resolving. When `txn` is None (branch merge, schema apply, tests) every
function is byte-identical to before.
- mutate_with_current_actor / load_jsonl_reader capture txn once (replacing the
entry-point ensure_schema_state_valid) and thread Some(&txn) through
execute_*/open_table_for_mutation, commit_all, and
commit_updates_on_branch_with_expected.
- open_for_mutation_on_branch sources (snapshot, branch) from txn.base/txn.branch
when present — skipping resolved_branch_target's re-validation. The OPEN itself is
unchanged (still HEAD via open_dataset_head_for_write), and strict ops keep
ensure_expected_version. Schema-once applies to strict and non-strict alike; the
data-open collapse is a separate change.
- commit_all uses fresh_snapshot_for_branch_unchecked (the OCC manifest re-read minus
the schema re-validation) when txn is present; the drift guard is unchanged.
- prepare_updates_for_commit uses txn.base for the publisher index-build snapshot.
fresh_snapshot_for_branch{,_unchecked} now read the manifest directly via
ManifestCoordinator instead of resolve_target. The OCC re-read consumes only the
Snapshot (per-table location + version), which ManifestCoordinator::open().snapshot()
produces identically — but resolve_target additionally opened the commit graph (a
spurious _graph_commits.lance exists probe the OCC read never consults). Dropping that
load is a pure read-cost reduction for every fresh-snapshot caller (commit_all's None
arm, optimize, repair, fork reclaim); the returned Snapshot is unchanged and the read
is a fresher cold manifest re-read, so the OCC freshness guarantee is preserved.
Greens write_validates_schema_contract_once (3 read_text / 2 exists, was 12/9).
keyed_insert_opens_table_at_most_once stays red (data_open_count=4) — the open
collapse lands next. Full engine suite green otherwise.
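The capture-once threading above can be illustrated with a small synchronous toy model. Everything here is a stand-in under stated assumptions: `Engine`, `Txn`, `resolve`, and `table_base` are hypothetical names, the real path is async, and one "validation" here represents the whole 3 read_text + 2 exists contract check.

```rust
use std::cell::Cell;

/// Toy engine that only counts schema-contract validations.
struct Engine {
    validations: Cell<u32>,
}

/// Toy carrier: the base version pinned once per write.
struct Txn {
    base_version: u64,
}

impl Engine {
    fn new() -> Self {
        Engine { validations: Cell::new(0) }
    }

    /// Stand-in for a resolve point that validates the contract on every call.
    fn resolve(&self) -> u64 {
        self.validations.set(self.validations.get() + 1);
        7 // pretend manifest version
    }

    /// Capture the transaction: the single validation for the whole write.
    fn open_txn(&self) -> Txn {
        Txn { base_version: self.resolve() }
    }

    /// Per-table resolve point: reuse the pinned base when a txn is threaded,
    /// otherwise fall back to the re-validating resolve.
    fn table_base(&self, txn: Option<&Txn>) -> u64 {
        match txn {
            Some(t) => t.base_version,
            None => self.resolve(),
        }
    }

    /// Run one write touching `tables` tables; return validations performed.
    fn write(&self, tables: usize, use_txn: bool) -> u32 {
        self.validations.set(0);
        let txn = if use_txn {
            Some(self.open_txn())
        } else {
            self.resolve(); // legacy entry-point validation
            None
        };
        for _ in 0..tables {
            self.table_base(txn.as_ref());
        }
        self.validations.get()
    }
}

fn main() {
    let engine = Engine::new();
    println!("with txn: {}", engine.write(3, true));
    println!("without txn: {}", engine.write(3, false));
}
```

The `None` arm keeps the old behavior untouched, which is how the real change stays byte-identical for callers that never capture a transaction.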
* feat(engine): open each data table once per write (RFC-013 step 3b)
A single keyed-node mutate opened its data table 4 times: accumulation (to read
.version()), staging (the real write base), the commit-time drift guard (to read
live HEAD), and the publisher's index build (reopen at the just-committed version).
Collapse three of the four — using the WriteTxn carrier threaded for schema-once —
so a write opens each touched data table at most once.
- #1 accumulation: open_for_mutation_on_branch now returns
(Option<SnapshotHandle>, expected_version, full_path, table_branch). On the txn's
own branch, a non-strict (Insert/Merge) op needs no open — the only thing the
caller reads is .version() (the CAS fence), which is exactly the pinned base
version (entry.table_version). So skip open_dataset_head_for_write and source the
version from txn.base. The node insert path already discarded that handle; the
edge path resolves a pinned read only when non-default cardinality needs it.
STRICT ops and any write that must fork still open live HEAD + ensure_expected_version.
- #3 commit drift guard: commit_all reads live HEAD via
entry.dataset.dataset().latest_version_id() — a cheap manifest-pointer probe off
the already-open staging handle (the same primitive
ManifestCoordinator::probe_latest_version uses) instead of a fresh open_dataset_head_for_write. The
head<current / head>current drift classification is byte-identical.
- #4 index build: commit_all now returns the per-table post-commit_staged
SnapshotHandle map; commit_updates_on_branch_with_expected threads it into
prepare_updates_for_commit, which builds indices on the threaded handle instead of
reopening at the same just-committed version. Absent a handle (other writers,
inline/delete tables) the reopen path is byte-identical.
When txn is None (branch merge, schema apply, tests) every function opens and checks
exactly as before. Greens keyed_insert_opens_table_at_most_once (data_open_count 4->1).
Schema-once gate stays 3/2. Full engine suite + failpoints (recovery sidecar lifecycle)
green.
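Collapse #1 above, reduced to a toy: a minimal sketch, assuming a simplified `Store` that counts opens and a `u64` standing in for a dataset handle. `OpKind::is_strict`, `PinnedEntry`, `Opened`, and `Store` are hypothetical names; the real function also runs `ensure_expected_version` for strict ops and handles the fork path, both omitted here.

```rust
/// Operation kinds; Update/Delete stand in for the strict ops.
#[derive(Clone, Copy, Debug, PartialEq)]
enum OpKind {
    Insert,
    Merge,
    Update,
    Delete,
}

impl OpKind {
    fn is_strict(self) -> bool {
        matches!(self, OpKind::Update | OpKind::Delete)
    }
}

/// Toy version of the pinned manifest entry for one table.
struct PinnedEntry {
    table_version: u64,
    on_active_branch: bool,
}

/// Toy result: `handle` carries the live HEAD version when an open happened.
#[derive(Debug, PartialEq)]
struct Opened {
    handle: Option<u64>,
    expected_version: u64,
}

/// Counts simulated dataset opens so the skip is observable.
struct Store {
    live_head: u64,
    opens: u32,
}

impl Store {
    fn open_head(&mut self) -> u64 {
        self.opens += 1;
        self.live_head
    }
}

/// Skip the open only for a non-strict op with a txn on its own branch;
/// every other case opens live HEAD and fences on the opened version.
fn open_for_mutation(store: &mut Store, entry: &PinnedEntry, op: OpKind, has_txn: bool) -> Opened {
    if has_txn && !op.is_strict() && entry.on_active_branch {
        return Opened { handle: None, expected_version: entry.table_version };
    }
    let head = store.open_head();
    Opened { handle: Some(head), expected_version: head }
}

fn main() {
    let mut store = Store { live_head: 5, opens: 0 };
    let entry = PinnedEntry { table_version: 5, on_active_branch: true };
    for op in [OpKind::Insert, OpKind::Merge, OpKind::Update, OpKind::Delete] {
        let opened = open_for_mutation(&mut store, &entry, op, true);
        println!("{:?}: {:?} (opens so far: {})", op, opened, store.opens);
    }
}
```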
* refactor(engine): name the write-path open/commit returns (RFC-013 step 3b)
The open collapse left two positional returns that are easy to mis-thread and
carry an unwritten contract: open_for_mutation_on_branch's
(Option<SnapshotHandle>, u64, String, Option<String>) and commit_all's 5-tuple
(updates, expected_versions, sidecar_handle, guards, committed_handles). Replace
both with named structs so each field reads at the call site and the Option's
contract is documented, not folklore.
- OpenedForMutation { handle, expected_version, full_path, table_branch } with a
require_handle(ctx) helper for the callers that must have a handle (strict ops,
the fork path, every no-txn caller — branch merge, the seed test). The handle is
None only on the non-strict-txn open-skip path (collapse #1); require_handle
panics with a named context if that contract is ever broken.
- CommittedMutation { updates, expected_versions, sidecar_handle, guards,
committed_handles } for commit_all; consumers destructure into the same local
bindings they already used, so the publish/sidecar/guard-hold logic is unchanged.
- A debug_assert in open_table_for_mutation pins the skip contract: a missing handle
is legal only on the non-strict txn path, so a future strict arm returning None
trips in debug builds instead of handing None to a require_handle consumer.
Pure refactor — no behavior change. Both cost gates stay green (schema 3/2,
data_open_count=1), full engine suite + lib (162) green.
* refactor(engine): drop the unearned session field from WriteTxn (RFC-013 step 3b)
The open collapse greens data_open_count<=1 by SKIPPING the accumulation open,
PROBING live HEAD with latest_version_id, and REUSING the commit_staged handle —
none of which consume a session. The captured WriteTxn.session was therefore dead
(`#[allow(dead_code)]`): unearned surface a reviewer rightly flags.
Remove it. The carrier is now {branch, base} — exactly what schema-once + the open
collapse use. Step 5 (PublishPlan unification) makes WriteTxn the non-optional
publish carrier and is the right home for session-aware base opens, where the
warm-session benefit on the single remaining open — an object-store (S3) phenomenon,
invisible on local FS — can be earned by its own cost gate rather than carried dead
through this PR.
No behavior change; both cost gates stay green (schema 3/2, data_open_count=1).
* docs(rfc-013): mark step 3b DONE — schema-once + open-collapse shipped, session deferred to step 5
* docs(rfc-013): capture the write-base-staleness convergence (§1d)
Three findings this cycle share one root — the write base is a stale, un-probed,
un-classified pin (the read path probes; the write path returns the warm
coordinator snapshot):
- #298 edge-@card stale-read regression (cursor High / codex P1, VALID): collapse #1
made the cardinality scan read txn.base instead of live HEAD, so a concurrent edge
is uncounted and a max can be exceeded. Fix on #298: restore the live-HEAD read +
deterministic test + correct the single-writer doc comment.
- The structural liability underneath: no unified write-validation read-set —
endpoint/cardinality/uniqueness each pick freshness ad hoc (warm/pinned/live),
the same cardinality check forks mutation-vs-loader, none re-validated at commit.
- The served-strict-write stale-view false-fail (validated on prod + a #[ignore]
repro): a strict update/delete false-fails ExpectedVersionMismatch after an external
optimize advance — the write-side mirror of #297/§6.6. The naive blanket probe is
proven wrong (breaks the cross-process lost-update OCC contract).
All three converge on Design A (step 5): open_txn's warm probe makes the base fresh,
the op-class-aware precondition (derive maintenance vs logical from Lance per-version
transaction metadata — no parallel marker) fast-forwards maintenance and fails logical,
and §7.1's read-set-in-CAS unifies + re-validates the validation read-set. §8 records
the #298 follow-up, the widened §7.1 scope, and the step-5 two-test acceptance contract.
* test(engine): RED — edge @card must scan live HEAD, not stale txn.base (#298)
Regression guard for the cursor-High/codex-P1 finding on #298: 3b's collapse #1
made the non-strict edge-insert cardinality scan read the pinned txn.base instead
of live HEAD (edge_cardinality_read_handle), so a concurrent edge committed after
txn capture is uncounted and a @card max is silently exceeded (invariant 9).
Deterministic two-handle test (no failpoint): handle A commits WorksAt(Alice->Acme)
to the @card(0..1) max; stale handle B (never read since) inserts a second WorksAt
for Alice. B's coordinator is stale by construction (the write path doesn't probe),
so B scans txn.base (Alice has 0) and wrongly commits the 2nd edge. RED: the insert
that must be rejected currently succeeds (panics at unwrap_err). Goes green when the
scan reads live HEAD.
* fix(engine): scan live HEAD for edge @card, not the pinned txn.base (#298)
3b's collapse #1 skips the non-strict edge accumulation open, so
edge_cardinality_read_handle reopened the edge table at the pinned txn.base for the @card scan. Since
cardinality is validated once (never rechecked at commit), a concurrent edge committed
after txn capture was uncounted and a @card max could be silently exceeded (invariant
9) — the cursor-High/codex-P1 regression on #298. Pre-3b the scan read live HEAD (the
mutation's own open_dataset_head_for_write handle).
Restore the live-HEAD read: take the table LOCATION from the pinned entry (stable
across versions) and open the dataset at its current HEAD via
open_dataset_head_for_write. Gate-safe: the data_open_count / merge-insert-only gates are node inserts; the
edge cardinality path (non-default @card only) is untouched by them, and the extra
live-HEAD open is exactly the pre-3b shape. Also drops the dead None-fallback's schema
re-validation (greptile P2, auto-resolved). The residual validate->commit TOCTOU is the
pre-existing §7.1 gap (RFC-013 step 4), recorded in handoff §1d/§8.
Turns cardinality_rejected_for_stale_handle_after_concurrent_edge_commit green;
validators / write_cost / writes / consistency / end_to_end / branching all green.
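The stale-read hazard fixed here can be shown with a toy cardinality check. This is a hedged sketch, not the engine's validator: `EdgeCounts` and `admit_edge` are hypothetical, and a map of per-source edge counts stands in for scanning the edge table at a given version.

```rust
use std::collections::HashMap;

/// Outgoing-edge count per source node, as seen at one table version.
type EdgeCounts = HashMap<String, usize>;

/// Admit one more edge from `src` only if the view stays within `max`.
fn admit_edge(view: &EdgeCounts, src: &str, max: usize) -> Result<(), String> {
    let existing = view.get(src).copied().unwrap_or(0);
    if existing + 1 > max {
        Err(format!("{} already has {} edge(s), max is {}", src, existing, max))
    } else {
        Ok(())
    }
}

fn main() {
    // Pinned base captured before the concurrent commit: Alice has no edge yet.
    let pinned_base = EdgeCounts::new();

    // Live HEAD after the other handle committed WorksAt(Alice -> Acme).
    let mut live_head = EdgeCounts::new();
    live_head.insert("Alice".to_string(), 1);

    // Scanning the stale base wrongly admits a second edge for a 0..1 bound.
    println!("pinned base: {:?}", admit_edge(&pinned_base, "Alice", 1));
    // Scanning live HEAD rejects it, which is the behavior the fix restores.
    println!("live HEAD:   {:?}", admit_edge(&live_head, "Alice", 1));
}
```

As the commit message notes, reading live HEAD at validation time narrows the window but does not close it; an edge committed between the scan and the commit is still the §7.1 gap.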
* docs(dev): link handoff docs from index
* docs(engine): tighten 3b claims to match the code (#298 review)
Review caught several comments/docs overclaiming what the code does (the session
drop + the #298 cardinality fix left stale/too-strong wording). No logic change.
- open_write_txn doc: drop the stale "shared per-graph Session" (WriteTxn no longer
carries one); scope "once" to the table-touch hot path and note edge/load RI
validation still re-resolves (→ step 4 §7.1) + the session-aware open is step 5.
- edge cardinality call-site comment: it said the scan uses a "pinned txn.base" — it
now opens LIVE HEAD (#298); corrected.
- write_cost.rs: "opens the base once (with the shared Session)" → session-aware base
open is deferred to step 5.
- data_open_count completeness (instrumentation.rs + write_cost.rs): forbidden_apis
only keeps engine code OUTSIDE the storage layer on the chokepoints; table_store.rs
is allow-listed and holds direct Dataset::opens for branch-management ops (not the
keyed-write hot path the gate measures). Narrowed the claim accordingly.
- handoff §4: "schema once / open once" is the node hot path (the two gates); edge
endpoint + loader RI/cardinality still re-validate and read warm — #298 un-regresses
cardinality only, it does NOT close write-validation freshness (that's step 4 §1d/§7.1).
build clean; write_cost / validators / forbidden_apis green.
parent 6d4606a830
commit 7d3a52d674
15 changed files with 1405 additions and 106 deletions
@@ -10,6 +10,7 @@ pub use commit_graph::GraphCommit;
 pub use graph_coordinator::{GraphCoordinator, ReadTarget, ResolvedTarget, SnapshotId};
 pub use manifest::{Snapshot, SubTableEntry, SubTableUpdate};
 pub(crate) use omnigraph::ensure_public_branch_ref;
+pub(crate) use omnigraph::WriteTxn;
 pub use omnigraph::{
     CleanupPolicyOptions, InitOptions, MergeOutcome, Omnigraph, OpenMode, PendingIndex,
     RepairAction, RepairClassification, RepairOptions, RepairStats, SchemaApplyOptions,

@@ -41,6 +41,7 @@ pub use repair::{
 };
 pub use schema_apply::SchemaApplyOptions;
 pub use table_ops::PendingIndex;
+pub(crate) use table_ops::OpenedForMutation;

 use super::commit_graph::GraphCommit;
 use super::manifest::{
@@ -79,6 +80,35 @@ pub struct SchemaApplyPreview {
     pub catalog: Catalog,
 }

+/// A capture-once write transaction (RFC-013 step 3b). Pins the operation's read
+/// base ONCE so the per-table opens reuse the pinned version instead of
+/// re-resolving / re-validating per table. The schema contract is validated once
+/// (when `base` is captured). NOT a general "no re-resolution" handle — the
+/// commit-time OCC re-read, the live-HEAD drift probe, and the fork-authority reads
+/// stay fresh (correctness machinery). Step 5 (PublishPlan unification) makes this
+/// the non-optional publish carrier and adds session-aware base opens there, gated
+/// by an S3 cost test — the warm-session benefit on the single remaining open is an
+/// object-store phenomenon, so it earns its own gate rather than riding this PR.
+///
+/// Threaded as `Option<&WriteTxn>` through the mutate/load write chain
+/// (`open_for_mutation_on_branch`, `commit_all`, `commit_updates_on_branch_with_expected`)
+/// so a single write validates the schema contract EXACTLY ONCE — at capture. When
+/// present, the per-table resolves source the pinned `base` entry instead of calling
+/// `resolved_branch_target` / `snapshot_for_branch` / `fresh_snapshot_for_branch`
+/// (each of which re-runs `ensure_schema_state_valid`). When absent (`None` — every
+/// non-mutate/load caller), every threaded function behaves byte-identically to
+/// before. The carrier never removes a version guard or changes which dataset version
+/// the per-table open targets: strict ops keep `open_dataset_head_for_write` +
+/// `ensure_expected_version`, and the commit-time OCC re-read still opens a fresh
+/// manifest snapshot (via `fresh_snapshot_for_branch_unchecked`) — only the redundant
+/// schema re-validation is dropped.
+pub(crate) struct WriteTxn {
+    /// The resolved branch (`None` = main).
+    pub(crate) branch: Option<String>,
+    /// The pinned base snapshot (per-table location + version + e_tag), captured once.
+    pub(crate) base: Snapshot,
+}
+
 /// Top-level handle to an Omnigraph database.
 ///
 /// An Omnigraph is a Lance-native graph database with git-style branching.
@@ -736,6 +766,29 @@ impl Omnigraph {
         *self.coordinator.write().await = coordinator;
     }

+    /// Open a capture-once write transaction (RFC-013 step 3b): validate the schema
+    /// contract ONCE and pin the base snapshot. The per-table opens take
+    /// `Option<&WriteTxn>` and, on the bound branch for the non-strict (Insert/Merge)
+    /// path, source the pinned base entry — instead of re-resolving (re-validating the
+    /// schema) per table. Strict ops, the fork path, and the commit-time OCC re-read
+    /// keep their fresh reads (those are correctness machinery — see the handoff doc).
+    ///
+    /// "Once" covers the table-touch hot path captured here (proven by the node-insert
+    /// gate `write_validates_schema_contract_once`); it does NOT yet cover edge endpoint
+    /// / cardinality RI validation (`ensure_node_id_exists`, the loader's RI/cardinality),
+    /// which still resolve through `snapshot_for_branch` and re-validate. Those reads must
+    /// observe LIVE committed state, so unifying them (validate-once + pinned + re-checked
+    /// read-set) is step 4's §7.1 work — threading `txn.base` there would re-introduce the
+    /// stale-read class the #298 cardinality fix removed. A session-aware base open is
+    /// likewise deferred to step 5 (handoff §1d).
+    pub(crate) async fn open_write_txn(&self, branch: Option<&str>) -> Result<WriteTxn> {
+        let resolved = self.resolved_branch_target(branch).await?;
+        Ok(WriteTxn {
+            branch: resolved.branch,
+            base: resolved.snapshot,
+        })
+    }
+
     pub(crate) async fn resolved_branch_target(
         &self,
         branch: Option<&str>,
@@ -770,12 +823,39 @@ impl Omnigraph {

     pub(crate) async fn fresh_snapshot_for_branch(&self, branch: Option<&str>) -> Result<Snapshot> {
         self.ensure_schema_state_valid().await?;
-        let requested = ReadTarget::Branch(branch.unwrap_or("main").to_string());
-        let coord = self.coordinator.read().await;
-        coord
-            .resolve_target(&requested)
-            .await
-            .map(|resolved| resolved.snapshot)
+        self.fresh_snapshot_for_branch_unchecked(branch).await
     }

+    /// Fresh per-branch manifest snapshot WITHOUT the schema-contract
+    /// re-validation. Identical OCC freshness to [`fresh_snapshot_for_branch`]
+    /// — a fresh manifest re-read from storage, never the warm cache — only the
+    /// redundant `ensure_schema_state_valid` is dropped. Used inside a single
+    /// write once a `WriteTxn` has already validated the contract at capture: the
+    /// commit-time drift re-read needs the live manifest, not a second contract
+    /// read. Callers with no `WriteTxn` MUST use the checked variant.
+    ///
+    /// Reads the manifest directly via `ManifestCoordinator` rather than
+    /// `resolve_target`. The OCC re-read uses only the returned `Snapshot`
+    /// (per-table location + version), which `ManifestCoordinator::open().snapshot()`
+    /// produces identically to `GraphCoordinator::open(...).snapshot()` — but
+    /// `resolve_target` additionally opens the commit graph (an extra
+    /// `_graph_commits.lance` probe) the OCC read never consults. Skipping that
+    /// load is a pure read-cost reduction, not a freshness change. The checked
+    /// `fresh_snapshot_for_branch` delegates here, so its no-`txn` callers
+    /// (commit_all's None arm, optimize, repair, fork reclaim) get the same
+    /// identical `Snapshot` via this lighter manifest-only read; they consume
+    /// only the snapshot and never relied on the commit-graph side load.
+    pub(crate) async fn fresh_snapshot_for_branch_unchecked(
+        &self,
+        branch: Option<&str>,
+    ) -> Result<Snapshot> {
+        let manifest = match branch {
+            Some(branch) => {
+                crate::db::manifest::ManifestCoordinator::open_at_branch(self.uri(), branch).await?
+            }
+            None => crate::db::manifest::ManifestCoordinator::open(self.uri()).await?,
+        };
+        Ok(manifest.snapshot())
+    }
+
     pub(crate) async fn version(&self) -> u64 {
@@ -1599,7 +1679,7 @@ impl Omnigraph {
         &self,
         table_key: &str,
         op_kind: crate::db::MutationOpKind,
-    ) -> Result<(SnapshotHandle, String, Option<String>)> {
+    ) -> Result<OpenedForMutation> {
         table_ops::open_for_mutation(self, table_key, op_kind).await
     }

@@ -1608,8 +1688,9 @@ impl Omnigraph {
         branch: Option<&str>,
         table_key: &str,
         op_kind: crate::db::MutationOpKind,
-    ) -> Result<(SnapshotHandle, String, Option<String>)> {
-        table_ops::open_for_mutation_on_branch(self, branch, table_key, op_kind).await
+        txn: Option<&crate::db::WriteTxn>,
+    ) -> Result<OpenedForMutation> {
+        table_ops::open_for_mutation_on_branch(self, branch, table_key, op_kind, txn).await
     }

     /// Fork `table_key` onto `active_branch` from the given source state,
@@ -1728,6 +1809,8 @@ impl Omnigraph {
         updates: &[crate::db::SubTableUpdate],
         expected_table_versions: &std::collections::HashMap<String, u64>,
         actor_id: Option<&str>,
+        txn: Option<&crate::db::WriteTxn>,
+        committed_handles: std::collections::HashMap<String, crate::storage_layer::SnapshotHandle>,
     ) -> Result<u64> {
         table_ops::commit_updates_on_branch_with_expected(
             self,
@@ -1735,6 +1818,8 @@ impl Omnigraph {
             updates,
             expected_table_versions,
             actor_id,
+            txn,
+            committed_handles,
         )
         .await
     }
@@ -2466,10 +2551,13 @@ edge WorksAt: Person -> Company
     }

     async fn seed_person_row(db: &mut Omnigraph, name: &str, age: Option<i32>) {
+        // No-txn entry, so the handle is always `Some` (collapse #1's skip is
+        // gated on `txn.is_some()`).
         let (ds, full_path, table_branch) = db
             .open_for_mutation("node:Person", crate::db::MutationOpKind::Insert)
             .await
-            .unwrap();
+            .unwrap()
+            .require_handle("seed_person_row test");
         let schema: Arc<Schema> = Arc::new(ds.dataset().schema().into());
         let columns: Vec<Arc<dyn Array>> = schema
             .fields()

@@ -488,18 +488,52 @@ pub(super) async fn needs_index_work_edge(
         || !db.storage().has_btree_index(&ds, "dst").await?)
 }

+/// Result of opening a sub-table for mutation. `handle` is `None` only when a
+/// non-strict (Insert/Merge) op on the WriteTxn's own branch skipped the
+/// accumulation open (RFC-013 step 3b collapse #1) — there the caller needs just
+/// `expected_version`. It is ALWAYS `Some` for strict ops, the fork path, and
+/// every no-`txn` caller (branch merge), which use [`Self::require_handle`].
+#[derive(Debug)]
+pub(crate) struct OpenedForMutation {
+    /// The opened dataset, or `None` on the non-strict-txn open-skip path.
+    pub(crate) handle: Option<SnapshotHandle>,
+    /// The publisher's CAS fence: the opened handle's version, or — when the open
+    /// was skipped — the pinned base entry's version (equal absent uncovered drift).
+    pub(crate) expected_version: u64,
+    pub(crate) full_path: String,
+    pub(crate) table_branch: Option<String>,
+}
+
+impl OpenedForMutation {
+    /// Destructure for a caller that REQUIRES the handle (strict ops, the fork
+    /// path, every no-`txn` caller). The `None` skip fires solely on the
+    /// non-strict `txn` path, which these callers are not — so a panic here means
+    /// a future change broke that contract, named by `ctx`.
+    pub(crate) fn require_handle(self, ctx: &str) -> (SnapshotHandle, String, Option<String>) {
+        let handle = self.handle.unwrap_or_else(|| {
+            panic!("{ctx}: open_for_mutation returned no handle on a path that requires one")
+        });
+        (handle, self.full_path, self.table_branch)
+    }
+}
+
 pub(super) async fn open_for_mutation(
     db: &Omnigraph,
     table_key: &str,
     op_kind: crate::db::MutationOpKind,
-) -> Result<(SnapshotHandle, String, Option<String>)> {
+) -> Result<OpenedForMutation> {
     let current_branch = db
         .coordinator
         .read()
         .await
         .current_branch()
         .map(str::to_string);
-    open_for_mutation_on_branch(db, current_branch.as_deref(), table_key, op_kind).await
+    // `open_for_mutation` is the no-txn entry (branch merge). Passing `None`
+    // keeps the exact pre-WriteTxn code path (a fresh `resolved_branch_target`
+    // that re-validates the schema). With `txn = None` the non-strict early-skip
+    // in `open_for_mutation_on_branch` never fires, so this always returns a
+    // `Some(handle)` for its callers.
+    open_for_mutation_on_branch(db, current_branch.as_deref(), table_key, op_kind, None).await
 }

 /// Open a sub-table for mutation. The `op_kind` selects the strict-vs-relaxed
@@ -513,15 +547,69 @@ pub(super) async fn open_for_mutation_on_branch(
     branch: Option<&str>,
     table_key: &str,
     op_kind: crate::db::MutationOpKind,
-) -> Result<(SnapshotHandle, String, Option<String>)> {
+    txn: Option<&crate::db::WriteTxn>,
+) -> Result<OpenedForMutation> {
     db.ensure_schema_apply_not_locked("write").await?;
-    let resolved = db.resolved_branch_target(branch).await?;
-    let entry = resolved
-        .snapshot
+    // Source the resolved (snapshot, branch). With a `WriteTxn` the contract was
+    // validated once at capture, so use the pinned base + resolved branch instead
+    // of `resolved_branch_target` (which re-runs `ensure_schema_state_valid`). The
+    // base is the same fresh per-branch manifest read the no-txn path would have
+    // resolved — only the redundant schema re-validation is dropped. Without a txn
+    // this is byte-identical to the prior `resolved_branch_target` call.
+    let (snapshot, resolved_branch) = match txn {
+        Some(txn) => (txn.base.clone(), txn.branch.clone()),
+        None => {
+            let resolved = db.resolved_branch_target(branch).await?;
+            (resolved.snapshot, resolved.branch)
+        }
+    };
+    let entry = snapshot
         .entry(table_key)
         .ok_or_else(|| OmniError::manifest(format!("no manifest entry for {}", table_key)))?;
     let full_path = format!("{}/{}", db.root_uri, entry.table_path);
-    match resolved.branch.as_deref() {
+
+    // Collapse #1 (RFC-013 step 3b): a non-strict op (Insert/Merge) on the txn's
+    // own branch needs no dataset open for ACCUMULATION — the only thing the
+    // caller reads from this handle on the non-strict path is `.version()` (the
+    // publisher's CAS fence), which is exactly the pinned base version. The base
+    // already validated the schema contract once, and the staging reopen
+    // (`reopen_for_mutation`) plus the publisher CAS in `commit_all` are the real
+    // drift guards. So skip `open_dataset_head_for_write` entirely and source the
+    // expected version from the pinned entry.
+    //
+    // Gated on `txn.is_some()`: without a txn (branch merge's `open_for_mutation`)
+    // every arm below is byte-identical to before. STRICT ops (Update/Delete/
+    // SchemaRewrite) always open live HEAD + run `ensure_expected_version`
+    // (read-modify-write SI), and any write that must FORK (the table isn't yet on
+    // the resolved branch) opens too (the fork is a real Lance state advance the
+    // manifest snapshot can't substitute for).
+    if txn.is_some() && !op_kind.strict_pre_stage_version_check() {
+        match resolved_branch.as_deref() {
+            // Non-strict, table already on the active branch → no open, no fork.
+            Some(active_branch) if entry.table_branch.as_deref() == Some(active_branch) => {
+                return Ok(OpenedForMutation {
+                    handle: None,
+                    expected_version: entry.table_version,
+                    full_path,
+                    table_branch: Some(active_branch.to_string()),
+                });
+            }
+            // Main branch, non-strict → no open. (Main never forks.)
+            None => {
+                return Ok(OpenedForMutation {
+                    handle: None,
+                    expected_version: entry.table_version,
+                    full_path,
+                    table_branch: None,
+                });
+            }
+            // Non-strict but the table isn't on the active branch yet — falls
+            // through to fork below.
+            Some(_) => {}
+        }
+    }
+
+    match resolved_branch.as_deref() {
         None => {
             let ds = db
                 .storage()
@ -531,7 +619,13 @@ pub(super) async fn open_for_mutation_on_branch(
|
|||
db.storage()
|
||||
.ensure_expected_version(&ds, table_key, entry.table_version)?;
|
||||
}
|
||||
Ok((ds, full_path, None))
|
||||
let version = ds.version();
|
||||
Ok(OpenedForMutation {
|
||||
handle: Some(ds),
|
||||
expected_version: version,
|
||||
full_path,
|
||||
table_branch: None,
|
||||
})
|
||||
}
|
||||
Some(active_branch) => {
|
||||
let (ds, table_branch) = open_owned_dataset_for_branch_write(
|
||||
|
|
@ -544,7 +638,13 @@ pub(super) async fn open_for_mutation_on_branch(
|
|||
op_kind,
|
||||
)
|
||||
.await?;
|
||||
Ok((ds, full_path, table_branch))
|
||||
let version = ds.version();
|
||||
Ok(OpenedForMutation {
|
||||
handle: Some(ds),
|
||||
expected_version: version,
|
||||
full_path,
|
||||
table_branch,
|
||||
})
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
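The open-skip rule (collapse #1) in the hunk above can be sketched as a pure decision function. This is a minimal model and not the engine's code: `OpKind`, `OpenPlan`, and `plan_open` are hypothetical names, and `is_strict` only assumes what the comments state about `strict_pre_stage_version_check` (Update/Delete/SchemaRewrite are strict, Insert/Merge are not).

```rust
// Hypothetical, simplified model of the collapse #1 decision. None of these
// types are the engine's real definitions.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
enum OpKind {
    Insert,
    Merge,
    Update,
    Delete,
    SchemaRewrite,
}

impl OpKind {
    // Assumption: mirrors the strict/non-strict split described in the diff comments.
    fn is_strict(self) -> bool {
        matches!(self, OpKind::Update | OpKind::Delete | OpKind::SchemaRewrite)
    }
}

#[derive(Debug, PartialEq, Eq)]
enum OpenPlan {
    // No dataset open: the CAS fence is the pinned manifest version.
    SkipOpen { expected_version: u64 },
    // Open live HEAD (forking onto the branch first when required).
    Open,
}

fn plan_open(
    has_txn: bool,
    op: OpKind,
    active_branch: Option<&str>,
    table_branch: Option<&str>,
    pinned_version: u64,
) -> OpenPlan {
    if has_txn && !op.is_strict() {
        match active_branch {
            // Main never forks, so the pinned version is enough.
            None => return OpenPlan::SkipOpen { expected_version: pinned_version },
            // Table already lives on the active branch: no open, no fork.
            Some(b) if table_branch == Some(b) => {
                return OpenPlan::SkipOpen { expected_version: pinned_version }
            }
            // Table not yet on the branch: fall through, the fork needs an open.
            Some(_) => {}
        }
    }
    OpenPlan::Open
}
```

Under these assumptions, a strict op or a missing txn always yields `Open`, which matches the "byte-identical without a txn" claim in the comments.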
@ -1065,12 +1165,30 @@ async fn prepare_updates_for_commit(
|
|||
db: &Omnigraph,
|
||||
branch: Option<&str>,
|
||||
updates: &[crate::db::SubTableUpdate],
|
||||
txn: Option<&crate::db::WriteTxn>,
|
||||
// Post-`commit_staged` handles handed out by `StagedMutation::commit_all`
|
||||
// (RFC-013 step 3b, collapse #4): table_key → the handle already open at
|
||||
// its just-committed version. When a table's handle is present, the index
|
||||
// build below reuses it and SKIPS the `reopen_for_mutation` open. Absent
|
||||
// entries (other writers — schema apply, merge, ensure_indices, tests —
|
||||
// pass `HashMap::new()`; inline-committed/delete tables are never staged)
|
||||
// keep the byte-identical `reopen_for_mutation` path.
|
||||
mut committed_handles: std::collections::HashMap<String, SnapshotHandle>,
|
||||
) -> Result<Vec<crate::db::SubTableUpdate>> {
|
||||
if updates.is_empty() {
|
||||
return Ok(Vec::new());
|
||||
}
|
||||
|
||||
let snapshot = db.snapshot_for_branch(branch).await?;
|
||||
// With a `WriteTxn` the schema contract was validated once at capture, so
|
||||
// reuse the pinned base entries (same per-branch manifest snapshot) instead
|
||||
// of `snapshot_for_branch` (which re-runs `ensure_schema_state_valid`). Only
|
||||
// the `entry(table_key).table_path` is read out of it here, identical to the
|
||||
// no-txn path; the post-`commit_staged` index build below still runs against
|
||||
// the dataset at its just-committed version (reusing a handed-out handle when
|
||||
// one is present, reopening otherwise). Without a txn, byte-identical.
|
||||
let snapshot = match txn {
|
||||
Some(txn) => txn.base.clone(),
|
||||
None => db.snapshot_for_branch(branch).await?,
|
||||
};
|
||||
let mut prepared = Vec::with_capacity(updates.len());
|
||||
|
||||
for update in updates {
|
||||
|
|
@ -1084,21 +1202,34 @@ async fn prepare_updates_for_commit(
|
|||
let mut prepared_update = update.clone();
|
||||
if prepared_update.row_count > 0 {
|
||||
let full_path = format!("{}/{}", db.root_uri, entry.table_path);
|
||||
// Strict version check is correct here: this runs INSIDE
|
||||
// Reuse the post-`commit_staged` handle when the caller handed one
|
||||
// out (collapse #4): it is already open at exactly
|
||||
// `prepared_update.table_version`, so the defense-in-depth strict
|
||||
// re-check `reopen_for_mutation` would run is trivially satisfied
|
||||
// and the open is redundant. When no handle is present (other
|
||||
// writers, or any non-staged table), fall back to the byte-identical
|
||||
// `reopen_for_mutation` path.
|
||||
//
|
||||
// Strict version check is correct on the fallback: this runs INSIDE
|
||||
// the publisher commit path, after `commit_staged` already
|
||||
// advanced Lance HEAD to `prepared_update.table_version`.
|
||||
// The check is a defense-in-depth assertion that the
|
||||
// dataset state matches what we just committed; not the
|
||||
// pre-stage race the op-kind policy targets.
|
||||
let mut ds = reopen_for_mutation(
|
||||
db,
|
||||
&prepared_update.table_key,
|
||||
&full_path,
|
||||
prepared_update.table_branch.as_deref(),
|
||||
prepared_update.table_version,
|
||||
crate::db::MutationOpKind::SchemaRewrite,
|
||||
)
|
||||
.await?;
|
||||
let mut ds = match committed_handles.remove(&prepared_update.table_key) {
|
||||
Some(ds) => ds,
|
||||
None => {
|
||||
reopen_for_mutation(
|
||||
db,
|
||||
&prepared_update.table_key,
|
||||
&full_path,
|
||||
prepared_update.table_branch.as_deref(),
|
||||
prepared_update.table_version,
|
||||
crate::db::MutationOpKind::SchemaRewrite,
|
||||
)
|
||||
.await?
|
||||
}
|
||||
};
|
||||
// Any column not yet buildable (e.g. a vector column whose rows
|
||||
// have null embeddings) is deferred and logged inside
|
||||
// build_indices; a later ensure_indices/optimize materializes it.
|
||||
|
|
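The handle reuse in the hunk above (collapse #4) is a take-or-fallback on a map keyed by table. A minimal sketch, assuming a hypothetical `Handle` stand-in for `SnapshotHandle` and a fake reopen that only records that it happened:

```rust
use std::collections::HashMap;

// Hypothetical stand-in for the engine's `SnapshotHandle`.
#[derive(Debug, PartialEq, Eq)]
struct Handle {
    version: u64,
    // True when this handle came from the fallback reopen (illustration only).
    reopened: bool,
}

// Take the handle committed for `table_key` if the caller handed one out;
// otherwise simulate the fallback reopen at the just-committed version.
fn handle_for_index_build(
    committed: &mut HashMap<String, Handle>,
    table_key: &str,
    committed_version: u64,
) -> Handle {
    match committed.remove(table_key) {
        Some(handle) => handle,
        None => Handle {
            version: committed_version,
            reopened: true,
        },
    }
}
```

Using `remove` rather than `get` moves the handle out of the map, so each staged table's handle is consumed at most once; callers that pass an empty map always take the fallback branch.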
@ -1237,7 +1368,14 @@ pub(super) async fn commit_updates(
|
|||
.await
|
||||
.current_branch()
|
||||
.map(str::to_string);
|
||||
let prepared = prepare_updates_for_commit(db, current_branch.as_deref(), updates).await?;
|
||||
let prepared = prepare_updates_for_commit(
|
||||
db,
|
||||
current_branch.as_deref(),
|
||||
updates,
|
||||
None,
|
||||
std::collections::HashMap::new(),
|
||||
)
|
||||
.await?;
|
||||
commit_prepared_updates(db, &prepared, None).await
|
||||
}
|
||||
|
||||
|
|
@ -1281,9 +1419,12 @@ pub(super) async fn commit_updates_on_branch_with_expected(
|
|||
updates: &[crate::db::SubTableUpdate],
|
||||
expected_table_versions: &std::collections::HashMap<String, u64>,
|
||||
actor_id: Option<&str>,
|
||||
txn: Option<&crate::db::WriteTxn>,
|
||||
committed_handles: std::collections::HashMap<String, SnapshotHandle>,
|
||||
) -> Result<u64> {
|
||||
db.ensure_schema_apply_not_locked("write commit").await?;
|
||||
let prepared = prepare_updates_for_commit(db, branch, updates).await?;
|
||||
let prepared =
|
||||
prepare_updates_for_commit(db, branch, updates, txn, committed_handles).await?;
|
||||
commit_prepared_updates_on_branch_with_expected(
|
||||
db,
|
||||
branch,
|
||||
|
|
|
|||
|
|
@ -1068,10 +1068,13 @@ async fn publish_rewritten_merge_table(
|
|||
// source onto target). The inline `delete_where` later in this
|
||||
// function operates on rows the rewrite chose to remove, not
|
||||
// user-facing predicates, so Merge is the correct policy here.
|
||||
let (ds, full_path, table_branch) = target_db
|
||||
// `open_for_mutation` is the no-txn entry, so collapse #1's non-strict
|
||||
// open-skip (gated on `txn.is_some()`) never fires here — the handle is
|
||||
// always `Some`.
|
||||
let (mut current_ds, full_path, table_branch) = target_db
|
||||
.open_for_mutation(table_key, crate::db::MutationOpKind::Merge)
|
||||
.await?;
|
||||
let mut current_ds = ds;
|
||||
.await?
|
||||
.require_handle("branch merge");
|
||||
|
||||
// Phase 1: merge_insert changed/new rows (preserves _row_created_at_version for
|
||||
// existing rows, bumps _row_last_updated_at_version only for actually-changed rows).
|
||||
|
|
@ -1237,10 +1240,13 @@ async fn publish_adopted_delta(
|
|||
table_key: &str,
|
||||
delta: &AdoptDelta,
|
||||
) -> Result<crate::db::SubTableUpdate> {
|
||||
let (ds, full_path, table_branch) = target_db
|
||||
// `open_for_mutation` is the no-txn entry, so collapse #1's non-strict
|
||||
// open-skip (gated on `txn.is_some()`) never fires here — the handle is
|
||||
// always `Some`.
|
||||
let (mut current_ds, full_path, table_branch) = target_db
|
||||
.open_for_mutation(table_key, crate::db::MutationOpKind::Merge)
|
||||
.await?;
|
||||
let mut current_ds = ds;
|
||||
.await?
|
||||
.require_handle("branch merge");
|
||||
|
||||
// Phase 1a: append the NEW rows. `stage_append_stream` is a streaming
|
||||
// `Operation::Append` — no hash join — so it never buffers the delta and
|
||||
|
|
|
|||
|
|
@ -601,13 +601,51 @@ use super::staging::{MutationStaging, PendingMode};
|
|||
/// away once Lance exposes a two-phase delete API
|
||||
/// ([lance-format/lance#6658](https://github.com/lance-format/lance/issues/6658))
|
||||
/// and we can stage deletes on the same path as inserts/updates.
|
||||
impl Omnigraph {
|
||||
/// Resolve a LIVE-HEAD read handle for an edge table's committed-state `@card`
|
||||
/// scan when collapse #1 skipped the accumulation open. The edge-insert path no
|
||||
/// longer opens the edge dataset (non-strict op + txn), but cardinality is
|
||||
/// validated ONCE (never rechecked at commit), so the scan must observe the
|
||||
/// freshest committed edges — NOT the pinned `txn.base`. A concurrent writer can
|
||||
/// commit edges to this table after `txn` capture; counting against the stale
|
||||
/// base undercounts and lets a violating insert through (invariant 9). The table
|
||||
/// LOCATION is read from the pinned entry (stable across versions); the dataset is
|
||||
/// opened at live HEAD via `open_dataset_head_for_write` (a read here despite the
|
||||
/// name — no lock/stage), restoring the pre-3b image (the mutation's own open).
|
||||
/// The residual validate→commit race (a writer committing between this scan and
|
||||
/// the end-of-query commit) is the §7.1 gap, closed by RFC-013 step 4.
|
||||
async fn edge_cardinality_read_handle(
|
||||
&self,
|
||||
txn: Option<&crate::db::WriteTxn>,
|
||||
table_key: &str,
|
||||
) -> Result<SnapshotHandle> {
|
||||
let branch = txn.and_then(|t| t.branch.as_deref());
|
||||
match txn.and_then(|t| t.base.entry(table_key)) {
|
||||
Some(entry) => {
|
||||
let full_path = self.storage().dataset_uri(&entry.table_path);
|
||||
self.storage()
|
||||
.open_dataset_head_for_write(table_key, &full_path, branch)
|
||||
.await
|
||||
}
|
||||
// Unreachable today (the `None` handle only reaches here under a txn whose
|
||||
// base contains the table). Defensive: resolve the table fresh (live)
|
||||
// without the schema re-validation `snapshot_for_branch` would re-run.
|
||||
None => {
|
||||
let snapshot = self.fresh_snapshot_for_branch_unchecked(branch).await?;
|
||||
self.storage().open_snapshot_at_table(&snapshot, table_key).await
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
async fn open_table_for_mutation(
|
||||
db: &Omnigraph,
|
||||
staging: &mut MutationStaging,
|
||||
branch: Option<&str>,
|
||||
table_key: &str,
|
||||
op_kind: crate::db::MutationOpKind,
|
||||
) -> Result<(SnapshotHandle, String, Option<String>)> {
|
||||
txn: Option<&crate::db::WriteTxn>,
|
||||
) -> Result<(Option<SnapshotHandle>, String, Option<String>)> {
|
||||
if let Some(prior) = staging.inline_committed.get(table_key) {
|
||||
let path = staging.paths.get(table_key).ok_or_else(|| {
|
||||
OmniError::manifest_internal(format!(
|
||||
|
|
@ -615,6 +653,10 @@ async fn open_table_for_mutation(
|
|||
table_key
|
||||
))
|
||||
})?;
|
||||
// The inline-committed reopen does NOT validate the schema contract
|
||||
// (it reopens at the post-inline-commit Lance version directly), so it
|
||||
// takes no `txn` — threading it here would change nothing. Deletes are
|
||||
// strict ops, so this always opens (returns `Some`).
|
||||
let ds = db
|
||||
.reopen_for_mutation(
|
||||
table_key,
|
||||
|
|
@ -624,20 +666,32 @@ async fn open_table_for_mutation(
|
|||
op_kind,
|
||||
)
|
||||
.await?;
|
||||
return Ok((ds, path.full_path.clone(), path.table_branch.clone()));
|
||||
return Ok((Some(ds), path.full_path.clone(), path.table_branch.clone()));
|
||||
}
|
||||
let (ds, full_path, table_branch) = db
|
||||
.open_for_mutation_on_branch(branch, table_key, op_kind)
|
||||
// `open_for_mutation_on_branch` returns the expected version even when it
|
||||
// skips the open (collapse #1, the non-strict insert/merge path): the version
|
||||
// is the pinned base's, identical to the opened handle's `.version()`. Use it
|
||||
// directly for `ensure_path` so the no-open path still captures the publisher
|
||||
// CAS fence.
|
||||
let opened = db
|
||||
.open_for_mutation_on_branch(branch, table_key, op_kind, txn)
|
||||
.await?;
|
||||
let expected_version = ds.version();
|
||||
// Pin the open-skip contract (collapse #1): a missing handle is legal ONLY on
|
||||
// the non-strict `txn` path. A future change that returns `None` elsewhere
|
||||
// (e.g. a new strict arm) trips this in debug builds rather than silently
|
||||
// handing a `None` to a `require_handle` consumer.
|
||||
debug_assert!(
|
||||
opened.handle.is_some() || (txn.is_some() && !op_kind.strict_pre_stage_version_check()),
|
||||
"open_for_mutation_on_branch returned no handle outside the non-strict txn open-skip path",
|
||||
);
|
||||
staging.ensure_path(
|
||||
table_key,
|
||||
full_path.clone(),
|
||||
table_branch.clone(),
|
||||
expected_version,
|
||||
opened.full_path.clone(),
|
||||
opened.table_branch.clone(),
|
||||
opened.expected_version,
|
||||
op_kind,
|
||||
);
|
||||
Ok((ds, full_path, table_branch))
|
||||
Ok((opened.handle, opened.full_path, opened.table_branch))
|
||||
}
|
||||
|
||||
/// D₂ parse-time check: a single mutation query is either insert/update-only
|
||||
|
|
@ -720,14 +774,14 @@ impl Omnigraph {
|
|||
params: &ParamMap,
|
||||
actor_id: Option<&str>,
|
||||
) -> Result<MutationResult> {
|
||||
self.ensure_schema_state_valid().await?;
|
||||
// Converge any pending recovery sidecar (a previously failed
|
||||
// writer's Phase B → Phase C residual) before executing: the
|
||||
// inline delete path advances Lance HEAD during execution and
|
||||
// the staged path's commit-time drift guard refuses
|
||||
// sidecar-covered drift, so a long-lived handle must heal here
|
||||
// — not at restart. One `list_dir` when no sidecars exist (the
|
||||
// steady state).
|
||||
// steady state). MUST run before `open_write_txn` below — the heal
|
||||
// may advance the manifest, so the pinned base must be captured after.
|
||||
self.heal_pending_recovery_sidecars().await?;
|
||||
let requested = Self::normalize_branch_name(branch)?;
|
||||
// Reject internal `__run__*` / system-prefixed branches at the
|
||||
|
|
@ -737,6 +791,16 @@ impl Omnigraph {
|
|||
if let Some(name) = requested.as_deref() {
|
||||
crate::db::ensure_public_branch_ref(name, "mutate")?;
|
||||
}
|
||||
// Capture-once write transaction (RFC-013 step 3b). `open_write_txn`
|
||||
// validates the schema contract ONCE (it resolves the branch target,
|
||||
// whose first line is `ensure_schema_state_valid`) and pins the base
|
||||
// snapshot for this write. Threaded as `Some(&txn)` through execution,
|
||||
// staging commit, and the manifest publish so the per-table opens and
|
||||
// the commit-time OCC re-read reuse the pinned base instead of
|
||||
// re-validating the contract at every resolve point. Captured AFTER the
|
||||
// recovery heal (which may advance the manifest) and AFTER `requested`
|
||||
// is known so it pins the post-heal snapshot for the correct branch.
|
||||
let txn = self.open_write_txn(requested.as_deref()).await?;
|
||||
let resolved_params = enrich_mutation_params(params)?;
|
||||
|
||||
// Per-query staging accumulator. Inserts and updates push batches
|
||||
|
|
@ -785,7 +849,13 @@ impl Omnigraph {
|
|||
};
|
||||
|
||||
let exec_result = self
|
||||
.execute_named_mutation(&ir, &resolved_params, requested.as_deref(), &mut staging)
|
||||
.execute_named_mutation(
|
||||
&ir,
|
||||
&resolved_params,
|
||||
requested.as_deref(),
|
||||
&mut staging,
|
||||
Some(&txn),
|
||||
)
|
||||
.await;
|
||||
|
||||
match exec_result {
|
||||
|
|
@ -799,13 +869,20 @@ impl Omnigraph {
|
|||
// interleave between our commit_staged and our publish
|
||||
// (which would correctly fail our CAS but leave Lance
|
||||
// HEAD advanced — the residual class MR-870 recovers).
|
||||
let (updates, expected_versions, sidecar_handle, _queue_guards) = staged
|
||||
let super::staging::CommittedMutation {
|
||||
updates,
|
||||
expected_versions,
|
||||
sidecar_handle,
|
||||
guards: _queue_guards,
|
||||
committed_handles,
|
||||
} = staged
|
||||
.commit_all(
|
||||
self,
|
||||
requested.as_deref(),
|
||||
crate::db::manifest::SidecarKind::Mutation,
|
||||
actor_id,
|
||||
fork_queue_guards,
|
||||
Some(&txn),
|
||||
)
|
||||
.await?;
|
||||
// Failpoint that wedges the documented finalize→publisher
|
||||
|
|
@ -824,6 +901,8 @@ impl Omnigraph {
|
|||
&updates,
|
||||
&expected_versions,
|
||||
actor_id,
|
||||
Some(&txn),
|
||||
committed_handles,
|
||||
)
|
||||
.await?;
|
||||
// Phase C succeeded — sidecar can be deleted. If this
|
||||
|
|
@ -938,6 +1017,7 @@ impl Omnigraph {
|
|||
params: &ParamMap,
|
||||
branch: Option<&str>,
|
||||
staging: &mut MutationStaging,
|
||||
txn: Option<&crate::db::WriteTxn>,
|
||||
) -> Result<MutationResult> {
|
||||
let mut total = MutationResult::default();
|
||||
for op in &ir.ops {
|
||||
|
|
@ -946,7 +1026,7 @@ impl Omnigraph {
|
|||
type_name,
|
||||
assignments,
|
||||
} => {
|
||||
self.execute_insert(type_name, assignments, params, branch, staging)
|
||||
self.execute_insert(type_name, assignments, params, branch, staging, txn)
|
||||
.await?
|
||||
}
|
||||
MutationOpIR::Update {
|
||||
|
|
@ -954,14 +1034,16 @@ impl Omnigraph {
|
|||
assignments,
|
||||
predicate,
|
||||
} => {
|
||||
self.execute_update(type_name, assignments, predicate, params, branch, staging)
|
||||
.await?
|
||||
self.execute_update(
|
||||
type_name, assignments, predicate, params, branch, staging, txn,
|
||||
)
|
||||
.await?
|
||||
}
|
||||
MutationOpIR::Delete {
|
||||
type_name,
|
||||
predicate,
|
||||
} => {
|
||||
self.execute_delete(type_name, predicate, params, branch, staging)
|
||||
self.execute_delete(type_name, predicate, params, branch, staging, txn)
|
||||
.await?
|
||||
}
|
||||
};
|
||||
|
|
@ -978,6 +1060,7 @@ impl Omnigraph {
|
|||
params: &ParamMap,
|
||||
branch: Option<&str>,
|
||||
staging: &mut MutationStaging,
|
||||
txn: Option<&crate::db::WriteTxn>,
|
||||
) -> Result<MutationResult> {
|
||||
let mut resolved: HashMap<String, Literal> = HashMap::new();
|
||||
for a in assignments {
|
||||
|
|
@ -1025,8 +1108,12 @@ impl Omnigraph {
|
|||
} else {
|
||||
crate::db::MutationOpKind::Insert
|
||||
};
|
||||
// Node inserts are non-strict (Insert/Merge), so with a `WriteTxn`
|
||||
// this opens NOTHING (collapse #1) — the handle is discarded anyway;
|
||||
// only `ensure_path`'s captured version (read inside
|
||||
// `open_table_for_mutation`) is used downstream.
|
||||
let (_ds, _full_path, _table_branch) =
|
||||
open_table_for_mutation(self, staging, branch, &table_key, insert_kind).await?;
|
||||
open_table_for_mutation(self, staging, branch, &table_key, insert_kind, txn).await?;
|
||||
// Accumulate. @key inserts go into the Merge stream (so a
|
||||
// later update on the same id coalesces correctly); no-key
|
||||
// inserts go into the Append stream.
|
||||
|
|
@ -1059,13 +1146,16 @@ impl Omnigraph {
|
|||
)?;
|
||||
}
|
||||
let table_key = format!("edge:{}", type_name);
|
||||
// Capture pre-write metadata on first touch (no Lance write).
|
||||
let (ds, _full_path, _table_branch) = open_table_for_mutation(
|
||||
// Capture pre-write metadata on first touch. Edge inserts are
|
||||
// non-strict, so with a `WriteTxn` this opens NOTHING (collapse #1)
|
||||
// and returns `None`.
|
||||
let (handle, _full_path, _table_branch) = open_table_for_mutation(
|
||||
self,
|
||||
staging,
|
||||
branch,
|
||||
&table_key,
|
||||
crate::db::MutationOpKind::Insert,
|
||||
txn,
|
||||
)
|
||||
.await?;
|
||||
// Accumulate the new edge row. Edge IDs are ULID-generated so
|
||||
|
|
@ -1075,9 +1165,27 @@ impl Omnigraph {
|
|||
// Edge cardinality validation: scan committed edges via Lance
|
||||
// + iterate pending edges in-memory for the `src` column,
|
||||
// group-by-src. The pending side already includes the row
|
||||
// we just appended (above).
|
||||
validate_edge_cardinality_with_pending(self, &ds, staging, &table_key, edge_type)
|
||||
// we just appended (above). When the open was skipped (collapse
|
||||
// #1), resolve a read handle for the committed scan at LIVE HEAD
|
||||
// (`edge_cardinality_read_handle`, #298) — NOT the pinned txn.base,
|
||||
// which would undercount edges a concurrent writer committed since
|
||||
// capture. Only when cardinality is non-default, so the common
|
||||
// default-cardinality edge keeps the open-free path. (The residual
|
||||
// validate→commit race is the §7.1 gap — step 4.)
|
||||
if !edge_type.cardinality.is_default() {
|
||||
let committed_ds = match handle {
|
||||
Some(h) => h,
|
||||
None => self.edge_cardinality_read_handle(txn, &table_key).await?,
|
||||
};
|
||||
validate_edge_cardinality_with_pending(
|
||||
self,
|
||||
&committed_ds,
|
||||
staging,
|
||||
&table_key,
|
||||
edge_type,
|
||||
)
|
||||
.await?;
|
||||
}
|
||||
|
||||
self.invalidate_graph_index().await;
|
||||
|
||||
|
|
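Why the committed scan must read live HEAD rather than the pinned `txn.base` can be shown with a toy count. This sketch assumes cardinality is an upper bound on edges per `src` (the diff only says the check groups by `src`); `first_violation` and its parameters are hypothetical names.

```rust
use std::collections::HashMap;

// Returns the first `src` whose committed + pending edge count exceeds
// `max_per_src`, or `None` when every source stays within the bound.
fn first_violation<'a>(
    committed: &[&'a str],
    pending: &[&'a str],
    max_per_src: usize,
) -> Option<&'a str> {
    let mut counts: HashMap<&'a str, usize> = HashMap::new();
    for &src in committed.iter().chain(pending.iter()) {
        let n = counts.entry(src).or_insert(0);
        *n += 1;
        if *n > max_per_src {
            return Some(src);
        }
    }
    None
}
```

If the pinned base holds one committed edge from `a` while live HEAD holds two (a concurrent writer committed one after capture), a pending insert from `a` passes a bound of 2 against the base but is rejected against live HEAD. That is the undercount the comment describes.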
@ -1098,6 +1206,7 @@ impl Omnigraph {
|
|||
params: &ParamMap,
|
||||
branch: Option<&str>,
|
||||
staging: &mut MutationStaging,
|
||||
txn: Option<&crate::db::WriteTxn>,
|
||||
) -> Result<MutationResult> {
|
||||
// Defense in depth: ensure this is a node type
|
||||
if !self.catalog().node_types.contains_key(type_name) {
|
||||
|
|
@ -1122,14 +1231,18 @@ impl Omnigraph {
|
|||
let blob_props = self.catalog().node_types[type_name].blob_properties.clone();
|
||||
|
||||
let table_key = format!("node:{}", type_name);
|
||||
let (ds, _full_path, _table_branch) = open_table_for_mutation(
|
||||
let (handle, _full_path, _table_branch) = open_table_for_mutation(
|
||||
self,
|
||||
staging,
|
||||
branch,
|
||||
&table_key,
|
||||
crate::db::MutationOpKind::Update,
|
||||
txn,
|
||||
)
|
||||
.await?;
|
||||
// Update is a STRICT op, so collapse #1 never skips its open — the
|
||||
// handle is always `Some` (and it's needed for the committed scan below).
|
||||
let ds = handle.expect("strict Update op always opens its dataset");
|
||||
|
||||
// Scan committed via Lance + apply the same predicate to pending
|
||||
// batches via DataFusion `MemTable` (read-your-writes for prior
|
||||
|
|
@ -1228,13 +1341,14 @@ impl Omnigraph {
|
|||
params: &ParamMap,
|
||||
branch: Option<&str>,
|
||||
staging: &mut MutationStaging,
|
||||
txn: Option<&crate::db::WriteTxn>,
|
||||
) -> Result<MutationResult> {
|
||||
let is_node = self.catalog().node_types.contains_key(type_name);
|
||||
if is_node {
|
||||
self.execute_delete_node(type_name, predicate, params, branch, staging)
|
||||
self.execute_delete_node(type_name, predicate, params, branch, staging, txn)
|
||||
.await
|
||||
} else {
|
||||
self.execute_delete_edge(type_name, predicate, params, branch, staging)
|
||||
self.execute_delete_edge(type_name, predicate, params, branch, staging, txn)
|
||||
.await
|
||||
}
|
||||
}
|
||||
|
|
@ -1246,18 +1360,22 @@ impl Omnigraph {
|
|||
params: &ParamMap,
|
||||
branch: Option<&str>,
|
||||
staging: &mut MutationStaging,
|
||||
txn: Option<&crate::db::WriteTxn>,
|
||||
) -> Result<MutationResult> {
|
||||
let pred_sql = predicate_to_sql(predicate, params, false)?;
|
||||
|
||||
let table_key = format!("node:{}", type_name);
|
||||
let (ds, full_path, table_branch) = open_table_for_mutation(
|
||||
let (handle, full_path, table_branch) = open_table_for_mutation(
|
||||
self,
|
||||
staging,
|
||||
branch,
|
||||
&table_key,
|
||||
crate::db::MutationOpKind::Delete,
|
||||
txn,
|
||||
)
|
||||
.await?;
|
||||
// Delete is a STRICT op, so collapse #1 never skips its open.
|
||||
let ds = handle.expect("strict Delete op always opens its dataset");
|
||||
let initial_version = ds.version();
|
||||
|
||||
// Scan matching IDs for cascade. Per D₂ this never overlaps with
|
||||
|
|
@ -1347,14 +1465,17 @@ impl Omnigraph {
|
|||
|
||||
let edge_table_key = format!("edge:{}", edge_name);
|
||||
let cascade_filter = cascade_filters.join(" OR ");
|
||||
let (edge_ds, edge_full_path, edge_table_branch) = open_table_for_mutation(
|
||||
let (edge_handle, edge_full_path, edge_table_branch) = open_table_for_mutation(
|
||||
self,
|
||||
staging,
|
||||
branch,
|
||||
&edge_table_key,
|
||||
crate::db::MutationOpKind::Delete,
|
||||
txn,
|
||||
)
|
||||
.await?;
|
||||
// Delete is a STRICT op, so collapse #1 never skips its open.
|
||||
let edge_ds = edge_handle.expect("strict Delete op always opens its dataset");
|
||||
|
||||
let (_new_edge_ds, edge_delete) = self
|
||||
.storage_inline_residual()
|
||||
|
|
@ -1391,18 +1512,22 @@ impl Omnigraph {
|
|||
params: &ParamMap,
|
||||
branch: Option<&str>,
|
||||
staging: &mut MutationStaging,
|
||||
txn: Option<&crate::db::WriteTxn>,
|
||||
) -> Result<MutationResult> {
|
||||
let pred_sql = predicate_to_sql(predicate, params, true)?;
|
||||
|
||||
let table_key = format!("edge:{}", type_name);
|
||||
let (ds, full_path, table_branch) = open_table_for_mutation(
|
||||
let (handle, full_path, table_branch) = open_table_for_mutation(
|
||||
self,
|
||||
staging,
|
||||
branch,
|
||||
&table_key,
|
||||
crate::db::MutationOpKind::Delete,
|
||||
txn,
|
||||
)
|
||||
.await?;
|
||||
// Delete is a STRICT op, so collapse #1 never skips its open.
|
||||
let ds = handle.expect("strict Delete op always opens its dataset");
|
||||
|
||||
let (_new_ds, delete_state) = self
|
||||
.storage_inline_residual()
|
||||
|
|
|
|||
|
|
@ -440,6 +440,26 @@ struct StagedTableEntry {
|
|||
staged_write: StagedHandle,
|
||||
}
|
||||
|
||||
/// Output of [`StagedMutation::commit_all`] (Phase B): the publisher's input plus
|
||||
/// the queue guards the caller must hold across the manifest publish.
|
||||
pub(crate) struct CommittedMutation {
|
||||
/// Per-table updates to publish to the manifest.
|
||||
pub(crate) updates: Vec<SubTableUpdate>,
|
||||
/// Per-table manifest pins refreshed under the write queue — the publisher's CAS fence.
|
||||
pub(crate) expected_versions: HashMap<String, u64>,
|
||||
/// Recovery sidecar to delete after Phase C succeeds (`None` when nothing staged).
|
||||
pub(crate) sidecar_handle: Option<RecoverySidecarHandle>,
|
||||
/// Per-`(table, branch)` write-queue guards — the caller MUST hold these across
|
||||
/// the manifest publish (see `commit_all`) so no writer interleaves between
|
||||
/// `commit_staged` and the publish.
|
||||
pub(crate) guards: Vec<tokio::sync::OwnedMutexGuard<()>>,
|
||||
/// Post-`commit_staged` handle per STAGED table (table_key → handle at the
|
||||
/// just-committed version). Carried out (RFC-013 step 3b, collapse #4) so the
|
||||
/// publish-prepare index build reuses it instead of a fresh `reopen_for_mutation`
|
||||
/// at the same version. Inline-committed / delete tables are absent (no staged handle).
|
||||
pub(crate) committed_handles: HashMap<String, SnapshotHandle>,
|
||||
}
|
||||
|
||||
impl StagedMutation {
|
||||
/// **Phase B** of the two-phase commit: acquire per-`(table_key,
|
||||
/// branch)` queues, revalidate manifest pins, write the recovery
|
||||
|
|
@ -485,12 +505,8 @@ impl StagedMutation {
|
|||
Vec<(String, Option<String>)>,
|
||||
Vec<tokio::sync::OwnedMutexGuard<()>>,
|
||||
)>,
|
||||
) -> Result<(
|
||||
Vec<SubTableUpdate>,
|
||||
HashMap<String, u64>,
|
||||
Option<RecoverySidecarHandle>,
|
||||
Vec<tokio::sync::OwnedMutexGuard<()>>,
|
||||
)> {
|
||||
txn: Option<&crate::db::WriteTxn>,
|
||||
) -> Result<CommittedMutation> {
|
||||
let StagedMutation {
|
||||
inline_committed,
|
||||
mut staged,
|
||||
|
|
@ -585,7 +601,18 @@ impl StagedMutation {
|
|||
// Multi-coordinator deployments (§VI.27 aspirational) get
|
||||
// genuine cross-process drift detection from this read for
|
||||
// free.
|
||||
let snapshot = db.fresh_snapshot_for_branch(branch).await?;
|
||||
//
|
||||
// This MUST be a FRESH per-branch manifest read (never the warm
|
||||
// cache) for the OCC re-capture below — but with a `WriteTxn` the
|
||||
// schema contract was already validated at capture, so use the
|
||||
// `_unchecked` variant, which drops the redundant
|
||||
// `ensure_schema_state_valid` AND the commit-graph load the OCC read
|
||||
// never consults (a fresh manifest read yields the same `Snapshot`).
|
||||
// Without a txn this is byte-identical to the prior checked call.
|
||||
let snapshot = match txn {
|
||||
Some(_) => db.fresh_snapshot_for_branch_unchecked(branch).await?,
|
||||
None => db.fresh_snapshot_for_branch(branch).await?,
|
||||
};
|
||||
for entry in staged.iter_mut() {
|
||||
let current = snapshot
|
||||
.entry(&entry.table_key)
|
||||
|
|
@ -619,15 +646,20 @@ impl StagedMutation {
|
|||
// live Lance HEAD still equals that manifest pin. If an external
|
||||
// raw Lance write or a pre-fix maintenance path moved HEAD without
|
||||
// publishing `__manifest`, this write must not silently fold it.
|
||||
let head = db
|
||||
.storage()
|
||||
.open_dataset_head_for_write(
|
||||
&entry.table_key,
|
||||
&entry.path.full_path,
|
||||
entry.path.table_branch.as_deref(),
|
||||
)
|
||||
.await?
|
||||
.version();
|
||||
//
|
||||
// `latest_version_id` reads the latest manifest pointer off the
|
||||
// already-open staged handle (the #2 staging open) WITHOUT a fresh
|
||||
// `Dataset::open` — the same cheap live-HEAD probe
|
||||
// `ManifestCoordinator::probe_latest_version` uses. This replaces a
|
||||
// redundant `open_dataset_head_for_write` (RFC-013 step 3b, collapse
|
||||
// #3): the drift comparison below is byte-identical; only how `head`
|
||||
// is obtained changes (probe vs cold open).
|
||||
let head = entry
|
||||
.dataset
|
||||
.dataset()
|
||||
.latest_version_id()
|
||||
.await
|
||||
.map_err(|e| OmniError::Lance(e.to_string()))?;
|
||||
if head < current {
|
||||
return Err(OmniError::manifest_internal(format!(
|
||||
"table '{}' Lance HEAD version {} is behind manifest version {}",
|
||||
|
|
@ -786,6 +818,12 @@ impl StagedMutation {
|
|||
|
||||
let mut updates: Vec<SubTableUpdate> = inline_committed.into_values().collect();
|
||||
|
||||
// Carry each staged table's post-`commit_staged` handle out so the
|
||||
// publish-prepare index build reuses it (collapse #4) instead of
|
||||
// re-opening the dataset at the same just-committed version.
|
||||
let mut committed_handles: HashMap<String, SnapshotHandle> =
|
||||
HashMap::with_capacity(staged.len());
|
||||
|
||||
for entry in staged {
|
||||
let StagedTableEntry {
|
||||
table_key,
|
||||
|
|
@ -798,15 +836,22 @@ impl StagedMutation {
|
|||
let new_ds = db.storage().commit_staged(dataset, staged_write).await?;
|
||||
let state = db.storage().table_state(&path.full_path, &new_ds).await?;
|
||||
updates.push(SubTableUpdate {
|
||||
table_key,
|
||||
table_key: table_key.clone(),
|
||||
table_version: state.version,
|
||||
table_branch: path.table_branch.clone(),
|
||||
row_count: state.row_count,
|
||||
version_metadata: state.version_metadata,
|
||||
});
|
||||
committed_handles.insert(table_key, new_ds);
|
||||
}
|
||||
|
||||
Ok((updates, expected_versions, sidecar_handle, guards))
|
||||
Ok(CommittedMutation {
|
||||
updates,
|
||||
expected_versions,
|
||||
sidecar_handle,
|
||||
guards,
|
||||
committed_handles,
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
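The handle-threading change in `commit_all` above can be sketched in isolation. This is a minimal illustration, not the engine's code: `Handle`, `Update`, `Committed`, and the "commit bumps the version by one" rule are hypothetical stand-ins for `SnapshotHandle`, `SubTableUpdate`, `CommittedMutation`, and `commit_staged`. It shows why the diff switches to `table_key.clone()`: the key is now needed twice, once in the update record and once as the map key that carries the post-commit handle out of the loop.

```rust
use std::collections::HashMap;

/// Hypothetical stand-in for the post-commit dataset handle.
struct Handle {
    version: u64,
}

/// Hypothetical stand-in for the per-table manifest update.
struct Update {
    table_key: String,
    table_version: u64,
}

/// Hypothetical stand-in for the struct that replaces the old 4-tuple return.
struct Committed {
    updates: Vec<Update>,
    committed_handles: HashMap<String, Handle>,
}

/// Commit each staged table once and keep the handle the commit returned,
/// so a later step can reuse it instead of re-opening the same version.
/// Input is (table_key, base_version); the commit is simulated as base + 1.
fn commit_all(staged: Vec<(String, u64)>) -> Committed {
    let mut updates = Vec::with_capacity(staged.len());
    let mut committed_handles = HashMap::with_capacity(staged.len());
    for (table_key, base_version) in staged {
        let new_handle = Handle { version: base_version + 1 };
        updates.push(Update {
            // Clone here: the key is moved into the map just below.
            table_key: table_key.clone(),
            table_version: new_handle.version,
        });
        committed_handles.insert(table_key, new_handle);
    }
    Committed { updates, committed_handles }
}
```

A caller would look up `committed_handles[&key]` at publish time and fall back to a fresh open only when the key is absent.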
|||
|
|
@ -43,6 +43,23 @@ pub struct QueryIoProbes {
|
|||
/// handle cache (Fix 3) serves them.
|
||||
pub table_wrapper: Option<Arc<dyn WrappingObjectStore>>,
|
||||
pub probe_count: Arc<AtomicU64>,
|
||||
/// Counts DATA-table open CALLS through the two instrumented chokepoints
|
||||
/// (`open_dataset_tracked` / `open_table_dataset`), classified by URI so the
|
||||
/// internal/system tables (`__manifest`, `_graph_commits*`) are EXCLUDED — the
|
||||
/// publisher CAS and commit-graph append open those every write, and counting
|
||||
/// them would make the `data_open_count <= |touched_tables|` write gate
|
||||
/// (RFC-013 step 3b) unreachable by threading alone. Unlike the opener-read
|
||||
/// term (which mixes with the merge-insert/RI scan on the write path), this is
|
||||
/// an exact open-invocation count. `forbidden_apis` keeps engine code OUTSIDE the
|
||||
/// storage layer (`exec/`, `db/omnigraph/`, `loader/`, `changes/`) from opening
|
||||
/// datasets except through these chokepoints, so the count is complete for the
|
||||
/// keyed-write data path the gate measures. (`table_store.rs` is allow-listed and
|
||||
/// does hold direct `Dataset::open`s — but only for branch-management ops
|
||||
/// (`delete_branch`/`list_branches`/`force_delete_branch`), never that hot path.)
|
||||
pub data_open_count: Arc<AtomicU64>,
|
||||
/// Internal/system-table (`__manifest`, `_graph_commits*`) open CALLS — the
|
||||
/// complement of `data_open_count`, kept for symmetry and debugging.
|
||||
pub internal_open_count: Arc<AtomicU64>,
|
||||
}
|
||||
|
||||
tokio::task_local! {
|
||||
|
|
@ -80,6 +97,39 @@ pub(crate) fn record_probe() {
|
|||
let _ = current(|p| p.probe_count.fetch_add(1, Ordering::Relaxed));
|
||||
}
|
||||
|
||||
/// Internal/system table directory names. An open of one of these is a metadata
|
||||
/// open (publisher CAS, commit-graph append, recovery audit), NOT a data-table
|
||||
/// open. Kept in sync with the dir constants in `db/manifest/layout.rs`,
|
||||
/// `db/commit_graph.rs`, and `db/recovery_audit.rs`.
|
||||
const INTERNAL_TABLE_DIRS: [&str; 4] = [
|
||||
"__manifest",
|
||||
"_graph_commits.lance",
|
||||
"_graph_commit_actors.lance",
|
||||
"_graph_commit_recoveries.lance",
|
||||
];
|
||||
|
||||
/// True when `uri`'s last path segment names an internal/system table.
|
||||
fn open_is_internal(uri: &str) -> bool {
|
||||
let trimmed = uri.trim_end_matches('/');
|
||||
let last = trimmed.rsplit('/').next().unwrap_or(trimmed);
|
||||
INTERNAL_TABLE_DIRS.contains(&last)
|
||||
}
|
||||
|
||||
/// Record one table-open call against the active per-query probes, classified by
|
||||
/// table class (the URI's last segment) so the write gate counts DATA-table opens
|
||||
/// only and ignores the publisher/commit-graph metadata opens. No-op in production
|
||||
/// (the classification runs only inside the probe closure, which `current` skips
|
||||
/// when no probes are installed). Called at both open chokepoints.
|
||||
pub(crate) fn record_open(uri: &str) {
|
||||
let _ = current(|p| {
|
||||
if open_is_internal(uri) {
|
||||
p.internal_open_count.fetch_add(1, Ordering::Relaxed);
|
||||
} else {
|
||||
p.data_open_count.fetch_add(1, Ordering::Relaxed);
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
/// Per-operation staged-write counts, installed for a task via
|
||||
/// [`with_merge_write_probes`]. Lets a cost-budget test assert WHICH staged-write
|
||||
/// primitive an operation invokes — e.g. that an append-only fast-forward merge
|
||||
|
|
@ -177,6 +227,7 @@ pub(crate) async fn open_dataset_tracked(
|
|||
uri: &str,
|
||||
wrapper: Option<Arc<dyn WrappingObjectStore>>,
|
||||
) -> Result<Dataset> {
|
||||
record_open(uri);
|
||||
let result = match wrapper {
|
||||
None => Dataset::open(uri).await,
|
||||
Some(wrapper) => {
|
||||
|
|
@ -203,6 +254,7 @@ pub(crate) async fn open_table_dataset(
|
|||
version: u64,
|
||||
session: Option<&Arc<lance::session::Session>>,
|
||||
) -> Result<Dataset> {
|
||||
record_open(location);
|
||||
let mut builder = DatasetBuilder::from_uri(location).with_version(version);
|
||||
if let Some(session) = session {
|
||||
builder = builder.with_session(session.clone());
|
||||
|
|
|
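The open classification added in this file can be exercised on its own. The sketch below copies `INTERNAL_TABLE_DIRS` and `open_is_internal` from the hunk above; `OpenCounters` and `classify_opens` are hypothetical helpers that stand in for the two counters on `QueryIoProbes`, and the example URIs are made up for illustration.

```rust
use std::sync::atomic::{AtomicU64, Ordering};

/// Internal/system table directory names, as listed in the diff.
const INTERNAL_TABLE_DIRS: [&str; 4] = [
    "__manifest",
    "_graph_commits.lance",
    "_graph_commit_actors.lance",
    "_graph_commit_recoveries.lance",
];

/// True when the last path segment of `uri` names an internal table.
fn open_is_internal(uri: &str) -> bool {
    let trimmed = uri.trim_end_matches('/');
    let last = trimmed.rsplit('/').next().unwrap_or(trimmed);
    INTERNAL_TABLE_DIRS.contains(&last)
}

/// Hypothetical stand-in for the data/internal open counters.
#[derive(Default)]
struct OpenCounters {
    data: AtomicU64,
    internal: AtomicU64,
}

impl OpenCounters {
    /// Bump exactly one counter per open call, chosen by table class.
    fn record_open(&self, uri: &str) {
        if open_is_internal(uri) {
            self.internal.fetch_add(1, Ordering::Relaxed);
        } else {
            self.data.fetch_add(1, Ordering::Relaxed);
        }
    }
}

/// Replay a list of opened URIs and return (data_opens, internal_opens).
fn classify_opens(uris: &[&str]) -> (u64, u64) {
    let counters = OpenCounters::default();
    for uri in uris {
        counters.record_open(uri);
    }
    (
        counters.data.load(Ordering::Relaxed),
        counters.internal.load(Ordering::Relaxed),
    )
}
```

Only the last segment is matched, so a data table that happens to live under a directory containing `__manifest` earlier in its path still counts as a data open.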
|||
|
|
@ -187,7 +187,10 @@ impl Omnigraph {
|
|||
&omnigraph_policy::ResourceScope::Branch(branch.to_string()),
|
||||
actor_id,
|
||||
)?;
|
||||
self.ensure_schema_state_valid().await?;
|
||||
// Schema-contract validation is captured ONCE per write via the
|
||||
// `WriteTxn` opened in `load_jsonl_reader` (after branch resolution).
|
||||
// The redundant `ensure_schema_state_valid` that used to run here is
|
||||
// subsumed by `open_write_txn`'s `resolved_branch_target` call.
|
||||
// Converge any pending recovery sidecar (a previously failed
|
||||
// writer's Phase B → Phase C residual) before staging anything:
|
||||
// without this, sidecar-covered drift wedges every load on the
|
||||
|
|
@ -397,7 +400,16 @@ async fn load_jsonl_reader<R: BufRead>(
|
|||
// inline path.
|
||||
|
||||
let mut result = LoadResult::default();
|
||||
let snapshot = db.snapshot_for_branch(branch).await?;
|
||||
// Capture-once write transaction (RFC-013 step 3b). `open_write_txn`
|
||||
// validates the schema contract ONCE and pins the base snapshot. Threaded
|
||||
// as `Some(&txn)` through the per-table opens and the manifest publish so
|
||||
// each resolve point reuses the pinned base instead of re-validating the
|
||||
// contract. The branch already exists here (fork-if-missing ran in
|
||||
// `load_as` before this), so this captures the post-fork snapshot. The
|
||||
// load's own base read (`db.snapshot_for_branch` previously) is the same
|
||||
// per-branch snapshot, so reuse `txn.base` for it — dropping a validation.
|
||||
let txn = db.open_write_txn(branch).await?;
|
||||
let snapshot = txn.base.clone();
|
||||
let mut staging = MutationStaging::default();
|
||||
let pending_mode = match mode {
|
||||
LoadMode::Merge => PendingMode::Merge,
|
||||
|
|
@ -481,15 +493,18 @@ async fn load_jsonl_reader<R: BufRead>(
|
|||
// Phase 2b: accumulate every node type in memory. Fragment writes are
|
||||
// delayed until after all validation succeeds.
|
||||
for (type_name, table_key, batch, loaded_count) in prepared_nodes {
|
||||
let (ds, full_path, table_branch) = db
|
||||
.open_for_mutation_on_branch(branch, &table_key, load_op_kind)
|
||||
// The loader only needs the captured expected version (the publisher's
|
||||
// CAS fence) for `ensure_path` — it discards the handle. With a
|
||||
// non-strict load op (Merge/Append) and a `WriteTxn`, collapse #1 skips
|
||||
// the dataset open and returns the pinned base version directly.
|
||||
let opened = db
|
||||
.open_for_mutation_on_branch(branch, &table_key, load_op_kind, Some(&txn))
|
||||
.await?;
|
||||
let expected_version = ds.version();
|
||||
staging.ensure_path(
|
||||
&table_key,
|
||||
full_path,
|
||||
table_branch,
|
||||
expected_version,
|
||||
opened.full_path,
|
||||
opened.table_branch,
|
||||
opened.expected_version,
|
||||
load_op_kind,
|
||||
);
|
||||
let schema = batch.schema();
|
||||
|
|
@ -553,15 +568,16 @@ async fn load_jsonl_reader<R: BufRead>(
|
|||
|
||||
// Phase 2e: accumulate every edge type. Same dispatch as Phase 2b.
|
||||
for (edge_name, table_key, batch, loaded_count) in prepared_edges {
|
||||
let (ds, full_path, table_branch) = db
|
||||
.open_for_mutation_on_branch(branch, &table_key, load_op_kind)
|
||||
// Same as the node phase: only the captured expected version is used;
|
||||
// collapse #1 skips the open for a non-strict load op under a `WriteTxn`.
|
||||
let opened = db
|
||||
.open_for_mutation_on_branch(branch, &table_key, load_op_kind, Some(&txn))
|
||||
.await?;
|
||||
let expected_version = ds.version();
|
||||
staging.ensure_path(
|
||||
&table_key,
|
||||
full_path,
|
||||
table_branch,
|
||||
expected_version,
|
||||
opened.full_path,
|
||||
opened.table_branch,
|
||||
opened.expected_version,
|
||||
load_op_kind,
|
||||
);
|
||||
let schema = batch.schema();
|
||||
|
|
@ -589,13 +605,20 @@ async fn load_jsonl_reader<R: BufRead>(
|
|||
// `_queue_guards` holds per-(table_key, branch) write queues
|
||||
// across the manifest publish below — see exec/mutation.rs for
|
||||
// the rationale (interleaving prevention).
|
||||
let (updates, expected_versions, sidecar_handle, _queue_guards) = staged
|
||||
let crate::exec::staging::CommittedMutation {
|
||||
updates,
|
||||
expected_versions,
|
||||
sidecar_handle,
|
||||
guards: _queue_guards,
|
||||
committed_handles,
|
||||
} = staged
|
||||
.commit_all(
|
||||
db,
|
||||
branch,
|
||||
crate::db::manifest::SidecarKind::Load,
|
||||
actor_id,
|
||||
fork_queue_guards,
|
||||
Some(&txn),
|
||||
)
|
||||
.await?;
|
||||
// Same finalize → publisher residual as mutations: per-table
|
||||
|
|
@ -603,8 +626,15 @@ async fn load_jsonl_reader<R: BufRead>(
|
|||
// publish has not run yet. Reuse the mutation failpoint name so
|
||||
// one failpoint pins the shared `MutationStaging` boundary.
|
||||
crate::failpoints::maybe_fail("mutation.post_finalize_pre_publisher")?;
|
||||
db.commit_updates_on_branch_with_expected(branch, &updates, &expected_versions, actor_id)
|
||||
.await?;
|
||||
db.commit_updates_on_branch_with_expected(
|
||||
branch,
|
||||
&updates,
|
||||
&expected_versions,
|
||||
actor_id,
|
||||
Some(&txn),
|
||||
committed_handles,
|
||||
)
|
||||
.await?;
|
||||
// The recovery sidecar protects the per-table commit_staged →
|
||||
// manifest publish window. Phase C succeeded — clean up
|
||||
// best-effort: failing the user here would error out a write
|
||||
|
|
|
|||
|
|
@ -58,6 +58,14 @@ pub struct IoCounts {
|
|||
pub commit_graph_reads: u64,
|
||||
/// Version-probe invocations (the cheap freshness check).
|
||||
pub version_probes: u64,
|
||||
/// DATA-table open CALL count through the two instrumented chokepoints — an
|
||||
/// exact open-invocation count (not the opener-read term), classified by URI so
|
||||
/// internal/system-table opens are excluded. Step-3b target:
|
||||
/// `data_open_count <= |touched_tables|` for a write.
|
||||
pub data_open_count: u64,
|
||||
/// Internal/system-table (`__manifest`, `_graph_commits*`) open CALL count —
|
||||
/// the complement of `data_open_count` (publisher CAS + commit-graph append).
|
||||
pub internal_open_count: u64,
|
||||
}
|
||||
|
||||
impl IoCounts {
|
||||
|
|
@ -225,6 +233,8 @@ struct ProbeHandles {
|
|||
commit_graph: IOTracker,
|
||||
table: PrefixCounter,
|
||||
probe_count: Arc<AtomicU64>,
|
||||
data_open_count: Arc<AtomicU64>,
|
||||
internal_open_count: Arc<AtomicU64>,
|
||||
}
|
||||
|
||||
impl ProbeHandles {
|
||||
|
|
@ -234,6 +244,8 @@ impl ProbeHandles {
|
|||
commit_graph: IOTracker::default(),
|
||||
table: PrefixCounter::default(),
|
||||
probe_count: Arc::new(AtomicU64::new(0)),
|
||||
data_open_count: Arc::new(AtomicU64::new(0)),
|
||||
internal_open_count: Arc::new(AtomicU64::new(0)),
|
||||
};
|
||||
let probes = QueryIoProbes {
|
||||
manifest_wrapper: Some(Arc::new(h.manifest.clone()) as Arc<dyn WrappingObjectStore>),
|
||||
|
|
@ -242,6 +254,8 @@ impl ProbeHandles {
|
|||
),
|
||||
table_wrapper: Some(Arc::new(h.table.clone()) as Arc<dyn WrappingObjectStore>),
|
||||
probe_count: Arc::clone(&h.probe_count),
|
||||
data_open_count: Arc::clone(&h.data_open_count),
|
||||
internal_open_count: Arc::clone(&h.internal_open_count),
|
||||
};
|
||||
(probes, h)
|
||||
}
|
||||
|
|
@ -256,6 +270,8 @@ impl ProbeHandles {
|
|||
manifest_reads: self.manifest.stats().read_iops,
|
||||
commit_graph_reads: self.commit_graph.stats().read_iops,
|
||||
version_probes: self.probe_count.load(Ordering::Relaxed),
|
||||
data_open_count: self.data_open_count.load(Ordering::Relaxed),
|
||||
internal_open_count: self.internal_open_count.load(Ordering::Relaxed),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -237,6 +237,58 @@ async fn cardinality_rejected_on_mutation_insert_edge() {
|
|||
);
|
||||
}
|
||||
|
||||
/// RFC-013 step 3b regression guard (cursor High / codex P1 on #298): edge `@card`
|
||||
/// validation must scan LIVE committed HEAD, not the pinned `txn.base`. Collapse #1
|
||||
/// skips the edge accumulation open, so a non-strict edge insert under a `WriteTxn`
|
||||
/// reopens for the cardinality scan — and that scan must observe edges a concurrent
|
||||
/// writer committed after this mutation captured its base, or a `@card` max is
|
||||
/// silently exceeded (invariant 9). The residual validate→commit TOCTOU is the §7.1
|
||||
/// gap (step 4); this only un-widens what 3b widened (live HEAD vs mutation-start base).
|
||||
///
|
||||
/// Deterministic — no failpoint: handle B's coordinator is stale by construction
|
||||
/// (the write path does not probe the manifest version, unlike the read path). B MUST
|
||||
/// NOT read between A's commit and B's insert — a read refreshes B's coordinator and
|
||||
/// masks the bug (the same caveat as the served stale-view repro in `writes.rs`).
|
||||
#[tokio::test]
|
||||
async fn cardinality_rejected_for_stale_handle_after_concurrent_edge_commit() {
|
||||
let (dir, mut db_a) = init_with(CARDINALITY_SCHEMA, CARDINALITY_SEED).await;
|
||||
let uri = dir.path().to_str().unwrap();
|
||||
|
||||
// Handle B opens the same graph at the seed version (no edges yet); it then
|
||||
// never reads again, so its in-memory coordinator stays pinned at the seed.
|
||||
let mut db_b = Omnigraph::open(uri).await.unwrap();
|
||||
|
||||
// Handle A commits WorksAt(Alice -> Acme): Alice is now at the @card(0..1) max.
|
||||
// This advances the on-disk manifest; B's coordinator is now stale.
|
||||
mutate_main(
|
||||
&mut db_a,
|
||||
CARDINALITY_MUTATIONS,
|
||||
"add_employment",
|
||||
&params(&[("$person", "Alice"), ("$company", "Acme")]),
|
||||
)
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
// Handle B (stale, never read since A committed) inserts a second WorksAt for
|
||||
// Alice. B is non-strict + under a WriteTxn, so collapse #1 skips the open and the
|
||||
// cardinality scan reopens: it MUST read live HEAD (Alice has 1) → reject (1+1 > 1),
|
||||
// not the stale base (Alice has 0) → which would wrongly pass and commit a 2nd edge.
|
||||
let err = mutate_main(
|
||||
&mut db_b,
|
||||
CARDINALITY_MUTATIONS,
|
||||
"add_employment",
|
||||
&params(&[("$person", "Alice"), ("$company", "Beta")]),
|
||||
)
|
||||
.await
|
||||
.unwrap_err();
|
||||
assert!(
|
||||
err.to_string().to_lowercase().contains("cardinality")
|
||||
|| err.to_string().to_lowercase().contains("@card"),
|
||||
"a stale-handle edge insert must be rejected by @card against live HEAD, got: {}",
|
||||
err
|
||||
);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn cardinality_rejected_on_jsonl_load() {
|
||||
// Already covered by existing loader Phase 3 logic but assert the
|
||||
|
|
|
|||
|
|
@ -24,10 +24,10 @@
|
|||
mod helpers;
|
||||
|
||||
use helpers::cost::{
|
||||
IoCounts, assert_flat, assert_grows, local_graph, measure_insert, measure_insert_as,
|
||||
IoCounts, assert_flat, assert_grows, local_graph, measure, measure_insert, measure_insert_as,
|
||||
measure_with_staged,
|
||||
};
|
||||
use helpers::{MUTATION_QUERIES, commit_many, commit_many_as, mixed_params};
|
||||
use helpers::{MUTATION_QUERIES, commit_many, commit_many_as, init_and_load, mixed_params};
|
||||
|
||||
// ── (A) The internal-table LOCK — the acceptance test for step 2 (compaction) ──
|
||||
//
|
||||
|
|
@ -169,3 +169,86 @@ async fn keyed_insert_routes_through_merge_insert_only() {
|
|||
assert_eq!(staged.stage_append, 0, "keyed insert must not stage_append");
|
||||
assert_eq!(staged.create_vector_index, 0, "no inline vector-index build on a plain insert");
|
||||
}
|
||||
|
||||
// ── (D) Step-3b capture-once fitness asserts (RED today → GREEN after WriteTxn) ──
|
||||
|
||||
/// A write must validate the schema contract EXACTLY ONCE (3 `read_text` + 2 `exists`).
|
||||
/// Today the write path re-validates at every resolve point (entry, per-table
|
||||
/// `resolved_branch_target`, commit-time `fresh_snapshot_for_branch`), so the delta is
|
||||
/// a multiple of that. Step 3b's `WriteTxn` validates once and threads it. The shape is
|
||||
/// the write twin of `warm_read_cost.rs::warm_query_validates_schema_contract_once`,
|
||||
/// built with ZERO production change via the counting storage adapter.
|
||||
#[tokio::test]
|
||||
async fn write_validates_schema_contract_once() {
|
||||
use omnigraph::instrumentation::CountingStorageAdapter;
|
||||
use omnigraph::storage::storage_for_uri;
|
||||
|
||||
let dir = tempfile::tempdir().unwrap();
|
||||
let _ = init_and_load(&dir).await;
|
||||
let uri = dir.path().to_str().unwrap();
|
||||
let (adapter, counts) = CountingStorageAdapter::new(storage_for_uri(uri).unwrap());
|
||||
let db = omnigraph::db::Omnigraph::open_with_storage(uri, adapter)
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
let before_read_text = counts.read_text();
|
||||
let before_exists = counts.exists();
|
||||
db.mutate(
|
||||
"main",
|
||||
MUTATION_QUERIES,
|
||||
"insert_person",
|
||||
&mixed_params(&[("$name", "schema_once")], &[("$age", 30)]),
|
||||
)
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
let read_text_delta = counts.read_text() - before_read_text;
|
||||
let exists_delta = counts.exists() - before_exists;
|
||||
eprintln!("schema-contract reads on one write: read_text={read_text_delta} exists={exists_delta}");
|
||||
assert_eq!(
|
||||
read_text_delta, 3,
|
||||
"a write must validate the schema contract once (3 reads), not N times",
|
||||
);
|
||||
assert_eq!(
|
||||
exists_delta, 2,
|
||||
"a write must probe contract-file existence once (2 probes), not N times",
|
||||
);
|
||||
}
|
||||
|
||||
/// A keyed single-table write must open its DATA table AT MOST ONCE. Today it opens
|
||||
/// ~4× (accumulation, staging, commit drift-guard, publish-prepare/index-build), each
|
||||
/// a fresh cold `Dataset::open`. Step 3b opens the base once (a *session-aware* base
|
||||
/// open is deferred to step 5), threads the commit-return handle, and replaces the
|
||||
/// drift-guard open with a cheap `latest_version_id` probe — collapsing to 1 open.
|
||||
/// Counted by `data_open_count`, the
|
||||
/// table-class-scoped chokepoint probe: the internal-table opens (publisher CAS +
|
||||
/// commit-graph append) are EXCLUDED, since they are unrelated to data-table reuse and
|
||||
/// would otherwise keep this count >1 regardless of threading. (`forbidden_apis` keeps
|
||||
/// engine code outside the storage layer from opening datasets except through the
|
||||
/// instrumented chokepoints — `table_store.rs`'s own direct opens are branch-management
|
||||
/// ops, not this keyed-write path.)
|
||||
#[tokio::test]
|
||||
async fn keyed_insert_opens_table_at_most_once() {
|
||||
let dir = tempfile::tempdir().unwrap();
|
||||
let mut db = local_graph(&dir).await;
|
||||
let io = {
|
||||
let (res, io) = measure(db.mutate(
|
||||
"main",
|
||||
MUTATION_QUERIES,
|
||||
"insert_person",
|
||||
&mixed_params(&[("$name", "opens")], &[("$age", 30)]),
|
||||
))
|
||||
.await;
|
||||
res.unwrap();
|
||||
io
|
||||
};
|
||||
eprintln!(
|
||||
"data_open_count={} internal_open_count={} for a single-table keyed insert",
|
||||
io.data_open_count, io.internal_open_count
|
||||
);
|
||||
assert!(
|
||||
io.data_open_count <= 1,
|
||||
"a keyed single-table write must open its data table at most once, got {}",
|
||||
io.data_open_count,
|
||||
);
|
||||
}
|
||||
|
|
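The before/after delta pattern used by `write_validates_schema_contract_once` above relies on a counting wrapper around storage. The following is a rough sketch of that idea under stated assumptions: the `Storage` trait, `MemStorage`, `Counting`, and `validate_once` are all hypothetical (the real `CountingStorageAdapter` API is not shown in the diff beyond `new`, `read_text()`, and `exists()`), and the file names are placeholders. Only the 3-reads-plus-2-probes shape comes from the test's doc comment.

```rust
use std::collections::HashMap;
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::Arc;

/// Hypothetical minimal storage interface.
trait Storage {
    fn read_text(&self, path: &str) -> Option<String>;
    fn exists(&self, path: &str) -> bool;
}

/// Hypothetical in-memory backend.
struct MemStorage(HashMap<String, String>);

impl Storage for MemStorage {
    fn read_text(&self, path: &str) -> Option<String> {
        self.0.get(path).cloned()
    }
    fn exists(&self, path: &str) -> bool {
        self.0.contains_key(path)
    }
}

/// Shared call counters, read by the test after the operation runs.
#[derive(Default)]
struct Counts {
    read_text: AtomicU64,
    exists: AtomicU64,
}

/// Wrapper that counts each call, then delegates to the inner storage.
struct Counting<S> {
    inner: S,
    counts: Arc<Counts>,
}

impl<S: Storage> Counting<S> {
    fn new(inner: S) -> (Self, Arc<Counts>) {
        let counts = Arc::new(Counts::default());
        (Counting { inner, counts: Arc::clone(&counts) }, counts)
    }
}

impl<S: Storage> Storage for Counting<S> {
    fn read_text(&self, path: &str) -> Option<String> {
        self.counts.read_text.fetch_add(1, Ordering::Relaxed);
        self.inner.read_text(path)
    }
    fn exists(&self, path: &str) -> bool {
        self.counts.exists.fetch_add(1, Ordering::Relaxed);
        self.inner.exists(path)
    }
}

/// Hypothetical validation pass: 3 text reads and 2 existence probes.
fn validate_once(storage: &dyn Storage) {
    for path in ["contract_a", "contract_b", "contract_c"].iter() {
        let _ = storage.read_text(path);
    }
    for path in ["marker_a", "marker_b"].iter() {
        let _ = storage.exists(path);
    }
}

/// Snapshot the counters, run one validation, return the deltas.
fn measure_one_validation() -> (u64, u64) {
    let (adapter, counts) = Counting::new(MemStorage(HashMap::new()));
    let before_reads = counts.read_text.load(Ordering::Relaxed);
    let before_exists = counts.exists.load(Ordering::Relaxed);
    validate_once(&adapter);
    (
        counts.read_text.load(Ordering::Relaxed) - before_reads,
        counts.exists.load(Ordering::Relaxed) - before_exists,
    )
}
```

Measuring a delta rather than an absolute count keeps the assertion independent of whatever setup work ran against the same adapter before the operation under test.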
|
|||