Merge pull request #68 from ModernRelay/ragnorc/mr794-rewire

MR-794 step 2: in-memory accumulator rewire for mutate_as + load
This commit is contained in:
Ragnor Comerford 2026-05-01 23:06:10 +02:00 committed by GitHub
commit 6f60c0cbcf
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
36 changed files with 3093 additions and 697 deletions

View file

@ -161,6 +161,16 @@ jobs:
OMNIGRAPH_UPDATE_OPENAPI: ${{ (github.event_name == 'pull_request' && github.event.pull_request.head.repo.full_name == github.repository) && '1' || '' }}
run: cargo test --workspace --locked
- name: Run failpoints feature test
if: needs.classify_changes.outputs.run_full_ci == 'true'
# Run after the workspace test so the build cache is warm —
# enabling --features failpoints is just an incremental rebuild
# of omnigraph-engine + the small `fail` crate, not the full
# dep tree (lance, datafusion). A separate job with its own
# cache key would be a fresh ~20min build on first run; this
# is ~30s on a warm cache.
run: cargo test --locked -p omnigraph-engine --features failpoints --test failpoints
- name: Commit regenerated openapi.json to PR branch
if: |
needs.classify_changes.outputs.run_full_ci == 'true' &&

View file

@ -32,7 +32,7 @@ OmniGraph is a typed property-graph engine built as a coordination layer over ma
- **Languages**: a `.pg` schema language and a `.gq` query language, both Pest-based, with a typed IR.
- **Multi-modal querying**: vector ANN (`nearest`), full-text (`search`/`fuzzy`/`match_text`/`bm25`), Reciprocal Rank Fusion (`rrf`), and graph traversal (`Expand`, anti-join `not { … }`) in one runtime.
- **Branches and commits across the whole graph**: Git-style — every successful publish appends to a commit DAG; merges are three-way at the row level.
- **Atomic per-query writes**: `mutate_as` and `load` capture per-table `expected_table_versions` before writing and call `ManifestBatchPublisher::publish` once at the end. Cross-table OCC enforced inside the publisher's row-level CAS; no staging branches, no run state machine.
- **Atomic per-query writes**: `mutate_as` and `load` accumulate insert/update batches into an in-memory `MutationStaging.pending` per touched table; one `stage_*` + `commit_staged` per table runs at end-of-query, then `ManifestBatchPublisher::publish` commits the manifest atomically with per-table `expected_table_versions` CAS. A mid-query failure leaves Lance HEAD untouched on staged tables — no drift, no run state machine, no staging branches. Deletes still inline-commit; D₂ at parse time prevents inserts/updates and deletes from coexisting in one query.
- **HTTP server**: Axum + utoipa OpenAPI, bearer auth (SHA-256 hashed, optional AWS Secrets Manager), Cedar policy gating.
- **CLI** driven by a single `omnigraph.yaml`; multi-format output (json/jsonl/csv/kv/table).
@ -211,7 +211,7 @@ omnigraph policy explain --actor act-alice --action change --branch main
| Query language | — | `.gq` + Pest grammar + IR + lowering + linter |
| Schema migration planning | — | `plan_schema_migration` + `apply_schema` step types + `__schema_apply_lock__` |
| Commit graph (DAG) across whole repo | — | `_graph_commits.lance` with linear + merge parents, ULID ids, actor map |
| Per-query atomic writes | — | `MutationStaging` accumulator + `commit_with_expected` publisher CAS, single commit per `mutate_as` / `load` |
| Per-query atomic writes | — | In-memory `MutationStaging.pending` accumulator + `stage_*` / `commit_staged` per touched table at end-of-query + publisher CAS via `commit_with_expected` (single manifest commit per `mutate_as` / `load`); D₂ parse-time rule keeps inserts/updates and deletes from mixing |
| Three-way row-level merge | — | `OrderedTableCursor` + `StagedTableWriter`, structured `MergeConflictKind` |
| Change feeds | — | `diff_between` / `diff_commits` with manifest fast path + ID streaming |
| Cedar policy | — | 8 actions, branch / target_branch / protected scopes, validate/test/explain CLI |

1
Cargo.lock generated
View file

@ -4647,6 +4647,7 @@ dependencies = [
"async-trait",
"base64",
"chrono",
"datafusion",
"fail",
"futures",
"lance",

View file

@ -20,6 +20,7 @@ arrow-select = "57"
arrow-cast = { version = "57", features = ["prettyprint"] }
arrow-ord = "57"
datafusion = { version = "52", default-features = false }
datafusion-physical-plan = "52"
datafusion-physical-expr = "52"
datafusion-execution = "52"

View file

@ -1905,7 +1905,7 @@ fn cli_fails_for_invalid_merge_requests() {
);
}
// MR-771: `omnigraph run list/show/publish/abort` subcommands removed
// `omnigraph run list/show/publish/abort` subcommands removed
// alongside the run state machine. Direct-to-target writes leave nothing
// for these CLIs to manage. Audit history is now visible via
// `omnigraph commit list` reading the commit graph.

View file

@ -307,7 +307,7 @@ fn local_cli_end_to_end_branch_change_merge_flow() {
assert_eq!(main_read["row_count"], 1);
assert_eq!(main_read["rows"][0]["p.name"], "Zoe");
// MR-771: `omnigraph run list` removed. Audit visible via commit list.
// `omnigraph run list` removed. Audit visible via commit list.
let commits_payload = parse_stdout_json(&output_success(
cli()
.arg("commit")
@ -597,8 +597,8 @@ fn local_cli_failed_load_keeps_target_state_unchanged() {
snapshot_table_row_count(&repo, "edge:Knows"),
knows_rows_before
);
// MR-771: failed loads no longer leave a RunRecord. The atomicity
// guarantee is verified above (target tables are unchanged).
// Failed loads leave no run record (the run lifecycle has been
// removed); atomicity is verified above by the unchanged target.
}
#[test]
@ -631,9 +631,8 @@ fn local_cli_failed_change_keeps_target_state_unchanged() {
.arg("--json"),
));
assert_eq!(friends_payload["row_count"], 2);
// MR-771: failed mutations no longer leave a RunRecord. The atomicity
// guarantee is verified above (the friends_of read above shows main
// unchanged).
// Failed mutations leave no run record (the run lifecycle has been
// removed); atomicity is verified above by the unchanged target.
}
#[test]
@ -941,12 +940,13 @@ query vector_search($q: String) {
assert_eq!(result["rows"][0]["d.slug"], "alpha-doc");
}
// MR-771: the publisher CAS conflict shape is verified end-to-end at the
// engine level in `crates/omnigraph/tests/runs.rs::concurrent_writers_one_succeeds_one_gets_expected_version_mismatch`
// The publisher CAS conflict shape is verified end-to-end at the engine
// level in
// `crates/omnigraph/tests/runs.rs::concurrent_writers_one_succeeds_one_gets_expected_version_mismatch`
// and at the HTTP boundary in
// `crates/omnigraph-server/tests/server.rs::change_conflict_returns_manifest_conflict_409`.
// The pre-MR-771 CLI-level race was timing-dependent; with direct-publish
// the surface is the same engine path the unit test already covers.
// A CLI-level race would be timing-dependent; with direct-publish the
// surface is the same engine path the unit test already covers.
#[test]
fn local_cli_policy_tooling_is_end_to_end_while_local_writes_stay_unenforced() {

View file

@ -192,7 +192,7 @@ query insert_person($name: String, $age: I32) {
assert_eq!(local_verify["row_count"], 1);
assert_eq!(local_verify["rows"][0]["p.name"], "Mina");
// MR-771: `run publish` / `run list` removed. Direct-to-target writes
// `run publish` / `run list` removed. Direct-to-target writes
// already landed via the change call above; the commit graph is now
// the audit surface (verified separately by `commit list`).
}

View file

@ -1346,7 +1346,7 @@ async fn policy_blocks_non_admin_merge_to_main_and_allows_admin() {
#[tokio::test(flavor = "multi_thread")]
async fn authenticated_change_stamps_actor_on_commits() {
// MR-771: with the Run state machine removed, actor_id is recorded
// With the Run state machine removed, actor_id is recorded
// directly on the commit graph (no intermediate run record).
let (_temp, app) = app_for_loaded_repo_with_auth_tokens(&[("act-andrew", "token-one")]).await;
@ -2108,10 +2108,10 @@ query vector_search_string($q: String) {
#[tokio::test(flavor = "multi_thread")]
async fn change_conflict_returns_manifest_conflict_409() {
// MR-771: a write that races with another writer surfaces as HTTP 409
// with a structured `manifest_conflict` body — `table_key`, `expected`,
// and `actual` — so clients can detect-and-retry without parsing the
// message. (Replaces the old run-publish merge-conflict shape.)
// A write that races with another writer surfaces as HTTP 409 with
// a structured `manifest_conflict` body — `table_key`, `expected`,
// and `actual` — so clients can detect-and-retry without parsing
// the message.
let temp = init_loaded_repo().await;
let repo = repo_path(temp.path());

View file

@ -19,6 +19,7 @@ failpoints = ["dep:fail", "fail/failpoints"]
omnigraph-compiler = { path = "../omnigraph-compiler", version = "0.3.1" }
lance = { workspace = true }
lance-datafusion = { workspace = true }
datafusion = { workspace = true }
lance-file = { workspace = true }
lance-index = { workspace = true }
lance-linalg = { workspace = true }

View file

@ -1430,12 +1430,12 @@ edge WorksAt: Person -> Company
#[tokio::test]
async fn test_apply_schema_succeeds_after_load() {
// MR-670 + MR-674: schema apply used to be blocked by leftover
// __run__ branches. MR-670 added a defense-in-depth filter that
// skips internal system branches. MR-674 made run branches
// ephemeral on every terminal state, so in practice no __run__
// branch survives publish — but the filter still guards the
// invariant.
// Historical: schema apply used to be blocked by leftover
// `__run__` branches. A defense-in-depth filter now skips
// internal system branches, and run branches were made
// ephemeral on every terminal state — so in practice no
// `__run__` branch survives publish. The filter still guards
// the invariant.
let dir = tempfile::tempdir().unwrap();
let uri = dir.path().to_str().unwrap();
let mut db = Omnigraph::init(uri, TEST_SCHEMA).await.unwrap();
@ -1451,7 +1451,7 @@ edge WorksAt: Person -> Company
let all_branches = db.coordinator.all_branches().await.unwrap();
assert!(
!all_branches.iter().any(|b| is_internal_run_branch(b)),
"MR-674: run branch should be deleted after publish, got: {:?}",
"run branch should be deleted after publish, got: {:?}",
all_branches
);

View file

@ -15,8 +15,8 @@
//! retention. Destructive to version history — callers should gate this
//! behind an explicit confirm flag at the CLI layer.
//!
//! Both walk every node + edge table on the `main` branch. Run branches are
//! ephemeral by design (MR-670 / MR-674) so we do not optimize them.
//! Both walk every node + edge table on the `main` branch. Run branches
//! are ephemeral by design so we do not optimize them.
use std::time::Duration;

View file

@ -34,9 +34,9 @@ pub(super) async fn apply_schema_with_lock(
let branches = db.coordinator.all_branches().await?;
// Skip `main` and internal system branches. The schema-apply lock branch
// is excluded because it is the cluster-wide schema-apply serializer.
// `__run__*` branches are no longer created (MR-771); the filter remains
// as defense-in-depth for legacy repos with leftover staging branches
// MR-770 will sweep them and this guard can go.
// `__run__*` branches are no longer created; the filter remains as
// defense-in-depth for legacy repos with leftover staging branches.
// A future production sweep will let this guard go.
let blocking_branches = branches
.into_iter()
.filter(|branch| branch != "main" && !is_internal_system_branch(branch))

View file

@ -1,12 +1,12 @@
// Run state-machine has been removed (MR-771). Mutations now write directly
// to target tables and use the publisher's `expected_table_versions` CAS for
// cross-table OCC; the `__run__<id>` staging branches and `_graph_runs.lance`
// state machine no longer exist.
// The Run state machine has been removed. Mutations now write directly
// to target tables and use the publisher's `expected_table_versions`
// CAS for cross-table OCC; `__run__<id>` staging branches and the
// `_graph_runs.lance` state machine no longer exist.
//
// What remains is the branch-name predicate, kept as a defense-in-depth guard
// against users naming a public branch `__run__*`. MR-770 owns the production
// sweep of legacy `_graph_runs.lance` rows and stale `__run__*` branches; once
// that lands the predicate (and this file) can go too.
// What remains is the branch-name predicate, kept as a defense-in-depth
// guard against users naming a public branch `__run__*`. A future
// production sweep of legacy `_graph_runs.lance` rows and stale
// `__run__*` branches will let this predicate (and this file) go too.
pub(crate) const INTERNAL_RUN_BRANCH_PREFIX: &str = "__run__";

View file

@ -46,3 +46,4 @@ mod merge;
mod mutation;
mod projection;
mod query;
pub(crate) mod staging;

File diff suppressed because it is too large Load diff

View file

@ -0,0 +1,662 @@
//! Per-query staging accumulator for direct-publish writes.
//!
//! `MutationStaging` accumulates per-table input batches in memory during a
//! `mutate_as` or `load` query, then at end-of-query commits each touched
//! table via Lance's distributed-write API (one `stage_*` + `commit_staged`
//! per table) and returns the publisher inputs (`SubTableUpdate` list +
//! `expected_table_versions`).
//!
//! Read-your-writes within the same query is satisfied by the in-memory
//! pending batches (see `pending_batches`) — read sites union the committed
//! Lance scan with the pending Arrow batches via DataFusion `MemTable` (see
//! `crate::table_store::TableStore::scan_with_pending`).
//!
//! This module is shared by the engine's mutation path (`exec/mutation.rs`)
//! and the bulk loader (`loader/mod.rs`); both feed insert/update batches
//! into `pending` and route end-of-query commits through `finalize`.
//! Deletes follow the inline-commit path and are recorded via
//! `record_inline` (parse-time D₂ rule prevents mixed insert/delete in a
//! single query, so no flushing is required).
use std::collections::{HashMap, HashSet};
use std::sync::Arc;
use arrow_array::{Array, RecordBatch, StringArray, UInt32Array};
use arrow_schema::SchemaRef;
use lance::Dataset;
use omnigraph_compiler::catalog::EdgeType;
use crate::db::SubTableUpdate;
use crate::error::{OmniError, Result};
/// Whether the per-table accumulator should commit via `stage_append`
/// (no @key inserts, edge inserts) or `stage_merge_insert` (any @key insert
/// or update). Once set to `Merge` for a table within a query, subsequent
/// inserts on that table are rolled into the same merge — a `WhenNotMatched
/// = InsertAll` merge is correct for both cases.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub(crate) enum PendingMode {
Append,
Merge,
}
/// Per-table accumulator. Each insert/update op pushes a `RecordBatch` into
/// `batches`; at end-of-query the accumulated batches concat into a single
/// stage call.
#[derive(Debug)]
pub(crate) struct PendingTable {
pub(crate) schema: SchemaRef,
pub(crate) mode: PendingMode,
pub(crate) batches: Vec<RecordBatch>,
}
impl PendingTable {
fn new(schema: SchemaRef, mode: PendingMode) -> Self {
Self {
schema,
mode,
batches: Vec::new(),
}
}
fn total_rows(&self) -> usize {
self.batches.iter().map(|b| b.num_rows()).sum()
}
}
/// Stable per-table identifiers captured on first touch and reused at
/// finalize time. Avoids re-resolving the dataset path / branch.
#[derive(Debug, Clone)]
pub(crate) struct StagedTablePath {
pub(crate) full_path: String,
pub(crate) table_branch: Option<String>,
}
/// Per-query staging state.
///
/// Replaces the legacy inline-commit `MutationStaging.latest` map with
/// an in-memory accumulator that defers all Lance HEAD advances to
/// end-of-query. After this rewire the bug class "Lance HEAD drifts ahead
/// of `__manifest`" is unreachable in `mutate_as` and `load` for inserts
/// and updates by construction.
#[derive(Default)]
pub(crate) struct MutationStaging {
/// Pre-write manifest version per table — the publisher's CAS fence at
/// end-of-query.
pub(crate) expected_versions: HashMap<String, u64>,
/// Per-table identifiers captured on first touch.
pub(crate) paths: HashMap<String, StagedTablePath>,
/// In-memory accumulated batches per table (insert/update path).
pub(crate) pending: HashMap<String, PendingTable>,
/// Inline-committed updates from delete-touching ops (D₂ guarantees no
/// pending batches exist on a delete-touched table).
pub(crate) inline_committed: HashMap<String, SubTableUpdate>,
}
impl MutationStaging {
/// Capture pre-write metadata on first touch of a table. Subsequent
/// touches are no-ops (paths and `expected_version` are stable for the
/// lifetime of one query).
pub(crate) fn ensure_path(
&mut self,
table_key: &str,
full_path: String,
table_branch: Option<String>,
expected_version: u64,
) {
self.paths.entry(table_key.to_string()).or_insert(StagedTablePath {
full_path,
table_branch,
});
self.expected_versions
.entry(table_key.to_string())
.or_insert(expected_version);
}
/// Append a batch to the per-table accumulator.
///
/// `mode` is asserted-consistent with prior pushes for the same table:
/// `Append`+`Append` stays Append; any `Merge` upgrades the table to
/// Merge (e.g. an `update Person` after `insert Knows from='X' to='Y'`
/// when both produce content on `node:Person`). Once Merge is set,
/// subsequent appends roll into the merge stream — `WhenNotMatched =
/// InsertAll` correctly inserts append-shaped rows.
pub(crate) fn append_batch(
&mut self,
table_key: &str,
schema: SchemaRef,
mode: PendingMode,
batch: RecordBatch,
) -> Result<()> {
if batch.num_rows() == 0 {
// No-op — staging is purely additive; an empty batch should not
// be appended.
return Ok(());
}
// If we've already accumulated a batch on this table, the new
// batch's schema MUST match the existing accumulator's schema.
// The mismatch case in practice is a blob-bearing table that
// sees an `insert` (full schema, blob columns included) and
// then an `update` whose `apply_assignments` output omits
// unassigned blob columns (subset schema). Concat-time and
// MemTable-construction errors would catch this later, but
// surfacing it at the offending `append_batch` call gives the
// caller a clearer point of failure attached to the specific
// op that introduced the drift.
if let Some(existing) = self.pending.get(table_key) {
if !schemas_compatible(&existing.schema, &batch.schema()) {
return Err(OmniError::manifest(format!(
"table '{}' accumulated mutation batches with mismatched schemas: \
prior batches have {} columns, this batch has {}. \
This typically happens on a blob-bearing table when one \
op uses the full schema (e.g. an `insert`) and another \
omits unassigned blob columns (e.g. an `update` that \
doesn't set every blob property). Split the mutation \
into two queries: one for the inserts, one for the \
updates.",
table_key,
existing.schema.fields().len(),
batch.schema().fields().len(),
)));
}
}
let entry = self
.pending
.entry(table_key.to_string())
.or_insert_with(|| PendingTable::new(schema.clone(), mode));
// Upgrade Append -> Merge if any op needs merge semantics.
if mode == PendingMode::Merge {
entry.mode = PendingMode::Merge;
}
entry.batches.push(batch);
Ok(())
}
/// Record a delete that already inline-committed at the Lance layer.
pub(crate) fn record_inline(&mut self, update: SubTableUpdate) {
self.inline_committed.insert(update.table_key.clone(), update);
}
/// Read-your-writes accessor: the accumulated pending batches for
/// `table_key`, or `&[]` if none.
pub(crate) fn pending_batches(&self, table_key: &str) -> &[RecordBatch] {
self.pending
.get(table_key)
.map(|p| p.batches.as_slice())
.unwrap_or(&[])
}
/// Schema of the accumulated batches for `table_key`, or `None` if no
/// op has touched the table. Used by `scan_with_pending` to construct
/// the in-memory `MemTable`.
pub(crate) fn pending_schema(&self, table_key: &str) -> Option<SchemaRef> {
self.pending.get(table_key).map(|p| p.schema.clone())
}
/// `true` if neither pending nor inline_committed has any state — the
/// query made no observable writes.
pub(crate) fn is_empty(&self) -> bool {
self.pending.is_empty() && self.inline_committed.is_empty()
}
/// Total count of pending rows across all tables. Used by tests and
/// (eventually) memory-budget enforcement.
#[allow(dead_code)]
pub(crate) fn pending_row_count(&self) -> usize {
self.pending.values().map(|p| p.total_rows()).sum()
}
/// End-of-query: for each pending table, concat batches and commit via
/// `stage_append` or `stage_merge_insert` followed by `commit_staged`.
/// Merge with inline-committed entries. Return `(updates,
/// expected_versions)` for `commit_updates_on_branch_with_expected`.
///
/// Sequential per-table — no cross-table dependency, but a parallel
/// version is a perf optimization for multi-table writes (loader with
/// many node + edge types). v1 ships sequential; the fan-out can land
/// in a follow-up.
pub(crate) async fn finalize(
self,
db: &crate::db::Omnigraph,
_branch: Option<&str>,
) -> Result<(Vec<SubTableUpdate>, HashMap<String, u64>)> {
let MutationStaging {
expected_versions,
paths,
pending,
inline_committed,
} = self;
let mut updates: Vec<SubTableUpdate> =
inline_committed.into_values().collect();
for (table_key, table) in pending {
let path = paths.get(&table_key).ok_or_else(|| {
OmniError::manifest_internal(format!(
"MutationStaging::finalize: missing path for table '{}'",
table_key
))
})?;
let expected = *expected_versions.get(&table_key).ok_or_else(|| {
OmniError::manifest_internal(format!(
"MutationStaging::finalize: missing expected version for table '{}'",
table_key
))
})?;
// Reopen at the pre-write version. Lance HEAD has not advanced
// since `ensure_path` captured it — no prior op committed to
// this dataset.
let ds = db
.reopen_for_mutation(
&table_key,
&path.full_path,
path.table_branch.as_deref(),
expected,
)
.await?;
if table.batches.is_empty() {
continue;
}
// For Merge mode, dedupe accumulated batches by `id`, keeping
// the LAST occurrence (last-write-wins for the query). This
// is required because Lance's `MergeInsertBuilder` produces
// arbitrary results on duplicate keys in the source. Append
// mode is exempt because no-key node and edge inserts use
// ULID-generated ids that are unique within a query.
let combined = match table.mode {
PendingMode::Merge => {
dedupe_merge_batches_by_id(&table.schema, table.batches)?
}
PendingMode::Append => {
if table.batches.len() == 1 {
table.batches.into_iter().next().unwrap()
} else {
arrow_select::concat::concat_batches(
&table.schema,
&table.batches,
)
.map_err(|e| OmniError::Lance(e.to_string()))?
}
}
};
// Commit via Lance's two-phase write: stage produces
// uncommitted fragments + transaction; commit advances HEAD.
let staged = match table.mode {
PendingMode::Append => {
db.table_store().stage_append(&ds, combined, &[]).await?
}
PendingMode::Merge => {
db.table_store()
.stage_merge_insert(
ds.clone(),
combined,
vec!["id".to_string()],
lance::dataset::WhenMatched::UpdateAll,
lance::dataset::WhenNotMatched::InsertAll,
)
.await?
}
};
let new_ds = db
.table_store()
.commit_staged(Arc::new(ds), staged.transaction)
.await?;
let state = db
.table_store()
.table_state(&path.full_path, &new_ds)
.await?;
updates.push(SubTableUpdate {
table_key: table_key.clone(),
table_version: state.version,
table_branch: path.table_branch.clone(),
row_count: state.row_count,
version_metadata: state.version_metadata,
});
}
Ok((updates, expected_versions))
}
}
/// Walk `batches` in reverse, tracking seen `id` values; for each row
/// whose id we have NOT seen yet, mark it as a keeper. After the walk,
/// take the kept rows in forward (input) order and concat into one batch.
///
/// Result: a deduped batch where each `id` appears at most once, with
/// the LAST occurrence's column values. Required by `stage_merge_insert`,
/// which needs unique source keys (Lance's `MergeInsertBuilder` produces
/// arbitrary results on duplicates).
///
/// `batches` must be non-empty and all share `schema` (caller enforces).
/// Compare two schemas for the purposes of `MutationStaging::append_batch`'s
/// accumulator-compatibility check. We treat schemas as compatible if
/// they have the same field names and data types in the same order.
/// Nullability and field metadata differences are tolerated — Lance and
/// Arrow round-trip these freely and the accumulator's downstream
/// `concat_batches` is also permissive on those.
fn schemas_compatible(a: &SchemaRef, b: &SchemaRef) -> bool {
if a.fields().len() != b.fields().len() {
return false;
}
for (af, bf) in a.fields().iter().zip(b.fields().iter()) {
if af.name() != bf.name() || af.data_type() != bf.data_type() {
return false;
}
}
true
}
fn dedupe_merge_batches_by_id(
schema: &SchemaRef,
batches: Vec<RecordBatch>,
) -> Result<RecordBatch> {
if batches.is_empty() {
return Err(OmniError::manifest_internal(
"dedupe_merge_batches_by_id: batches is empty".to_string(),
));
}
// Walk in reverse, tracking seen ids. For each row whose id we
// haven't seen yet, record (batch_idx, row_idx) for the kept set.
let mut seen: HashSet<String> = HashSet::new();
let mut keep: Vec<Vec<u32>> = vec![Vec::new(); batches.len()];
let mut any_duplicates = false;
for (b_idx, batch) in batches.iter().enumerate().rev() {
let id_col = batch
.column_by_name("id")
.ok_or_else(|| {
OmniError::manifest_internal(
"dedupe_merge_batches_by_id: batch has no 'id' column".to_string(),
)
})?
.as_any()
.downcast_ref::<StringArray>()
.ok_or_else(|| {
OmniError::manifest_internal(
"dedupe_merge_batches_by_id: 'id' column is not Utf8".to_string(),
)
})?;
for r_idx in (0..batch.num_rows()).rev() {
if !id_col.is_valid(r_idx) {
// NULL ids — keep all (NULL != NULL in Lance/SQL semantics).
keep[b_idx].push(r_idx as u32);
continue;
}
let id = id_col.value(r_idx);
if seen.insert(id.to_string()) {
keep[b_idx].push(r_idx as u32);
} else {
any_duplicates = true;
}
}
// We pushed in reverse-row order; flip to forward order so the
// emitted batch reflects insertion order.
keep[b_idx].reverse();
}
// Fast path: no duplicates → simple concat.
if !any_duplicates {
if batches.len() == 1 {
return Ok(batches.into_iter().next().unwrap());
}
return arrow_select::concat::concat_batches(schema, &batches)
.map_err(|e| OmniError::Lance(e.to_string()));
}
// Slow path: build per-batch slices via `take`, then concat.
let mut sliced: Vec<RecordBatch> = Vec::with_capacity(batches.len());
for (b_idx, idxs) in keep.into_iter().enumerate() {
if idxs.is_empty() {
continue;
}
let take_array = UInt32Array::from(idxs);
let columns: Vec<Arc<dyn Array>> = batches[b_idx]
.columns()
.iter()
.map(|col| arrow_select::take::take(col, &take_array, None))
.collect::<std::result::Result<_, _>>()
.map_err(|e| OmniError::Lance(e.to_string()))?;
let new_batch = RecordBatch::try_new(batches[b_idx].schema(), columns)
.map_err(|e| OmniError::Lance(e.to_string()))?;
sliced.push(new_batch);
}
if sliced.is_empty() {
return Err(OmniError::manifest_internal(
"dedupe_merge_batches_by_id: all rows were dropped (unexpected)".to_string(),
));
}
if sliced.len() == 1 {
return Ok(sliced.into_iter().next().unwrap());
}
arrow_select::concat::concat_batches(schema, &sliced)
.map_err(|e| OmniError::Lance(e.to_string()))
}
// ─── Cardinality helpers (shared by mutation + loader paths) ────────────────
/// Count edges per `src` value across committed (Lance scan) + pending
/// (in-memory). Caller supplies an opened committed dataset so the
/// mutation path (which already has one) and the loader path (which
/// opens via snapshot) share the same body.
///
/// `dedupe_key_column` controls whether committed rows are shadowed by
/// pending:
/// - `None` — every committed row counts, every pending row counts.
/// Correct when committed and pending cannot share a primary key
/// (engine inserts always use fresh ULID edge ids; loader Append
/// mode uses fresh ids too).
/// - `Some(col)` — committed rows whose `col` value also appears in any
/// pending batch are EXCLUDED from the committed count, so a Merge-mode
/// load that *updates* an existing edge (potentially changing its
/// `src`) counts the post-update row exactly once. Without this,
/// `LoadMode::Merge` double-counts.
pub(crate) async fn count_src_per_edge(
db: &crate::db::Omnigraph,
committed_ds: &Dataset,
table_key: &str,
staging: &MutationStaging,
dedupe_key_column: Option<&str>,
) -> Result<HashMap<String, u32>> {
let mut counts: HashMap<String, u32> = HashMap::new();
let pending_batches = staging.pending_batches(table_key);
// Collect pending key values (for shadow-on-merge dedupe). Only when
// dedupe is requested AND there's anything pending.
let pending_keys: Option<HashSet<String>> = match dedupe_key_column {
Some(col) if !pending_batches.is_empty() => {
let mut set = HashSet::new();
for batch in pending_batches {
if let Some(arr) = batch
.column_by_name(col)
.and_then(|c| c.as_any().downcast_ref::<StringArray>())
{
for i in 0..arr.len() {
if arr.is_valid(i) {
set.insert(arr.value(i).to_string());
}
}
}
}
Some(set)
}
_ => None,
};
// Committed side: scan `src` plus the dedupe key column when set, so
// we can both count and shadow in one pass.
let projection: Vec<&str> = match dedupe_key_column {
Some(col) if pending_keys.as_ref().is_some_and(|s| !s.is_empty()) => vec!["src", col],
_ => vec!["src"],
};
let committed = db
.table_store()
.scan(committed_ds, Some(&projection), None, None)
.await?;
for batch in &committed {
let srcs = batch
.column_by_name("src")
.ok_or_else(|| OmniError::Lance("missing 'src' column on edge table".into()))?
.as_any()
.downcast_ref::<StringArray>()
.ok_or_else(|| OmniError::Lance("'src' column is not Utf8".into()))?;
// Optional shadow-key column (only present when dedupe is on).
let key_arr = match (&pending_keys, dedupe_key_column) {
(Some(set), Some(col)) if !set.is_empty() => batch
.column_by_name(col)
.and_then(|c| c.as_any().downcast_ref::<StringArray>()),
_ => None,
};
for i in 0..srcs.len() {
if !srcs.is_valid(i) {
continue;
}
// Shadow this committed row if its key is in pending.
if let (Some(arr), Some(set)) = (key_arr, pending_keys.as_ref()) {
if arr.is_valid(i) && set.contains(arr.value(i)) {
continue;
}
}
*counts.entry(srcs.value(i).to_string()).or_insert(0) += 1;
}
}
// Pending side: walk in-memory batches for `src`. When dedupe is on,
// collapse rows that share `dedupe_key_column` to their last occurrence
// — mirrors `dedupe_merge_batches_by_id`'s last-write-wins applied at
// finalize time, so cardinality counts what `commit_staged` will
// actually publish, not raw input duplicates.
//
// Without this, a Merge-mode load whose input JSONL has two rows with
// the same edge id would be double-counted here, even though the
// finalize-time dedupe would collapse them to one. The result: spurious
// `@card` violations on perfectly valid Merge inputs.
match dedupe_key_column {
Some(key_col) => count_pending_src_with_dedupe(pending_batches, key_col, &mut counts)?,
None => count_pending_src_naive(pending_batches, &mut counts),
}
Ok(counts)
}
/// Count pending edges per `src` with NO dedup. Correct when caller
/// guarantees pending rows have unique primary keys (engine inserts via
/// fresh ULID; loader Append mode).
fn count_pending_src_naive(
pending_batches: &[RecordBatch],
counts: &mut HashMap<String, u32>,
) {
for batch in pending_batches {
let Some(col) = batch.column_by_name("src") else {
continue;
};
let Some(srcs) = col.as_any().downcast_ref::<StringArray>() else {
continue;
};
for i in 0..srcs.len() {
if srcs.is_valid(i) {
*counts.entry(srcs.value(i).to_string()).or_insert(0) += 1;
}
}
}
}
/// Count pending edges per `src` after deduping rows that share
/// `dedupe_key_column`. Last occurrence wins (mirrors
/// `dedupe_merge_batches_by_id`'s walk-in-reverse contract). Required for
/// `LoadMode::Merge` where the same edge id may appear multiple times in
/// one load and finalize will collapse them to the last value.
fn count_pending_src_with_dedupe(
pending_batches: &[RecordBatch],
dedupe_key_column: &str,
counts: &mut HashMap<String, u32>,
) -> Result<()> {
// Walk in reverse, track seen keys, keep one (key, src) pair per key.
let mut seen: HashSet<String> = HashSet::new();
let mut kept_srcs: Vec<String> = Vec::new();
for batch in pending_batches.iter().rev() {
let Some(key_col) = batch.column_by_name(dedupe_key_column) else {
// Pending batch is missing the key column. By construction
// this is unreachable: callers in dedupe mode always push
// batches whose schema contains the key (loader Merge mode
// builds via build_edge_batch which always emits `id`; the
// append_batch schema-compatibility check at the call site
// would also reject a heterogeneous mix). If it ever fires
// it's a programmer error — fail loudly rather than skip
// counting (which would let `@card` violations slip).
return Err(OmniError::manifest_internal(format!(
"count_pending_src_with_dedupe: pending batch missing dedup key column '{}' \
(schema-compat check at append_batch should have rejected this)",
dedupe_key_column
)));
};
let key_arr = key_col.as_any().downcast_ref::<StringArray>().ok_or_else(|| {
OmniError::Lance(format!(
"count_src_per_edge: pending '{}' column is not Utf8",
dedupe_key_column
))
})?;
let src_arr = batch
.column_by_name("src")
.and_then(|c| c.as_any().downcast_ref::<StringArray>());
let Some(srcs) = src_arr else {
continue;
};
for i in (0..batch.num_rows()).rev() {
if !srcs.is_valid(i) {
continue;
}
// NULL key: keep (NULL != NULL semantics — every NULL counts).
if !key_arr.is_valid(i) {
kept_srcs.push(srcs.value(i).to_string());
continue;
}
let key = key_arr.value(i);
if seen.insert(key.to_string()) {
kept_srcs.push(srcs.value(i).to_string());
}
}
}
for src in kept_srcs {
*counts.entry(src).or_insert(0) += 1;
}
Ok(())
}
/// Apply `@card(min..max)` bounds to a per-source count map.
///
/// Both bounds are checked. The `min` check produces a misleading error
/// during a per-op insert mid-query (a bound of `2..` requires both
/// edges to be inserted before validation passes), but the historical
/// behavior was to enforce min per-op anyway — keeping users from
/// accidentally publishing a graph that violates the schema. Consumers
/// that need end-of-query semantics call this from after all edge ops
/// are accumulated (the loader does, via Phase 3).
pub(crate) fn enforce_cardinality_bounds(
edge_type: &EdgeType,
counts: &HashMap<String, u32>,
) -> Result<()> {
let card = &edge_type.cardinality;
for (src, count) in counts {
if let Some(max) = card.max {
if *count > max {
return Err(OmniError::manifest(format!(
"@card violation on edge {}: source '{}' has {} edges (max {})",
edge_type.name, src, count, max
)));
}
}
if *count < card.min {
return Err(OmniError::manifest(format!(
"@card violation on edge {}: source '{}' has {} edges (min {})",
edge_type.name, src, count, card.min
)));
}
}
Ok(())
}

View file

@ -21,6 +21,7 @@ use serde_json::Value as JsonValue;
use crate::db::Omnigraph;
use crate::error::{OmniError, Result};
use crate::exec::staging::{MutationStaging, PendingMode};
/// Result of a load operation.
#[derive(Debug, Clone, Default)]
@ -154,11 +155,10 @@ impl Omnigraph {
pub async fn load(&mut self, branch: &str, data: &str, mode: LoadMode) -> Result<LoadResult> {
self.ensure_schema_state_valid().await?;
// Reject internal `__run__*` / system-prefixed branches at the public
// write boundary. The pre-MR-771 path got this guard transitively via
// `begin_run`'s `ensure_public_branch_ref` call; the direct-publish
// path needs to assert it explicitly so a caller can't write to
// legacy or system staging branches by passing the prefix verbatim.
// Reject internal `__run__*` / system-prefixed branches at the
// public write boundary. Direct-publish paths assert this
// explicitly so a caller can't write to legacy or system
// staging branches by passing the prefix verbatim.
crate::db::ensure_public_branch_ref(branch, "load")?;
// Branch convention: `None` represents `main`. Re-normalizing to
// `Some("main")` here would route the publisher commit through a
@ -304,21 +304,29 @@ async fn load_jsonl_reader<R: BufRead>(
}
}
// Phase 2: Build per-type RecordBatches and write to Lance.
//
// Writes to different tables are independent in Lance (each table has its
// own manifest + fragments), so we parallelize across types with a bounded
// concurrency limit. Serial writes against S3 were the dominant cost of
// load — batching and parallelizing per-type cuts wall time by roughly
// `LOAD_WRITE_CONCURRENCY`× for wide schemas (see MR-677).
// Phase 2: Build per-type RecordBatches and accumulate into the
// staging pipeline. For Append/Merge, batches go into an in-memory
// accumulator and a single `stage_*` + `commit_staged` per touched
// table runs at end-of-load — a mid-load failure (RI / cardinality
// violation) leaves Lance HEAD untouched. For Overwrite, the legacy
// inline-commit path is preserved (truncate+append doesn't fit the
// staged shape cleanly, and overwrite has no in-flight read-your-writes
// requirement).
let mut updates = Vec::new();
let mut result = LoadResult::default();
let snapshot = db.snapshot_for_branch(branch).await?;
// Capture per-table manifest versions before any write so the publisher
// CAS at commit-time can detect concurrent writers landing between our
// read snapshot and our publish.
let mut expected_table_versions: HashMap<String, u64> = HashMap::new();
let use_staging = !matches!(mode, LoadMode::Overwrite);
let mut staging = MutationStaging::default();
let mut overwrite_updates: Vec<crate::db::SubTableUpdate> = Vec::new();
let mut overwrite_expected: HashMap<String, u64> = HashMap::new();
let pending_mode = match mode {
LoadMode::Merge => PendingMode::Merge,
// Append-mode loads accumulate as Append. Edge tables (no @key)
// and no-key node tables stay safe on the stage_append path. The
// Merge mode applies dedupe-by-id; Append assumes unique inputs.
LoadMode::Append => PendingMode::Append,
LoadMode::Overwrite => PendingMode::Append, // unused
};
// Phase 2a: build and validate every node batch up front. Cheap and
// synchronous — surfaces validation errors before any S3 traffic.
@ -338,46 +346,89 @@ async fn load_jsonl_reader<R: BufRead>(
let entry = snapshot
.entry(&table_key)
.ok_or_else(|| OmniError::manifest(format!("no manifest entry for {}", table_key)))?;
expected_table_versions.insert(table_key.clone(), entry.table_version);
if !use_staging {
overwrite_expected.insert(table_key.clone(), entry.table_version);
}
prepared_nodes.push((type_name.clone(), table_key, batch, loaded_count));
}
// Phase 2b: write every node type concurrently, bounded.
let node_write_results = write_batches_concurrently(db, branch, mode, prepared_nodes).await?;
for (type_name, table_key, loaded_count, state, table_branch) in node_write_results {
updates.push(crate::db::SubTableUpdate {
table_key,
table_version: state.version,
table_branch,
row_count: state.row_count,
version_metadata: state.version_metadata,
});
result.nodes_loaded.insert(type_name, loaded_count);
// Phase 2b: write every node type. Append/Merge → in-memory
// accumulator. Overwrite → concurrent inline-commit (legacy path).
if use_staging {
for (type_name, table_key, batch, loaded_count) in prepared_nodes {
let (ds, full_path, table_branch) = db
.open_for_mutation_on_branch(branch, &table_key)
.await?;
let expected_version = ds.version().version;
staging.ensure_path(
&table_key,
full_path,
table_branch,
expected_version,
);
let schema = batch.schema();
staging.append_batch(&table_key, schema, pending_mode, batch)?;
result.nodes_loaded.insert(type_name, loaded_count);
}
} else {
let node_write_results =
write_batches_concurrently(db, branch, mode, prepared_nodes).await?;
for (type_name, table_key, loaded_count, state, table_branch) in node_write_results {
overwrite_updates.push(crate::db::SubTableUpdate {
table_key,
table_version: state.version,
table_branch,
row_count: state.row_count,
version_metadata: state.version_metadata,
});
result.nodes_loaded.insert(type_name, loaded_count);
}
}
// Phase 2b: Validate edge referential integrity — every src/dst must
// reference an existing node ID in the appropriate type.
// Phase 2c: Validate edge referential integrity — every src/dst must
// reference an existing node ID in the appropriate type. For staged
// loads, the lookup unions snapshot-committed IDs with the in-memory
// pending batches (which carry the just-staged node inserts).
for (edge_name, rows) in &edge_rows {
let edge_type = &catalog.edge_types[edge_name];
let from_ids = collect_node_ids(
db,
branch,
&edge_type.from_type,
&node_rows,
&catalog,
&updates,
)
.await?;
let to_ids = collect_node_ids(
db,
branch,
&edge_type.to_type,
&node_rows,
&catalog,
&updates,
)
.await?;
let from_ids = if use_staging {
collect_node_ids_with_pending(
db,
branch,
&edge_type.from_type,
&staging,
)
.await?
} else {
collect_node_ids(
db,
branch,
&edge_type.from_type,
&node_rows,
&catalog,
&overwrite_updates,
)
.await?
};
let to_ids = if use_staging {
collect_node_ids_with_pending(
db,
branch,
&edge_type.to_type,
&staging,
)
.await?
} else {
collect_node_ids(
db,
branch,
&edge_type.to_type,
&node_rows,
&catalog,
&overwrite_updates,
)
.await?
};
for (i, (src, dst, _)) in rows.iter().enumerate() {
if !from_ids.contains(src.as_str()) {
@ -401,7 +452,7 @@ async fn load_jsonl_reader<R: BufRead>(
}
}
// Write edges (parallel per edge type, same pattern as nodes)
// Phase 2d: build edge batches.
let mut prepared_edges: Vec<(String, String, RecordBatch, usize)> =
Vec::with_capacity(edge_rows.len());
for (edge_name, rows) in &edge_rows {
@ -417,29 +468,62 @@ async fn load_jsonl_reader<R: BufRead>(
let entry = snapshot
.entry(&table_key)
.ok_or_else(|| OmniError::manifest(format!("no manifest entry for {}", table_key)))?;
expected_table_versions.insert(table_key.clone(), entry.table_version);
if !use_staging {
overwrite_expected.insert(table_key.clone(), entry.table_version);
}
prepared_edges.push((edge_name.clone(), table_key, batch, loaded_count));
}
let edge_write_results = write_batches_concurrently(db, branch, mode, prepared_edges).await?;
for (edge_name, table_key, loaded_count, state, table_branch) in edge_write_results {
updates.push(crate::db::SubTableUpdate {
table_key,
table_version: state.version,
table_branch,
row_count: state.row_count,
version_metadata: state.version_metadata,
});
result.edges_loaded.insert(edge_name, loaded_count);
// Phase 2e: write every edge type. Same dispatch as Phase 2b.
if use_staging {
for (edge_name, table_key, batch, loaded_count) in prepared_edges {
let (ds, full_path, table_branch) = db
.open_for_mutation_on_branch(branch, &table_key)
.await?;
let expected_version = ds.version().version;
staging.ensure_path(
&table_key,
full_path,
table_branch,
expected_version,
);
let schema = batch.schema();
staging.append_batch(&table_key, schema, pending_mode, batch)?;
result.edges_loaded.insert(edge_name, loaded_count);
}
} else {
let edge_write_results =
write_batches_concurrently(db, branch, mode, prepared_edges).await?;
for (edge_name, table_key, loaded_count, state, table_branch) in edge_write_results {
overwrite_updates.push(crate::db::SubTableUpdate {
table_key,
table_version: state.version,
table_branch,
row_count: state.row_count,
version_metadata: state.version_metadata,
});
result.edges_loaded.insert(edge_name, loaded_count);
}
}
// Phase 3: Validate edge cardinality constraints (before commit — invalid
// data must not be committed). Opens edge sub-tables at their just-written
// versions, not through the snapshot (which still pins to pre-write state).
// Phase 3: Validate edge cardinality constraints (before commit —
// invalid data must not be committed). Staged path scans committed
// edges via Lance + iterates pending edges in-memory. Overwrite path
// opens the just-written version (legacy behavior).
for (edge_name, _) in &edge_rows {
let edge_type = &catalog.edge_types[edge_name];
let table_key = format!("edge:{}", edge_name);
if let Some(update) = updates.iter().find(|u| u.table_key == table_key) {
if use_staging {
validate_edge_cardinality_with_pending_loader(
db,
branch,
edge_type,
&table_key,
&staging,
mode,
)
.await?;
} else if let Some(update) = overwrite_updates.iter().find(|u| u.table_key == table_key) {
validate_edge_cardinality(
db,
branch,
@ -452,8 +536,18 @@ async fn load_jsonl_reader<R: BufRead>(
}
// Phase 4: Atomic manifest commit with publisher-level OCC.
db.commit_updates_on_branch_with_expected(branch, &updates, &expected_table_versions)
if use_staging {
let (updates, expected_versions) = staging.finalize(db, branch).await?;
db.commit_updates_on_branch_with_expected(branch, &updates, &expected_versions)
.await?;
} else {
db.commit_updates_on_branch_with_expected(
branch,
&overwrite_updates,
&overwrite_expected,
)
.await?;
}
Ok(result)
}
@ -1456,6 +1550,123 @@ pub(crate) async fn validate_edge_cardinality(
Ok(())
}
/// Validate edge `@card` cardinality with in-memory pending edges visible.
///
/// Loader-level analog to `exec::mutation::validate_edge_cardinality_with_pending`:
/// opens the committed dataset at the pre-load snapshot version, then
/// delegates to the shared `count_src_per_edge` + `enforce_cardinality_bounds`
/// helpers in `exec::staging`. Used by Append/Merge loads (the Overwrite
/// path uses `validate_edge_cardinality` which opens the just-written
/// Lance version).
///
/// `mode` controls dedup behavior. `LoadMode::Merge` passes `Some("id")`
/// so committed edges that the load is *updating* (same edge id,
/// possibly changed `src`) are not double-counted. `LoadMode::Append`
/// passes `None` because each line generates a fresh ULID id that
/// never collides with committed.
async fn validate_edge_cardinality_with_pending_loader(
db: &Omnigraph,
branch: Option<&str>,
edge_type: &omnigraph_compiler::catalog::EdgeType,
table_key: &str,
staging: &MutationStaging,
mode: LoadMode,
) -> Result<()> {
if edge_type.cardinality.is_default() {
return Ok(());
}
let snapshot = db.snapshot_for_branch(branch).await?;
let Some(entry) = snapshot.entry(table_key) else {
// No manifest entry — table doesn't exist yet. Pending-only is
// fine; the helper handles empty committed scans.
return Ok(());
};
let ds = db
.open_dataset_at_state(
&entry.table_path,
entry.table_branch.as_deref(),
entry.table_version,
)
.await?;
let dedupe_key = match mode {
LoadMode::Merge => Some("id"),
LoadMode::Append | LoadMode::Overwrite => None,
};
let counts =
crate::exec::staging::count_src_per_edge(db, &ds, table_key, staging, dedupe_key)
.await?;
crate::exec::staging::enforce_cardinality_bounds(edge_type, &counts)
}
/// Collect all valid node IDs for a given type, with in-memory pending
/// node inserts visible. Used by the staged loader's Phase 2c
/// referential-integrity validation.
///
/// Union of:
/// - IDs from the staged loader's pending batches (in-memory; just-staged
/// inserts of this type)
/// - IDs from the committed sub-table at the pre-load snapshot version
async fn collect_node_ids_with_pending(
db: &Omnigraph,
branch: Option<&str>,
type_name: &str,
staging: &MutationStaging,
) -> Result<HashSet<String>> {
let mut ids = HashSet::new();
let table_key = format!("node:{}", type_name);
// From staging.pending: walk the in-memory accumulator's id column.
for batch in staging.pending_batches(&table_key) {
if let Some(col) = batch.column_by_name("id") {
if let Some(arr) = col.as_any().downcast_ref::<StringArray>() {
for i in 0..arr.len() {
if arr.is_valid(i) {
ids.insert(arr.value(i).to_string());
}
}
}
}
}
// From the committed Lance sub-table at the pre-load snapshot version.
let snapshot = db.snapshot_for_branch(branch).await?;
let Some(entry) = snapshot.entry(&table_key) else {
return Ok(ids);
};
let ds = db
.open_dataset_at_state(
&entry.table_path,
entry.table_branch.as_deref(),
entry.table_version,
)
.await?;
let batches = db
.table_store()
.scan(&ds, Some(&["id"]), None, None)
.await?;
for batch in &batches {
let id_col = batch
.column_by_name("id")
.ok_or_else(|| OmniError::Lance("missing 'id' column".into()))?
.as_any()
.downcast_ref::<StringArray>()
.ok_or_else(|| OmniError::Lance("'id' column is not Utf8".into()))?;
for i in 0..batch.num_rows() {
// Defensive: `id` is the @key column on every node type and
// is non-nullable by schema, but a committed-row corruption
// (or future schema change) could surface a NULL. Skip
// rather than insert "" — pending-side does the same.
if id_col.is_valid(i) {
ids.insert(id_col.value(i).to_string());
}
}
}
Ok(ids)
}
/// Collect all valid node IDs for a given type. Union of:
/// - IDs from the just-loaded batch (in memory, from node_rows)
/// - IDs from the sub-table at the just-written version (if it was updated)
@ -1512,6 +1723,9 @@ async fn collect_node_ids(
.downcast_ref::<StringArray>()
.unwrap();
for i in 0..batch.num_rows() {
if !id_col.is_valid(i) {
continue;
}
ids.insert(id_col.value(i).to_string());
}
}

View file

@ -40,12 +40,12 @@ pub struct DeleteState {
/// A Lance write that has produced fragment files on object storage but is
/// not yet committed to the dataset's manifest. The staged-write primitives
/// are defined here for later integration in `MutationStaging`
/// (`exec/mutation.rs`) and the loader (`loader/mod.rs`) — those rewires
/// land in [MR-794](https://linear.app/modernrelay/issue/MR-794) step 2+.
/// The intent: defer Lance commits to end-of-query so a mid-query failure
/// leaves the touched table at the pre-mutation HEAD instead of drifting
/// ahead.
/// are consumed by `MutationStaging` (`exec/staging.rs`,
/// `exec/mutation.rs`) and the bulk loader (`loader/mod.rs`). The
/// intent: defer Lance commits to end-of-query so a mid-query failure
/// leaves the touched table at the pre-mutation HEAD instead of
/// drifting ahead. See `docs/runs.md` for the publisher-CAS contract
/// this builds on.
///
/// `transaction` is opaque from our side — Lance owns its semantics. We
/// commit it via `CommitBuilder::execute(transaction)` (see
@ -545,7 +545,7 @@ impl TableStore {
})
}
// ─── Staged-write API (MR-794) ───────────────────────────────────────────
// ─── Staged-write API ────────────────────────────────────────────────────
//
// These primitives wrap Lance's distributed-write API: each call writes
// fragment files to object storage but does NOT advance the dataset's
@ -672,16 +672,16 @@ impl TableStore {
///
/// This is intrinsic to the underlying Lance API: there is no public
/// way to make `MergeInsertBuilder` see uncommitted fragments. The
/// engine's mutation path enforces the rule "per touched table: all
/// stage_append OR exactly one stage_merge_insert" at parse time
/// (the D₂ check landing with [MR-794](https://linear.app/modernrelay/issue/MR-794)
/// step 2+ in `exec/mutation.rs`). Multi-table queries and append-chains
/// remain safe; only chained merges on a single table are rejected.
/// engine's `MutationStaging` accumulator works around this by
/// concatenating per-table batches in memory and issuing exactly
/// one `stage_merge_insert` per touched table at end-of-query (with
/// last-write-wins dedupe by id) — see `exec/staging.rs`. Direct
/// callers of this primitive must respect the contract themselves.
///
/// Lift path: either a Lance API extension that lets
/// `MergeInsertBuilder` accept additional staged fragments, or an
/// in-memory pre-merge here that folds prior staged batches into the
/// input stream. See `docs/runs.md` and MR-793.
/// input stream. See `docs/runs.md`.
pub async fn stage_merge_insert(
&self,
ds: Dataset,
@ -780,10 +780,10 @@ impl TableStore {
/// filtered scan even when their data would match. Staged-fragment
/// rows are silently absent from the result. `scanner.use_stats(false)`
/// does not fix this in lance 4.0.0. Callers needing correct filtered
/// reads against staged data should use a different strategy (the
/// engine's MR-794 step 2+ design uses in-memory pending-batch
/// accumulation + DataFusion `MemTable` instead — see
/// `.context/mr-794-step2-design.md`).
/// reads against staged data should use a different strategy the
/// engine's `MutationStaging` accumulator unions in-memory pending
/// batches with the committed scan via DataFusion `MemTable` (see
/// `scan_with_pending`).
///
/// This method remains on the surface for primitive-level testing
/// (basic stage + scan correctness without filters works) and for
@ -821,6 +821,106 @@ impl TableStore {
.map_err(|e| OmniError::Lance(e.to_string()))
}
/// Scan committed via Lance + apply the same filter to in-memory
/// pending batches via DataFusion `MemTable`, concat the two result
/// streams. The replacement for `scan_with_staged` in engine code:
/// the staged-write writer accumulates input batches in memory and
/// unions them with the committed snapshot at read time,
/// sidestepping the `Scanner::with_fragments` filter-pushdown
/// limitation documented on `scan_with_staged`.
///
/// `committed_ds` should be opened at the pre-mutation
/// `expected_version` (the same version captured in `MutationStaging::expected_versions`
/// at first touch of the table). `pending_batches` are the per-table
/// accumulator's batches in their input shape. `pending_schema` is
/// the schema of the accumulated batches; passing `None` falls back
/// to the schema of the first pending batch.
///
/// `filter` is the Lance / DataFusion SQL predicate. It is applied
/// to both sides — Lance pushes it down on the committed side; the
/// pending side runs it through a fresh DataFusion `SessionContext`
/// with the batches registered as a `MemTable` named `pending`.
///
/// `key_column` controls how committed and pending are unioned:
/// - **`None` (union semantics)**: every committed row that matches
/// the filter and every pending row that matches the filter is
/// returned. Correct when committed and pending cannot share a
/// primary key — e.g., Append-mode loads with ULID-generated ids,
/// or any read where pending hasn't been used to update committed
/// rows.
/// - **`Some(col)` (merge / shadow semantics)**: committed rows whose
/// `col` value appears in any pending batch are EXCLUDED from the
/// result; only pending's view of those rows is returned. Required
/// for Merge-mode reads (e.g., `execute_update` on the engine path)
/// so a chained `update` doesn't see stale committed values that
/// a prior op already updated in pending. Without this, a predicate
/// like `where age > 30` can match a row that an earlier
/// `set age = 20` already moved out of range.
///
/// When `pending_batches` is empty this delegates to the regular
/// scan path.
pub async fn scan_with_pending(
&self,
committed_ds: &Dataset,
pending_batches: &[RecordBatch],
pending_schema: Option<SchemaRef>,
projection: Option<&[&str]>,
filter: Option<&str>,
key_column: Option<&str>,
) -> Result<Vec<RecordBatch>> {
// Contract: when merge-shadow semantics are requested via
// `key_column`, the committed-side projection MUST include that
// column so we can filter committed rows whose key appears in
// pending. Silently dropping the shadow when projection omits
// the key would re-introduce union semantics behind the
// caller's back. Reject up front with a clear error so callers
// either (a) include the key in projection or (b) drop
// `key_column` if union is what they wanted.
if let (Some(key_col), Some(cols)) = (key_column, projection) {
if !cols.iter().any(|c| *c == key_col) {
return Err(OmniError::Lance(format!(
"scan_with_pending: key_column '{}' must appear in projection \
when merge-shadow semantics are requested (got projection = {:?})",
key_col, cols
)));
}
}
let committed = self.scan(committed_ds, projection, filter, None).await?;
if pending_batches.is_empty() {
return Ok(committed);
}
// Shadow committed rows whose key value also appears in pending.
// This makes scan_with_pending implement merge semantics rather
// than naive union: any row that has a pending update is
// represented ONLY by its pending value, never by both its
// (stale) committed value and its (current) pending value.
let committed = match key_column {
Some(key_col) => {
let pending_keys = collect_string_column_values(pending_batches, key_col)?;
if pending_keys.is_empty() {
committed
} else {
filter_out_rows_where_string_in(committed, key_col, &pending_keys)?
}
}
None => committed,
};
let pending = scan_pending_batches(
pending_batches,
pending_schema,
projection,
filter,
)
.await?;
let mut out = committed;
out.extend(pending);
Ok(out)
}
/// `count_rows` variant that respects staged writes. Used for
/// edge-cardinality validation that needs to see staged edges before
/// commit. Same `committed - removed + new` composition as
@ -1068,6 +1168,139 @@ fn assign_row_id_meta(fragments: &mut [Fragment], start_row_id: u64) -> Result<(
Ok(())
}
/// Collect the set of values in a Utf8 column across multiple batches.
/// Used by `scan_with_pending`'s merge-semantic path to identify
/// committed rows that are shadowed by pending writes. NULL values are
/// skipped.
fn collect_string_column_values(
batches: &[RecordBatch],
column: &str,
) -> Result<std::collections::HashSet<String>> {
use arrow_array::{Array, StringArray};
let mut out = std::collections::HashSet::new();
for batch in batches {
let Some(col) = batch.column_by_name(column) else {
return Err(OmniError::Lance(format!(
"scan_with_pending: pending batch missing key column '{}'",
column
)));
};
let arr = col.as_any().downcast_ref::<StringArray>().ok_or_else(|| {
OmniError::Lance(format!(
"scan_with_pending: key column '{}' is not Utf8",
column
))
})?;
for i in 0..arr.len() {
if arr.is_valid(i) {
out.insert(arr.value(i).to_string());
}
}
}
Ok(out)
}
/// Drop rows from `batches` whose Utf8 `column` value is in `excluded`.
/// Used by `scan_with_pending`'s merge-semantic path to shadow committed
/// rows that pending has already updated. Returns the surviving rows.
///
/// `scan_with_pending` validates up front that the projection contains
/// `column`, so a missing column here is a programmer error — error
/// loudly instead of silently passing batches through (which would
/// re-introduce the union semantics the caller asked us to avoid).
fn filter_out_rows_where_string_in(
batches: Vec<RecordBatch>,
column: &str,
excluded: &std::collections::HashSet<String>,
) -> Result<Vec<RecordBatch>> {
use arrow_array::{Array, BooleanArray, StringArray};
let mut out = Vec::with_capacity(batches.len());
for batch in batches {
if batch.num_rows() == 0 {
out.push(batch);
continue;
}
let col = batch.column_by_name(column).ok_or_else(|| {
OmniError::manifest_internal(format!(
"scan_with_pending: committed batch missing key column '{}' \
(the up-front projection check should have rejected this)",
column
))
})?;
let arr = col.as_any().downcast_ref::<StringArray>().ok_or_else(|| {
OmniError::Lance(format!(
"scan_with_pending: committed column '{}' is not Utf8",
column
))
})?;
let mask: BooleanArray = (0..arr.len())
.map(|i| {
if arr.is_valid(i) {
Some(!excluded.contains(arr.value(i)))
} else {
Some(true)
}
})
.collect();
let filtered = arrow_select::filter::filter_record_batch(&batch, &mask)
.map_err(|e| OmniError::Lance(e.to_string()))?;
out.push(filtered);
}
Ok(out)
}
/// Apply `projection` and `filter` to in-memory pending batches via a
/// fresh DataFusion `SessionContext`. Used by `scan_with_pending` for
/// the read-your-writes side of the in-memory staging accumulator.
///
/// `pending_batches` must be non-empty (the caller short-circuits on
/// empty).
///
/// **SQL dialect contract.** `filter` is also passed to Lance's scanner
/// on the committed side. Lance and DataFusion both accept standard
/// SQL comparison predicates (`col op literal`) and OmniGraph's
/// `predicate_to_sql` only emits those shapes today (`=`, `!=`, `>`,
/// `<`, `>=`, `<=`). If a future caller introduces a Lance-specific
/// scanner extension (vector search, FTS, `_rowid` references) into
/// the filter, this function will need explicit translation — DataFusion
/// won't recognize those operators against the in-memory `MemTable`.
async fn scan_pending_batches(
pending_batches: &[RecordBatch],
pending_schema: Option<SchemaRef>,
projection: Option<&[&str]>,
filter: Option<&str>,
) -> Result<Vec<RecordBatch>> {
let schema = pending_schema.unwrap_or_else(|| pending_batches[0].schema());
let ctx = datafusion::execution::context::SessionContext::new();
let mem = datafusion::datasource::MemTable::try_new(
schema,
vec![pending_batches.to_vec()],
)
.map_err(|e| OmniError::Lance(e.to_string()))?;
ctx.register_table("pending", Arc::new(mem))
.map_err(|e| OmniError::Lance(e.to_string()))?;
let proj = projection
.map(|cols| {
cols.iter()
.map(|c| format!("\"{}\"", c.replace('"', "\"\"")))
.collect::<Vec<_>>()
.join(", ")
})
.unwrap_or_else(|| "*".to_string());
let where_clause = filter
.map(|f| format!("WHERE {f}"))
.unwrap_or_default();
let sql = format!("SELECT {proj} FROM pending {where_clause}");
let df = ctx
.sql(&sql)
.await
.map_err(|e| OmniError::Lance(e.to_string()))?;
df.collect()
.await
.map_err(|e| OmniError::Lance(e.to_string()))
}
fn combine_committed_with_staged(ds: &Dataset, staged: &[StagedWrite]) -> Vec<Fragment> {
let removed: std::collections::HashSet<u64> = staged
.iter()

View file

@ -522,11 +522,12 @@ query high_value() {
#[tokio::test]
async fn stale_handle_public_mutation_must_refresh_then_retry() {
// MR-771: with the Run state machine removed, the engine no longer
// auto-rebases stale-handle mutations onto the latest target head. The
// publisher's `expected_table_versions` CAS makes the contract explicit
// — a stale writer fails loudly with `ExpectedVersionMismatch` and the
// client decides whether to refresh-and-retry.
// With the Run state machine removed, the engine no longer
// auto-rebases stale-handle mutations onto the latest target head.
// The publisher's `expected_table_versions` CAS makes the contract
// explicit — a stale writer fails loudly with
// `ExpectedVersionMismatch` and the client decides whether to
// refresh-and-retry.
let dir = tempfile::tempdir().unwrap();
let _db = init_and_load(&dir).await;
drop(_db);

View file

@ -140,6 +140,131 @@ async fn schema_apply_recovers_partial_rename() {
assert_no_staging_files(dir.path());
}
/// Pin the documented "finalize → publisher residual" of the
/// staged-write commit path.
///
/// `MutationStaging::finalize` runs `commit_staged` per touched table
/// sequentially before the publisher commits the manifest. Lance has no
/// multi-dataset atomic commit primitive, so a failure between the
/// per-table staged commits and the manifest commit leaves Lance HEAD
/// advanced on the touched tables with no manifest update — and the
/// next mutation surfaces `ExpectedVersionMismatch` on those tables.
///
/// This isn't a code bug we can fix without an upstream Lance change;
/// it's the documented residual (see `docs/runs.md` "Finalize →
/// publisher residual"). The test pins the behavior so future code
/// changes catch any silent regression: if someone widens the residual
/// (e.g. failing earlier in finalize without rolling back), this test
/// will surface a different error than `ExpectedVersionMismatch`. If
/// someone narrows the residual (e.g. lance ships multi-dataset commit
/// and we plumb it), this test will start passing the next mutation
/// — and someone has to update the assertion + the docs.
#[tokio::test]
async fn finalize_publisher_residual_drifts_lance_head_until_next_writer_recovers() {
use omnigraph::error::{ManifestConflictDetails, OmniError};
let _scenario = FailScenario::setup();
let dir = tempfile::tempdir().unwrap();
let mut db = Omnigraph::init(dir.path().to_str().unwrap(), helpers::TEST_SCHEMA)
.await
.unwrap();
{
let _failpoint =
ScopedFailPoint::new("mutation.post_finalize_pre_publisher", "return");
// First mutation: finalize succeeds (commit_staged advances Lance
// HEAD on node:Person), then the failpoint kicks before the
// publisher's manifest commit. The caller sees the synthetic
// error.
let err = mutate_main(
&mut db,
MUTATION_QUERIES,
"insert_person",
&mixed_params(&[("$name", "Eve")], &[("$age", 22)]),
)
.await
.unwrap_err();
assert!(
err.to_string().contains(
"injected failpoint triggered: mutation.post_finalize_pre_publisher"
),
"unexpected error: {err}"
);
}
// Failpoint dropped — subsequent calls are not synthetic-failed.
// Next mutation against the same table surfaces the documented
// residual: Lance HEAD on node:Person advanced (commit_staged ran),
// manifest didn't, so the publisher CAS at next-mutation time
// surfaces ExpectedVersionMismatch.
let err = mutate_main(
&mut db,
MUTATION_QUERIES,
"insert_person",
&mixed_params(&[("$name", "Frank")], &[("$age", 33)]),
)
.await
.unwrap_err();
let OmniError::Manifest(manifest_err) = err else {
panic!("expected Manifest error, got {err:?}");
};
let Some(ManifestConflictDetails::ExpectedVersionMismatch {
ref table_key,
expected,
actual,
}) = manifest_err.details
else {
panic!(
"expected ExpectedVersionMismatch (the documented residual), got {:?}",
manifest_err.details
);
};
assert_eq!(
table_key, "node:Person",
"drift should be on the table the failed finalize touched"
);
assert!(
actual > expected,
"Lance HEAD on the drifted table should be ahead of manifest pinned: actual={actual} expected={expected}",
);
}
/// Companion to the above — confirms that a finalize→publisher failure
/// on one table leaves OTHER tables untouched. Subsequent writes to
/// non-drifted tables proceed normally; the drift is contained.
#[tokio::test]
async fn finalize_publisher_residual_does_not_drift_untouched_tables() {
let _scenario = FailScenario::setup();
let dir = tempfile::tempdir().unwrap();
let mut db = Omnigraph::init(dir.path().to_str().unwrap(), helpers::TEST_SCHEMA)
.await
.unwrap();
{
let _failpoint =
ScopedFailPoint::new("mutation.post_finalize_pre_publisher", "return");
let _ = mutate_main(
&mut db,
MUTATION_QUERIES,
"insert_person",
&mixed_params(&[("$name", "Eve")], &[("$age", 22)]),
)
.await
.expect_err("synthetic failpoint must fire");
}
// node:Person drifted. node:Company didn't — try a Company write.
use omnigraph::loader::{LoadMode, load_jsonl};
load_jsonl(
&mut db,
r#"{"type": "Company", "data": {"name": "Acme"}}"#,
LoadMode::Append,
)
.await
.expect("Company write on a non-drifted table should succeed");
}
fn assert_no_staging_files(repo: &std::path::Path) {
for name in [
"_schema.pg.staging",

File diff suppressed because it is too large Load diff

View file

@ -44,7 +44,7 @@ async fn s3_compatible_repo_lifecycle_works() {
.await
.unwrap();
// Direct-to-target load (MR-771): no run lifecycle, single publisher
// Direct-to-target load: no run lifecycle, single publisher
// commit lands the row.
reopened
.load(
@ -153,7 +153,7 @@ async fn s3_public_load_uses_hidden_run_and_publishes() {
.await
.unwrap();
// Direct-to-target writes (MR-771): no run state machine, just the
// Direct-to-target writes: no run state machine, just the
// published commit lands the row. Verify by reopening and reading.
let mut reopened = Omnigraph::open(&uri).await.unwrap();
let loaded = query_main(

View file

@ -1,20 +1,21 @@
//! Primitive-level tests for `TableStore`'s staged-write API
//! (MR-794 step 1). These exercise `stage_append`, `stage_merge_insert`,
//! `scan_with_staged`, and `count_rows_with_staged` directly against a
//! Lance dataset — no Omnigraph engine involved. The engine-level rewire
//! (MR-794 step 2+) lives in `tests/runs.rs` once it lands.
//! Primitive-level tests for `TableStore`'s staged-write API. These
//! exercise `stage_append`, `stage_merge_insert`, `scan_with_staged`,
//! and `count_rows_with_staged` directly against a Lance dataset — no
//! Omnigraph engine involved. The engine-level use of these primitives
//! is exercised by `tests/runs.rs`.
//!
//! Test surface here:
//! 1. `stage_append` + `scan_with_staged` shows committed + staged data
//! without duplicates.
//! 2. `stage_merge_insert` of a row that supersedes a committed fragment
//! surfaces only the rewritten row, not both — the
//! `removed_fragment_ids` dedup landed in PR #66's `730631c`.
//! 3. **Documented contract**: chained `stage_merge_insert` calls on the
//! same dataset whose source rows share keys produce duplicate rows in
//! `scan_with_staged`. The engine's parse-time D₂ check (MR-794 step
//! 2+) prevents callers from triggering this; this test pins the
//! primitive's behavior so a future change either (a) preserves it or
//! surfaces only the rewritten row, not both, via the
//! `removed_fragment_ids` dedup contract.
//! 3. **Documented contract**: chained `stage_merge_insert` calls on
//! the same dataset whose source rows share keys produce duplicate
//! rows in `scan_with_staged`. The engine's accumulator dedupes by
//! id at finalize time so this primitive-level pitfall doesn't
//! surface in production paths; this test pins the primitive's
//! behavior so a future change either (a) preserves it or
//! (b) consciously fixes it (and updates this test).
use arrow_array::{Array, Int32Array, RecordBatch, StringArray, UInt64Array};
@ -120,8 +121,9 @@ async fn stage_merge_insert_dedupes_superseded_committed_fragment() {
assert!(
!staged.removed_fragment_ids.is_empty(),
"merge_insert that rewrites a committed row must set removed_fragment_ids \
(this is the dedup invariant from PR #66 commit 730631c its absence \
was caught by Cubic/Cursor/Codex on PR #66)"
so the scan-with-staged composer can shadow the superseded committed \
fragment without it, the committed row and its rewrite both appear, \
producing duplicates by key"
);
// scan_with_staged: alice appears exactly once, with the new age.
@ -347,11 +349,11 @@ async fn stage_merge_insert_then_commit_persists_merged_view() {
/// silently absent. `scanner.use_stats(false)` does not bypass this in
/// lance 4.0.0.
///
/// This test pins the actual behavior so a future change either preserves
/// it (and updates the doc) or fixes it (and rewrites this test). The
/// engine's MR-794 step 2+ design uses in-memory pending-batch
/// accumulation + DataFusion `MemTable` for read-your-writes instead, so
/// production code is unaffected.
/// This test pins the actual behavior so a future change either
/// preserves it (and updates the doc) or fixes it (and rewrites this
/// test). The engine's `MutationStaging` accumulator unions in-memory
/// pending batches with the committed scan via DataFusion `MemTable`
/// for read-your-writes instead, so production code is unaffected.
#[tokio::test]
async fn scan_with_staged_with_filter_silently_drops_staged_rows() {
let dir = tempfile::tempdir().unwrap();
@ -485,6 +487,8 @@ async fn chained_stage_merge_insert_with_shared_key_documents_duplicate_behavior
here because this assertion failed: either (a) the primitive was \
improved to dedupe across stages (good update to assert == 1) \
or (b) something subtler broke (investigate before changing the \
assertion). See PR #67 Codex P1 thread + .context/mr-794-step2-design.md §3.1."
assertion). The engine's MutationStaging accumulator dedupes by \
id at finalize time so this primitive-level pitfall doesn't \
surface in production paths see exec/staging.rs."
);
}

View file

@ -63,7 +63,7 @@ flowchart TB
subgraph engine[omnigraph engine]
plan[exec query and mutation]:::l2
gi[graph index CSR/CSC<br/>RuntimeCache LRU 8]:::l2
coord[coordinator<br/>ManifestRepo · CommitGraph · RunRegistry]:::l2
coord[coordinator<br/>ManifestRepo · CommitGraph]:::l2
end
subgraph storage[storage trait — wraps Lance]
@ -134,7 +134,7 @@ flowchart TB
coord[GraphCoordinator]:::l2
mr[ManifestRepo<br/>db/manifest.rs]:::l2
cg[CommitGraph<br/>_graph_commits.lance]:::l2
rr[RunRegistry<br/>_graph_runs.lance]:::l2
stg[MutationStaging<br/>per-query in-memory accumulator<br/>exec/staging.rs]:::l2
end
subgraph idx[graph index]
@ -149,17 +149,18 @@ flowchart TB
eq --> gi
eq --> ts
em --> stg
em --> ts
ld --> stg
ld --> ts
eq --> mr
em --> mr
coord --> mr
coord --> cg
coord --> rr
ts --> st
```
The engine binds the compiler IR to Lance. It owns multi-dataset coordination, the graph topology index, the run registry, and the snapshot/manifest read path.
The engine binds the compiler IR to Lance. It owns multi-dataset coordination, the graph topology index, the per-query staging accumulator, and the snapshot/manifest read path.
Code paths:
@ -169,6 +170,46 @@ Code paths:
- Graph index: `crates/omnigraph/src/graph_index/`
- Loader: `Omnigraph::ingest` at `crates/omnigraph/src/loader/mod.rs:74`
### Mutation atomicity — in-memory accumulator (MR-794)
Inserts and updates inside `mutate_as` and the bulk loader's
Append/Merge modes go through `MutationStaging`
([`crates/omnigraph/src/exec/staging.rs`](../crates/omnigraph/src/exec/staging.rs)),
a per-query in-memory accumulator. No Lance HEAD advance happens during
op execution; one `stage_*` + `commit_staged` per touched table runs
at end-of-query, then the publisher commits the manifest atomically.
```
op-1 (insert/update) → push RecordBatch → MutationStaging.pending[table]
op-2 (insert/update) → read committed via Lance + pending via DataFusion
MemTable (read-your-writes) → push batch
op-N → push batch
─── end of query ───────────────────────────────────────
finalize: per pending table:
concat batches → stage_append OR stage_merge_insert → commit_staged
publisher: ManifestBatchPublisher::publish (one cross-table CAS)
```
A failed op leaves Lance HEAD untouched on the staged tables: the next
mutation proceeds normally with no drift to reconcile. Concrete
contracts:
- `D₂` parse-time rule: a query is either insert/update-only or
delete-only. Mixed → reject. Deletes still inline-commit (Lance
4.0.0 has no public two-phase delete); D₂ keeps the inline path safe.
- `LoadMode::Overwrite` keeps the inline-commit path
(truncate-then-append doesn't fit the staged shape; overwrite has no
in-flight read-your-writes requirement).
- Read sites consume `TableStore::scan_with_pending`, which Lance-scans
the committed snapshot at the captured `expected_version` and unions
with a DataFusion `MemTable` over the pending batches.
This pattern realizes [docs/invariants.md §VI.25](invariants.md)
(read-your-writes within a multi-statement mutation) and §VI.32
(failure scope bounded) for inserts/updates by construction at the
writer layer. See [docs/runs.md](runs.md) for the publisher CAS
contract this builds on.
### Storage trait — today vs. roadmap
```mermaid
@ -256,13 +297,13 @@ Throughout the docs, capabilities are split into:
- **MVCC**: every Lance write bumps a per-dataset version; the OmniGraph manifest version coordinates which sub-table versions are visible together.
- **Snapshot isolation**: a query holds one `Snapshot` for its lifetime; concurrent writes don't leak in.
- **Cross-branch isolation**: copy-on-write means readers and writers on different branches don't block each other.
- **Run isolation**: each transactional run lives on its own `__run__<id>` branch.
- **Per-query staging**: `mutate_as` and `load` (Append/Merge) accumulate insert/update batches in an in-memory `MutationStaging`; one `stage_*` + `commit_staged` per touched table runs at end-of-query, then the publisher commits the manifest atomically. A mid-query failure leaves Lance HEAD untouched on staged tables. (MR-794; pre-v0.4.0 used a `__run__<id>` staging branch + Run state machine, removed in MR-771.)
- **Schema-apply lock**: `__schema_apply_lock__` system branch serializes schema migrations.
- **Fail-points** (`failpoints` cargo feature): `failpoints::maybe_fail("operation.step")?` in `branch_create`, publish, etc., for deterministic failure injection in tests.
## Workspace crates
- `omnigraph-compiler` — schema and query grammars, catalog, IR, lowering, type checker, lint, migration planner, OpenAI-style embedding client.
- `omnigraph` (engine, published as `omnigraph-engine` on crates.io since v0.2.2) — the Lance-backed runtime: manifest, commit graph, run registry, snapshot, exec, merge, loader, Gemini embedding client.
- `omnigraph` (engine, published as `omnigraph-engine` on crates.io since v0.2.2) — the Lance-backed runtime: manifest, commit graph, snapshot, exec (incl. per-query `MutationStaging` accumulator), merge, loader, Gemini embedding client.
- `omnigraph-cli` — the `omnigraph` binary.
- `omnigraph-server` — the `omnigraph-server` binary (Axum HTTP server).

View file

@ -1,6 +1,7 @@
# Audit / Actor tracking
- `Omnigraph::audit_actor_id: Option<String>` is the actor in effect.
- `_as` variants of every write API let callers override the actor: `begin_run_as`, `publish_run_as`, `ingest_as`, `mutate_as`, `branch_merge_as`, etc.
- Actor IDs are persisted both on `RunRecord.actor_id` and on `GraphCommit.actor_id`, with optional split storage in `_graph_commit_actors.lance` and `_graph_run_actors.lance`.
- `_as` variants of every write API let callers override the actor: `mutate_as`, `ingest_as`, `branch_merge_as`, `apply_schema_as`, etc.
- Actor IDs are persisted on `GraphCommit.actor_id` with split storage in `_graph_commit_actors.lance` (the commit graph is split into `_graph_commits.lance` for the linkage and `_graph_commit_actors.lance` for the actor map).
- HTTP server uses the bearer-token actor automatically; CLI uses the local user / explicit env (no implicit actor).
- Pre-v0.4.0 repos also stored actor IDs on `RunRecord.actor_id` in `_graph_runs.lance` / `_graph_run_actors.lance`. The Run state machine was removed in MR-771; those files are inert post-v0.4.0 and reclaimed by MR-770's production sweep.

View file

@ -37,7 +37,7 @@ Storage is split across two Lance datasets (both with stable row IDs):
Notes:
- Every successful publish (load / change / merge / schema_apply / publish_run) appends one commit.
- Every successful publish (load / change / merge / schema_apply) appends one commit.
- Merge commits have two parents; linear commits have one.
- API: `list_commits(branch)`, `get_commit(id)`, `head_commit_id_for_branch(branch)`.
@ -53,5 +53,5 @@ Notes:
Filtered from `branch_list()` but visible to internals:
- `__run__<run-id>` — ephemeral isolation branch for a transactional run.
- `__schema_apply_lock__` — serializes schema migrations.
- `__run__<run-id>` — legacy from the pre-v0.4.0 Run state machine (removed in MR-771). The branch-name guard predicate `is_internal_run_branch` is kept as defense-in-depth so users cannot create a branch matching the legacy prefix; the filter will be removed once production legacy branches are swept (MR-770).

View file

@ -56,12 +56,12 @@ omnigraph policy validate --config ./omnigraph.yaml
omnigraph policy test --config ./omnigraph.yaml
omnigraph policy explain --config ./omnigraph.yaml --actor act-alice --action read --branch main
omnigraph run list ./repo.omni --json
omnigraph run show --uri ./repo.omni <run-id> --json
omnigraph run publish --uri ./repo.omni <run-id> --json
omnigraph run abort --uri ./repo.omni <run-id> --json
omnigraph commit list ./repo.omni --json
omnigraph commit show --uri ./repo.omni <commit-id> --json
```
(The legacy `omnigraph run list/show/publish/abort` subcommands were removed in MR-771; mutations and loads publish atomically and the commit graph (`omnigraph commit list`) is the audit surface.)
`query lint` and `query check` are the same command surface. In v1, repo-backed
lint uses local or `s3://` repo URIs; HTTP targets are only supported when you
also pass `--schema`.

View file

@ -4,8 +4,8 @@
|---|---|---|
| `MANIFEST_DIR` | `__manifest` | `db/manifest/layout.rs` |
| Commit graph dir | `_graph_commits.lance` | `db/commit_graph.rs` |
| Run registry dir | `_graph_runs.lance` | `db/run_registry.rs` |
| Run branch prefix | `__run__` | `db/run_registry.rs` |
| Run registry dir (legacy, removed MR-771) | `_graph_runs.lance` | inert post-v0.4.0; reclaimed by MR-770 |
| Run branch prefix (legacy, removed MR-771) | `__run__` | filtered by `is_internal_run_branch` defense-in-depth |
| Schema apply lock | `__schema_apply_lock__` | `db/mod.rs` |
| Manifest publisher retry budget | `PUBLISHER_RETRY_BUDGET = 5` | `db/manifest/publisher.rs` |
| Internal manifest schema version | `INTERNAL_MANIFEST_SCHEMA_VERSION = 2` | `db/manifest/migrations.rs` |

View file

@ -9,6 +9,7 @@
- `Manifest(ManifestError { kind: BadRequest|NotFound|Conflict|Internal, details: Option<ManifestConflictDetails>, … })`
- `ManifestConflictDetails::ExpectedVersionMismatch { table_key, expected, actual }` — caller's `expected_table_versions` did not match the manifest's current latest non-tombstoned version (set by `OmniError::manifest_expected_version_mismatch`).
- `ManifestConflictDetails::RowLevelCasContention` — Lance row-level CAS rejected the publish because a concurrent writer landed the same `object_id`. Retried internally by the publisher; only surfaces if the retry budget exhausts.
- **D₂ parse-time rejection** (MR-794): a single mutation query that mixes inserts/updates with deletes errors out *before any I/O* with kind `BadRequest`. Message: `mutation '<name>' on the same query mixes inserts/updates and deletes; split into separate mutations: (1) inserts and updates, then (2) deletes`. See [docs/query-language.md](query-language.md) for the rule and [docs/runs.md](runs.md) for the underlying staged-write rationale.
- `MergeConflicts(Vec<MergeConflict>)`
Compiler-side `NanoError` covers parse / catalog / type / storage / plan / execution / arrow / lance / IO / manifest / unique-constraint, each with structured spans (`SourceSpan { start, end }`) for ariadne-style diagnostics.

View file

@ -79,13 +79,16 @@ Hybrid example: `order { rrf(nearest($d.embedding, $q), bm25($d.body, $q_text))
## Mutation execution (`exec/mutation.rs`)
Resolves expression values to literals, converts to typed Arrow arrays (`literal_to_typed_array(lit, DataType, num_rows)`), then writes:
Resolves expression values to literals, converts to typed Arrow arrays (`literal_to_typed_array(lit, DataType, num_rows)`), then writes via Lance's two-phase distributed-write API at end-of-query:
- `insert` → Lance `WriteMode::Append`
- `update` → Lance `merge_insert(WhenMatched::Update)`
- `delete` → Lance `merge_insert(WhenMatched::Delete)` (logical) or filtered overwrite.
- `insert` (no `@key`, edges) → accumulate into `MutationStaging.pending` (Append mode); finalize calls `stage_append` once per touched table.
- `insert` (`@key` node) → accumulate into `pending` (Merge mode); finalize calls `stage_merge_insert` once per touched table.
- `update` → scan committed via Lance + pending via DataFusion `MemTable` (read-your-writes), apply assignments, accumulate into `pending` (Merge mode).
- `delete` → still inline-commits via `delete_where` (Lance 4.0.0 has no public two-phase delete); recorded into `MutationStaging.inline_committed`.
Multi-statement mutations are atomic at the manifest commit boundary.
**D₂ parse-time rule.** A single mutation query is either insert/update-only or delete-only. Mixed → reject before any I/O. The check fires in `enforce_no_mixed_destructive_constructive(&ir)` inside `execute_named_mutation`.
Multi-statement mutations are atomic at the publisher commit boundary: every insert/update batch lives in memory until end-of-query, then exactly one `stage_*` + `commit_staged` runs per touched table, then `ManifestBatchPublisher::publish` commits the manifest atomically with per-table `expected_table_versions` CAS.
### Mutation flow — sequence
@ -93,57 +96,58 @@ Multi-statement mutations are atomic at the manifest commit boundary.
sequenceDiagram
autonumber
participant client as Client
participant og as Omnigraph::mutate<br/>(mutation.rs:511)
participant og as Omnigraph::mutate_as<br/>(mutation.rs)
participant cmp as omnigraph-compiler
participant runs as RunRegistry
participant stg as MutationStaging<br/>(exec/staging.rs)
participant ts as table_store
participant lance as Lance dataset
participant mr as ManifestRepo<br/>(manifest.rs:280)
participant pub as ManifestBatchPublisher
client->>og: mutate(target, source, name, params)
og->>cmp: parse + typecheck_query
cmp-->>og: CheckedQuery (Mutation IR)
og->>runs: begin_run(target, op_hash)<br/>fork __run__<id> from target head
runs-->>og: RunRecord
loop for each mutation statement (on __run__<id>)
og->>og: resolve expression literals<br/>literal_to_typed_array(lit, type, n)
alt insert
og->>ts: append RecordBatches
ts->>lance: WriteMode::Append → new fragment(s)
else update
og->>ts: merge_insert keyed by id
ts->>lance: merge_insert(WhenMatched::Update)
else delete
og->>ts: merge_insert with delete predicate
ts->>lance: merge_insert(WhenMatched::Delete)
client->>og: mutate_as(branch, source, name, params, actor_id)
og->>cmp: parse + typecheck + lower_mutation_query
cmp-->>og: MutationIR
og->>og: enforce_no_mixed_destructive_constructive (D₂)
loop for each mutation op
og->>og: resolve literals + build batch
alt insert / update (accumulate)
og->>ts: open dataset @ pre-write version (first touch)
og->>stg: ensure_path + append_batch (PendingMode)
opt update — scan committed + pending
og->>ts: scan_with_pending (Lance + DataFusion MemTable union)
ts-->>og: matched batches
end
else delete (inline-commit, D₂ keeps separate)
og->>ts: delete_where (advances Lance HEAD)
og->>stg: record_inline (SubTableUpdate)
end
lance-->>ts: new dataset version
og->>mr: commit_updates(SubTableUpdate)<br/>per-statement commit on __run__<id>
mr-->>og: ack
end
og->>og: OCC: target head unchanged since begin_run?
og->>og: publish_run(run_id)
alt fast path (target hasn't moved)
og->>mr: commit_updates_on_branch(target, updates)<br/>promote run snapshot
else merge path (target advanced)
og->>og: branch_merge_internal(__run__<id>, target)<br/>three-way merge
og->>stg: finalize(db, branch)
loop per pending table
stg->>ts: stage_append OR stage_merge_insert (one per table)
ts-->>stg: StagedWrite (transaction + fragments)
stg->>ts: commit_staged (advances Lance HEAD)
ts-->>stg: new Dataset
end
mr-->>og: new target snapshot
og->>runs: terminate_run(Published)
stg-->>og: (updates: Vec<SubTableUpdate>, expected_versions)
og->>pub: commit_updates_on_branch_with_expected
pub->>pub: publisher CAS (cross-table OCC on __manifest)
pub-->>og: new manifest version
og-->>client: MutationResult
```
**Code paths:**
- Entry: `Omnigraph::mutate` at `crates/omnigraph/src/exec/mutation.rs:511`
- Per-mutation orchestration: `mutate_with_current_actor` at `crates/omnigraph/src/exec/mutation.rs:539`
- Per-statement commit on the run-branch: `commit_updates` (called from `execute_insert` / `execute_update` / `execute_delete` in `crates/omnigraph/src/exec/mutation.rs`)
- Run publish: `Omnigraph::publish_run` at `crates/omnigraph/src/db/omnigraph.rs:858`
- Manifest commit primitive: `ManifestRepo::commit` at `crates/omnigraph/src/db/manifest.rs:280` (called from both per-statement `commit_updates` and the publish path)
- Entry: `Omnigraph::mutate_as` at `crates/omnigraph/src/exec/mutation.rs`
- Per-mutation orchestration: `mutate_with_current_actor` at `crates/omnigraph/src/exec/mutation.rs`
- D₂ check: `enforce_no_mixed_destructive_constructive` (in the same file)
- Per-op execution: `execute_insert`, `execute_update`, `execute_delete_node`, `execute_delete_edge`
- Pending-aware reads: `TableStore::scan_with_pending` / `count_rows_with_pending` at `crates/omnigraph/src/table_store.rs`
- Edge cardinality with pending: `validate_edge_cardinality_with_pending` at `crates/omnigraph/src/exec/mutation.rs`
- Per-query accumulator: `crates/omnigraph/src/exec/staging.rs` (`MutationStaging`, `PendingTable`, `PendingMode`, `finalize`)
- End-of-query Lance commit: `TableStore::stage_append`, `stage_merge_insert`, `commit_staged` at `crates/omnigraph/src/table_store.rs`
- Manifest commit primitive: `commit_updates_on_branch_with_expected` at `crates/omnigraph/src/db/omnigraph/table_ops.rs`
Multi-statement mutations don't get atomicity from a single final `commit` — they get it from the **run-branch + publish_run** pattern. By default a mutation forks a fresh `__run__<id>` branch (`begin_run`); each statement individually commits its sub-table updates to that run-branch. After all statements complete, an OCC pre-check verifies the target hasn't moved since the run started, then `publish_run` atomically promotes the run-branch into the target — either via the fast path (direct promotion if the target hasn't moved) or a three-way merge. That final publish is what gives multi-statement mutations their atomicity guarantee (per [`docs/invariants.md`](invariants.md) §VI.26). If anything fails mid-run, the run is failed and the run-branch is dropped without affecting the target.
One exception: if the caller already targets a `__run__<id>` branch (mutation.rs:555), the mutation runs directly on that branch with no nested run wrapping — the assumption is the caller is managing the surrounding run lifecycle themselves. See [runs.md](runs.md) for the full run lifecycle.
Atomicity guarantee for multi-statement mutations: a mid-query failure leaves Lance HEAD untouched on staged tables (no inline commit happened during op execution), so the next mutation proceeds normally with no `ExpectedVersionMismatch`. The publisher CAS at the very end either succeeds (manifest advances atomically across all touched sub-tables) or fails with a typed `ManifestConflictDetails::ExpectedVersionMismatch` (no partial publish). See [docs/invariants.md §VI.25 / §VI.32](invariants.md) and [docs/runs.md](runs.md).
## Bulk loader (`loader/mod.rs`)
@ -156,19 +160,18 @@ One exception: if the caller already targets a `__run__<id>` branch (mutation.rs
## Load modes (`LoadMode`)
| Mode | Semantics |
|---|---|
| `Overwrite` | Replace all data in the target tables on the branch |
| `Append` | Strict insert; duplicates error |
| `Merge` | Upsert by id (`merge_insert`) |
| Mode | Semantics | Path (post-MR-794) |
|---|---|---|
| `Overwrite` | Replace all data in the target tables on the branch | Inline-commit per type, then publisher CAS at end-of-load. Truncate-then-append doesn't fit the staged shape; documented residual. |
| `Append` | Strict insert; duplicates error | In-memory `MutationStaging` accumulator; one `stage_append` + `commit_staged` per touched table at end-of-load; publisher CAS. |
| `Merge` | Upsert by `id` (`merge_insert`) | Same accumulator; one `stage_merge_insert` per touched table at end-of-load (Merge mode dedupes by `id`, last-write-wins); publisher CAS. |
For Append/Merge, a mid-load failure (RI / cardinality violation, validation error) leaves Lance HEAD untouched on the staged tables — the next load on the same tables proceeds normally with no `ExpectedVersionMismatch`. For Overwrite, a mid-load failure can still leave Lance HEAD on a partially-truncated table; the next overwrite replaces it.
## `load` vs `ingest`
- `load(branch, data, mode)` — direct load to a branch.
- `ingest(branch, from, data, mode)` — branch-creating, transactional load:
1. If target advanced since the run started, fork a fresh run branch from `from`.
2. Load into the run branch (Append).
3. If target hasn't moved, fast-publish; otherwise abort.
- `load(branch, data, mode)` — direct load to a branch (single publisher commit per call).
- `ingest(branch, from, data, mode)` — branch-creating wrapper: if `branch` doesn't exist, fork it from `from` (default `main`) via `branch_create_from`, then call `load(branch, data, mode)`.
- Returns `IngestResult { branch, base_branch, branch_created, mode, tables[] }`.
- `ingest_as(actor_id)` records the actor on the resulting commit.

View file

@ -110,7 +110,7 @@ Specific defaults (timeout values, memory caps, TTL windows) are *configuration*
*Status: aspirational — referential integrity at scale requires SIP-backed cross-table validation; not yet implemented. Cross-batch / cross-version uniqueness tracked in MR-714.*
25. **Isolation: per-query snapshot; read-your-writes within and across queries in a session.** Each query reads from one consistent manifest version. Within a multi-statement mutation, the read subplan inside each write operator sees the writes from earlier statements. Across queries in a session, reads always resolve the latest manifest version — no reader pinning to older snapshots.
*Status: open — read-your-writes within a multi-statement mutation requires Kuzu-style local-uncommitted scan path; deferred per MR-737 §10.10.*
*Status: upheld for inserts/updates after MR-794 step 2+ — `MutationStaging`'s in-memory accumulator + `TableStore::scan_with_pending` (DataFusion `MemTable` union with the committed Lance scan, with merge-shadow semantics for chained updates) implements read-your-writes within a multi-statement mutation. Delete-touching mutations are limited to delete-only by parse-time D₂; closing the within-query RYW gap for deletes requires Lance's two-phase delete API (tracked: MR-793 / Lance-upstream lance-format/lance#6658). The "Lance HEAD ahead of `__manifest`" drift class is unreachable for op-execution failures (the partial-failure test pins this), but a narrowed residual remains at the finalize→publisher boundary because Lance has no multi-dataset commit primitive — see [docs/runs.md](runs.md) "Finalize → publisher residual".*
26. **Durability before acknowledgement.** Commit returns only after the substrate has confirmed durable persistence. No "fast" or "fire-and-forget" durability levels.

View file

@ -64,6 +64,14 @@ Used inside MATCH or as expressions inside RETURN/ORDER:
`<value>` is a literal, `$param`, or `now()`. Multi-statement mutations execute atomically (added in v0.2.0).
### D₂ — mixed insert/update + delete is rejected at parse time
A single mutation query must be **either insert/update-only or delete-only**. Mixed → rejected before any I/O with the message:
> `mutation '<name>' on the same query mixes inserts/updates and deletes; split into separate mutations: (1) inserts and updates, then (2) deletes. This restriction lifts when Lance exposes a two-phase delete API (tracked: MR-793 / Lance-upstream).`
Reason: under the staged-write rewire (MR-794), inserts and updates accumulate in memory and commit at end-of-query, while deletes still inline-commit (Lance 4.0.0 has no public two-phase delete). Mixing creates ordering hazards (same-row insert→delete becomes a no-op because the staged insert isn't visible to delete; cascading deletes of just-inserted edges break referential integrity by silent design). Until Lance exposes `DeleteJob::execute_uncommitted`, the parse-time rejection keeps both paths atomic and correct. See [docs/runs.md](runs.md) and [docs/invariants.md §VI.25](invariants.md).
## IR (Intermediate Representation)
`QueryIR { name, params, pipeline: Vec<IROp>, return_exprs, order_by, limit }`

143
docs/releases/v0.4.1.md Normal file
View file

@ -0,0 +1,143 @@
# Omnigraph v0.4.1
Omnigraph v0.4.1 closes the multi-statement-mutation atomicity gap that
v0.4.0 documented as a known limitation. Inserts and updates now route
through an in-memory `MutationStaging` accumulator and commit via Lance's
two-phase distributed-write API at end-of-query. A failed mid-query op
no longer leaves Lance HEAD drifted on the touched table — the next
mutation proceeds normally.
## Highlights
- **Staged-write rewire (MR-794)**: `mutate_as` and `load` (Append /
Merge modes) accumulate insert/update batches into
`MutationStaging.pending` per touched table. No Lance HEAD advance
happens during op execution; one `stage_*` + `commit_staged` per
table runs at end-of-query, then `ManifestBatchPublisher::publish`
commits the manifest atomically. **For op-execution failures**
(validation errors, missing endpoints, parse-time D₂ rejection), Lance
HEAD on every staged table is untouched and the next mutation
proceeds normally. A narrowed residual remains at the
finalize→publisher boundary (multi-table `commit_staged` is not
atomic with the manifest commit) — see [docs/runs.md](../runs.md)
"Finalize → publisher residual" for details.
- **D₂ parse-time rule**: a single mutation query is either
insert/update-only or delete-only. Mixed → rejected with a clear
error directing the caller to split into two queries. Lance 4.0.0
has no public two-phase delete; deletes still inline-commit, and D₂
keeps that path safe.
- **Read-your-writes via DataFusion `MemTable`**: read sites in
multi-statement mutations consume `TableStore::scan_with_pending`,
which Lance-scans the committed snapshot at the captured
`expected_version` and unions with a DataFusion `MemTable` over the
pending batches. Replaces the previous "reopen at staged Lance
version" pattern.
- **Coordinator swap-restore eliminated** from `mutate_with_current_actor`.
Branch is threaded explicitly through the per-op execution path
(`execute_named_mutation`, `execute_insert`, `execute_update`,
`execute_delete*`, `validate_edge_insert_endpoints`,
`ensure_node_id_exists`). The `swap_coordinator_for_branch` /
`restore_coordinator` API and `CoordinatorRestoreGuard` are removed
from `mutation.rs`. (`merge.rs` keeps its own swap pattern; that's
a separate workflow tracked in MR-793.)
- **`docs/invariants.md` §VI.25** flips from `aspirational/open` to
`upheld for inserts/updates`. The within-query read-your-writes
guarantee is now load-bearing for the publisher CAS contract.
## Behavior changes
- A failed multi-statement mutation no longer surfaces
`ExpectedVersionMismatch` on the *next* mutation against the same
table. The next call proceeds normally — Lance HEAD on staged
tables is unchanged.
- Mixed insert/update + delete in one query is rejected at parse
time. Existing test queries that mixed both must be split.
- `MutationStaging`'s shape changed: `pending: HashMap<String, PendingTable>`
+ `inline_committed: HashMap<String, SubTableUpdate>` replaces the
previous `latest: HashMap<String, StagedTable>`. This is an internal
type; no public API impact.
## Residual / out of scope
- **`LoadMode::Overwrite`** keeps the legacy inline-commit path
(truncate-then-append doesn't fit the staged shape). A mid-overwrite
failure can still drift Lance HEAD on a partially-truncated table;
the next overwrite replaces it. Operator-driven, rare.
- **Delete-only multi-statement mutations** still inline-commit per op.
D₂ keeps inserts/updates from coexisting with deletes, so the
inline path remains atomic per op but not per query for delete-only
cascades. Closing this requires Lance to expose
`DeleteJob::execute_uncommitted`; tracked in MR-793 / Lance-upstream.
- **`schema_apply`, `branch_merge_internal`, `ensure_indices`** still
use Lance's inline-commit APIs. The two-phase pattern is in
`mutate_as` and `load` only; hoisting it to a storage-trait
invariant covering all writers is MR-793.
## Tests added
- `tests/runs.rs::partial_failure_leaves_target_queryable_and_unblocks_next_mutation`
(replaces the old `partial_failure_observably_rolls_back_but_blocks_next_mutation_on_same_table`)
- `tests/runs.rs::mutation_rejects_mixed_insert_and_delete_at_parse_time`
- `tests/runs.rs::mixed_insert_and_update_on_same_person_coalesces_to_one_merge`
- `tests/runs.rs::multiple_appends_to_same_edge_coalesce_to_one_append`
- `tests/runs.rs::multi_statement_inserts_publish_exactly_once`
- `tests/runs.rs::load_with_bad_edge_reference_unblocks_next_load`
- `tests/runs.rs::load_with_cardinality_violation_unblocks_next_load`
## Files changed
- `crates/omnigraph/src/exec/staging.rs` (NEW) — `MutationStaging`,
`PendingTable`, `PendingMode`, `StagedTablePath`,
`dedupe_merge_batches_by_id`.
- `crates/omnigraph/src/exec/mutation.rs` — D₂ check; per-op
rewires (`execute_insert`, `execute_update`, `execute_delete*`);
branch threading; coordinator-swap removal; helper
`validate_edge_cardinality_with_pending`; helper
`concat_match_batches_to_schema`; `apply_assignments` updated to
copy unassigned blob columns from full-schema scans.
- `crates/omnigraph/src/loader/mod.rs``load_jsonl_reader` split:
staged path for Append/Merge, legacy inline-commit path for
Overwrite. Helpers `collect_node_ids_with_pending` and
`validate_edge_cardinality_with_pending_loader`.
- `crates/omnigraph/src/table_store.rs``scan_with_pending`,
`count_rows_with_pending` (DataFusion `MemTable`-backed union with
Lance scan).
- `Cargo.toml` (workspace) + `crates/omnigraph/Cargo.toml` — added
`datafusion = "52"` direct dep (transitively pulled by Lance
already; required for `MemTable`).
- `docs/runs.md` — removed "Known limitation" section; documented
the new accumulator + D₂ + LoadMode::Overwrite residual.
- `docs/invariants.md` — §VI.25 status flipped to `upheld for
inserts/updates`.
- `docs/architecture.md` — added "Mutation atomicity — in-memory
accumulator (MR-794)" subsection; refreshed the engine + state
diagrams to drop `RunRegistry` and add `MutationStaging`.
- `docs/execution.md` — rewrote the mutation flow sequence diagram
for the staged-write path; updated the `LoadMode` table to call
out per-mode commit semantics; rewrote `load` vs `ingest`.
- `docs/query-language.md` — documented the D₂ parse-time rule.
- `docs/errors.md` — added the D₂ `BadRequest` rejection path.
- `docs/storage.md` — dropped the live `_graph_runs.lance` reference
(legacy from MR-771) from the layout diagram and prose.
- `docs/branches-commits.md` — moved `__run__<id>` to a legacy note;
removed `publish_run` from the publish-trigger list.
- `docs/audit.md` — current `_as` API list refreshed; legacy
`RunRecord.actor_id` moved to a historical note.
- `docs/constants.md` — marked the run registry / branch-prefix rows
as legacy.
- `docs/cli.md` — replaced the legacy `omnigraph run *` quickstart
block with `omnigraph commit list/show`.
- `docs/testing.md` — extended the `runs.rs` row to cover the new
MR-794 contract tests; added the `staged_writes.rs` row.
- `AGENTS.md` (CLAUDE.md symlink) — updated the atomic-per-query
description and the L2 capability matrix row.
## Included Changes
- MR-794 step 2+ — rewire `mutate_as` and `load` via in-memory
`MutationStaging` + `stage_*` / `commit_staged` per touched table at
end-of-query.
- (MR-794 step 1 shipped in v0.4.0's PR #67`StagedWrite`,
`stage_append`, `stage_merge_insert`, `commit_staged`,
`scan_with_staged`, `count_rows_with_staged` — and is the substrate
this release builds on.)

View file

@ -20,22 +20,96 @@ publisher's row-level CAS on `__manifest` is the single fence.
A `.gq` query with multiple ops (e.g. `insert Person … insert Knows …`)
must observe earlier ops' writes when validating later ops (referential
integrity, edge cardinality). After demotion this is implemented via an
in-process `MutationStaging` accumulator in
`crates/omnigraph/src/exec/mutation.rs`:
integrity, edge cardinality). After MR-794 step 2+ this is implemented
via an in-memory `MutationStaging` accumulator in
[`crates/omnigraph/src/exec/staging.rs`](../crates/omnigraph/src/exec/staging.rs),
shared by both `mutate_as` and the bulk loader:
- On the first touch of each table, the pre-write manifest version is
captured into `expected_versions[table_key]`.
- Subsequent ops on the same table re-open the dataset at the locally
staged Lance version (bypassing the manifest, which has not been
committed yet) so they see prior writes.
- One `commit_with_expected(updates, expected_versions)` at the end
publishes the lot atomically. Cross-table conflicts surface as
captured into `expected_versions[table_key]` (the publisher's CAS
fence at end-of-query).
- Each insert/update op pushes a `RecordBatch` into the per-table
pending accumulator. Lance HEAD does **not** advance during op
execution.
- Read sites (validation, predicate matching for `update`) consume
`TableStore::scan_with_pending`, which scans committed via Lance
and applies the same SQL filter to the pending batches via DataFusion
`MemTable`. Same-query writes are visible to subsequent reads.
- At end-of-query, `MutationStaging::finalize` issues exactly one
`stage_*` + `commit_staged` per touched table (concatenating
accumulated batches; merge-mode dedupes by `id`, last-write-wins),
and the publisher publishes the manifest atomically across all
touched sub-tables. Cross-table conflicts surface as
`ManifestConflictDetails::ExpectedVersionMismatch`.
- **Deletes still inline-commit.** Lance's `Dataset::delete` is not
exposed as a two-phase op in 4.0.0; deletes go through `delete_where`
immediately and record their post-write state in
`MutationStaging.inline_committed`. The parse-time D₂ rule (below)
prevents inserts/updates from coexisting with deletes in one query,
so the inline path is safe for delete-only mutations.
This upholds [docs/invariants.md §VI.23](invariants.md) (atomicity per
query) and §VI.25 (read-your-writes within a multi-statement mutation —
previously aspirational, now upheld).
query) and §VI.25 (read-your-writes within a multi-statement mutation,
upheld).
### D₂ — parse-time mixed-mode rejection
A single mutation query is either insert/update-only or delete-only.
Mixed → rejected at parse time with a clear error directing the user to
split the query. Reason: mixing creates ordering hazards
(insert→delete on the same row would silently no-op because the staged
insert isn't visible to delete; cascading deletes of just-inserted
edges break referential integrity). Until Lance exposes a two-phase
delete API, the parse-time rejection keeps both paths atomic and
correct. Tracked: MR-793, plus a Lance-upstream ticket.
### `LoadMode::Overwrite` residual
The bulk loader's Append and Merge modes use the staged-write path
described above. `LoadMode::Overwrite` keeps the legacy inline-commit
path: truncate-then-append doesn't fit the staged shape cleanly in
Lance 4.0.0, and overwrite has no in-flight read-your-writes
requirement (the prior data is being wiped). A mid-overwrite failure
can leave Lance HEAD on a partially-truncated table; the next overwrite
will replace it. Operator-driven (rare in agent workloads); document
permanently until Lance exposes `Operation::Overwrite { fragments }` as
a two-phase op.
### Finalize → publisher residual
The staged-write rewire eliminates one drift class **by construction at
the writer layer**: an op that fails before pushing to the in-memory
accumulator (validation errors, missing endpoints, parse-time D₂
rejection) leaves Lance HEAD untouched on every staged table. This is
the case the `partial_failure_leaves_target_queryable_and_unblocks_next_mutation`
test pins.
A second, narrower drift class remains. `MutationStaging::finalize`
runs `stage_*` + `commit_staged` per touched table sequentially, then
the publisher commits the manifest. Lance has no multi-dataset atomic
commit, so the per-table `commit_staged` calls are independent
operations: if commit_staged on table N+1 fails *after* commit_staged
on tables 1..N succeeded, or if the publisher's CAS pre-check rejects
*after* every commit_staged succeeded, tables 1..N are left at
`Lance HEAD = manifest_pinned + 1`. The next mutation against those
tables surfaces `ManifestConflictDetails::ExpectedVersionMismatch`
the same loud failure mode the rewire was designed to make rare, just
no longer "unreachable."
Triggers: transient Lance write errors during finalize (object-store
retry budget exhaustion, disk full); persistent publisher contention
exceeding `PUBLISHER_RETRY_BUDGET = 5` retries. Closing this requires
either a Lance multi-dataset atomic-commit primitive (filed upstream
alongside the two-phase delete request) or a manifest-layer journal
that replays staged commits on next open. Both are heavyweight; the
v1 stance is "narrowed window, documented residual, surface the loud
error when it fires."
The publisher-CAS contract is unchanged: a *concurrent writer* that
advances any of our touched tables between snapshot capture and
publisher commit produces exactly one winner. The residual above is
about *our* abandoned commits in the failure path, not about
concurrency races.
## Conflict shape
@ -59,40 +133,32 @@ list`.
`_graph_runs.lance` belongs in MR-770 (the production sweep) — this PR
stops *creating* run state but does not destroy legacy bytes on disk.
## Known limitation: mid-query partial failure on the same table
## Mid-query partial failure: closed by MR-794
A multi-statement `.gq` mutation where op-N writes a Lance fragment
successfully and op-N+1 then fails leaves the touched table at
`Lance HEAD = manifest_version + 1`. The query is atomic at the manifest
level (the publisher never publishes, so reads at the pinned manifest
version do *not* see op-N's data), but the *next* mutation against the
same table fails loudly with
`ManifestConflictDetails::ExpectedVersionMismatch` because
`ensure_expected_version` enforces strict equality between Lance HEAD and
the manifest's pinned version.
The pre-MR-794 design had a known limitation: a multi-statement `.gq`
mutation where op-N inline-committed a Lance fragment and op-N+1 then
failed left the touched table at `Lance HEAD = manifest_version + 1`,
blocking the next mutation with `ExpectedVersionMismatch`.
**Why the engine doesn't auto-rollback**: Lance's `Dataset::restore()` is
*not* a rewind — it appends a new commit (containing the desired
historical version's data) and advances HEAD further. There is no Lance
API to delete a committed version. A proper fix requires writing each
mutation's per-table fragments to a *transient Lance branch* on the
sub-table, then fast-forwarding main on success or dropping the branch
on failure. That work is tracked as a follow-up to MR-771; in the
meantime:
MR-794 (step 1 + step 2+) closed this for inserts/updates **by
construction at the writer layer**: insert and update batches accumulate
in memory; no Lance HEAD advance happens during op execution; one
`stage_*` + `commit_staged` per touched table runs at end-of-query, and
only after every op succeeded. A failed op leaves Lance HEAD untouched
on the staged tables, so the next mutation proceeds normally with no
drift to reconcile.
- **In practice this is rare.** Most schema-language validation
(`@key`, `@enum`, `@range`, intra-batch uniqueness, edge-endpoint
existence) runs *before* any Lance write inside the failing op, so
single-statement mutations never trip this. The narrow path is
multi-statement queries (`insert ... insert ...`,
`insert ... update ...`) where a late op fails on validation that
depends on earlier ops' staged data.
- **Workaround**: callers that hit this should refresh the handle and
retry the mutation; if Lance HEAD remains drifted the
`omnigraph cleanup` command will GC the orphan version once a later
successful commit on the same table moves HEAD past it. (`cleanup`
cannot reclaim an orphan that *is* the current Lance HEAD; that case
needs the per-table-branch follow-up to fully heal.)
The cancellation case (future drop mid-mutation) inherits the same
guarantee — the in-memory accumulator evaporates with the dropped task
and no Lance write was ever issued.
The cancellation case (future drop mid-mutation) has the same shape and
the same workaround.
For delete-touching mutations the legacy inline-commit shape is
preserved (Lance has no public two-phase delete in 4.0.0) — the same
narrow window remains. The parse-time D₂ rule prevents inserts/updates
from coexisting with deletes in one query, so a pure-delete failure
cannot drift any staged-table state. If a delete-only multi-table
mutation fails mid-cascade, the same workaround as before applies
(retry; rely on `omnigraph cleanup` once a later successful commit
moves HEAD past the orphan version). Closing this requires Lance to
expose `DeleteJob::execute_uncommitted`; tracked in MR-793 and a
Lance-upstream ticket.

View file

@ -22,7 +22,7 @@ OmniGraph is **not** a single Lance dataset; it is a *graph* of datasets coordin
- `edges/{fnv1a64-hex(edge_type_name)}` — one Lance dataset per edge type
- `__manifest/` — the catalog of all sub-tables and their published versions
- `_graph_commits.lance` / `_graph_commit_actors.lance` — the commit graph and its actor map
- `_graph_runs.lance` / `_graph_run_actors.lance` — the run registry and its actor map
- (legacy `_graph_runs.lance` / `_graph_run_actors.lance` from pre-v0.4.0 repos are inert; the run state machine was removed in MR-771 and these files are cleaned up via MR-770's production sweep)
- **Manifest row schema** (`object_id, object_type, location, metadata, base_objects, table_key, table_version, table_branch, row_count`):
- `object_type``table | table_version | table_tombstone`
- `table_key``node:<TypeName> | edge:<EdgeName>`
@ -63,14 +63,12 @@ flowchart TB
nodes["nodes/{fnv1a64-hex}/<br/>one dataset per node type"]:::l2
edges["edges/{fnv1a64-hex}/<br/>one dataset per edge type"]:::l2
cgraph["_graph_commits.lance/<br/>_graph_commit_actors.lance/"]:::l2
runs["_graph_runs.lance/<br/>_graph_run_actors.lance/"]:::l2
refs["_refs/branches/{name}.json<br/>graph-level branches"]:::l2
repo --> manifest
repo --> nodes
repo --> edges
repo --> cgraph
repo --> runs
repo --> refs
subgraph dataset[Inside each Lance dataset — L1]
@ -91,7 +89,7 @@ flowchart TB
- **Repo root** is one directory (or S3 prefix). Everything below is part of one OmniGraph repo.
- **`__manifest/`** is a Lance dataset whose rows describe which sub-table version is published at which graph-branch. Reading a snapshot starts here.
- **`nodes/`** and **`edges/`** are sibling directories holding one Lance dataset per declared type. Names are `fnv1a64-hex` of the type name to keep paths fixed-length and case-safe.
- **`_graph_commits.lance` / `_graph_runs.lance`** are L2 datasets that record the graph-level commit DAG and run registry respectively (each has a paired `*_actors.lance` for the actor map).
- **`_graph_commits.lance`** is an L2 dataset that records the graph-level commit DAG, with a paired `_graph_commit_actors.lance` for the actor map. (Pre-v0.4.0 repos also have inert `_graph_runs.lance` / `_graph_run_actors.lance` from the removed Run state machine; MR-770 sweeps these in production.)
- **`_refs/branches/{name}.json`** is graph-level branch metadata — pointers from a branch name to the manifest version it heads.
- **Inside each Lance dataset** (orange): the standard Lance directory layout. `_versions/{n}.manifest` records every commit; `data/` holds the actual Arrow fragments; `_indices/{uuid}/` holds index segments with their own `fragment_bitmap` for partial coverage; `_refs/` holds Lance-native per-dataset branches and tags.

View file

@ -19,7 +19,8 @@ The engine's `tests/` is the principal coverage surface; most graph-shaped behav
|---|---|
| `end_to_end.rs` | Full init → load → query/mutate flow |
| `branching.rs` | Branch create / list / delete, lazy fork |
| `runs.rs` | Transactional runs (begin/publish/abort), idempotency |
| `runs.rs` | Direct-publish writes: cancellation, concurrent-writer CAS, multi-statement atomicity, MR-794 staged-write rewire (D₂ rejection, insert+update coalesce, multi-append coalesce, partial-failure recovery, load RI/cardinality recovery) |
| `staged_writes.rs` | TableStore staged-write primitives (`stage_append`, `stage_merge_insert`, `commit_staged`, `scan_with_staged`, `count_rows_with_staged`) — primitive-level only; engine code uses the in-memory `MutationStaging` accumulator instead |
| `lifecycle.rs` | Repo lifecycle, schema state |
| `point_in_time.rs` | Snapshots, time travel (`snapshot_at_version`, `entity_at`) |
| `changes.rs` | `diff_between` / `diff_commits` |