From bbf610ea9b51edc07f10097c2649f3171d5f0c97 Mon Sep 17 00:00:00 2001 From: Ragnor Comerford Date: Fri, 1 May 2026 10:42:47 +0200 Subject: [PATCH] =?UTF-8?q?MR-794=20step=202:=20rewire=20loader=20for=20Ap?= =?UTF-8?q?pend/Merge=20=E2=80=94=20Overwrite=20stays=20inline?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit load_jsonl_reader dispatches on mode: Append/Merge use the MutationStaging accumulator (per-type batch staging, single stage_* + commit_staged per touched table at end-of-load, publisher CAS). Overwrite keeps the legacy concurrent inline-commit path (truncate-then-append doesn't fit the staged shape; overwrite has no in-flight read-your-writes requirement). * New helpers collect_node_ids_with_pending and validate_edge_cardinality_with_pending_loader — loader analogs of the engine's pending-aware validators. * Phase 2c (RI) and Phase 3 (cardinality) consult pending batches for Append/Merge so a mid-load failure aborts the load before any Lance write reaches HEAD. A failed Append/Merge load no longer advances Lance HEAD on staged tables — the next load on the same tables proceeds normally with no ExpectedVersionMismatch. Overwrite mode's drift residual is unchanged from today's behavior; documented in docs/runs.md. Co-Authored-By: Claude Opus 4.7 (1M context) --- crates/omnigraph/src/loader/mod.rs | 369 ++++++++++++++++++++++++----- 1 file changed, 306 insertions(+), 63 deletions(-) diff --git a/crates/omnigraph/src/loader/mod.rs b/crates/omnigraph/src/loader/mod.rs index 405f560..d05a315 100644 --- a/crates/omnigraph/src/loader/mod.rs +++ b/crates/omnigraph/src/loader/mod.rs @@ -21,6 +21,7 @@ use serde_json::Value as JsonValue; use crate::db::Omnigraph; use crate::error::{OmniError, Result}; +use crate::exec::staging::{MutationStaging, PendingMode}; /// Result of a load operation. #[derive(Debug, Clone, Default)] @@ -304,21 +305,29 @@ async fn load_jsonl_reader( } } - // Phase 2: Build per-type RecordBatches and write to Lance. - // - // Writes to different tables are independent in Lance (each table has its - // own manifest + fragments), so we parallelize across types with a bounded - // concurrency limit. Serial writes against S3 were the dominant cost of - // load — batching and parallelizing per-type cuts wall time by roughly - // `LOAD_WRITE_CONCURRENCY`× for wide schemas (see MR-677). + // Phase 2: Build per-type RecordBatches and accumulate into the + // staging pipeline. For Append/Merge, batches go into an in-memory + // accumulator and a single `stage_*` + `commit_staged` per touched + // table runs at end-of-load — a mid-load failure (RI / cardinality + // violation) leaves Lance HEAD untouched. For Overwrite, the legacy + // inline-commit path is preserved (truncate+append doesn't fit the + // staged shape cleanly, and overwrite has no in-flight read-your-writes + // requirement). - let mut updates = Vec::new(); let mut result = LoadResult::default(); let snapshot = db.snapshot_for_branch(branch).await?; - // Capture per-table manifest versions before any write so the publisher - // CAS at commit-time can detect concurrent writers landing between our - // read snapshot and our publish. - let mut expected_table_versions: HashMap = HashMap::new(); + let use_staging = !matches!(mode, LoadMode::Overwrite); + let mut staging = MutationStaging::default(); + let mut overwrite_updates: Vec = Vec::new(); + let mut overwrite_expected: HashMap = HashMap::new(); + let pending_mode = match mode { + LoadMode::Merge => PendingMode::Merge, + // Append-mode loads accumulate as Append. Edge tables (no @key) + // and no-key node tables stay safe on the stage_append path. The + // Merge mode applies dedupe-by-id; Append assumes unique inputs. + LoadMode::Append => PendingMode::Append, + LoadMode::Overwrite => PendingMode::Append, // unused + }; // Phase 2a: build and validate every node batch up front. Cheap and // synchronous — surfaces validation errors before any S3 traffic. @@ -338,46 +347,89 @@ async fn load_jsonl_reader( let entry = snapshot .entry(&table_key) .ok_or_else(|| OmniError::manifest(format!("no manifest entry for {}", table_key)))?; - expected_table_versions.insert(table_key.clone(), entry.table_version); + if !use_staging { + overwrite_expected.insert(table_key.clone(), entry.table_version); + } prepared_nodes.push((type_name.clone(), table_key, batch, loaded_count)); } - // Phase 2b: write every node type concurrently, bounded. - let node_write_results = write_batches_concurrently(db, branch, mode, prepared_nodes).await?; - - for (type_name, table_key, loaded_count, state, table_branch) in node_write_results { - updates.push(crate::db::SubTableUpdate { - table_key, - table_version: state.version, - table_branch, - row_count: state.row_count, - version_metadata: state.version_metadata, - }); - result.nodes_loaded.insert(type_name, loaded_count); + // Phase 2b: write every node type. Append/Merge → in-memory + // accumulator. Overwrite → concurrent inline-commit (legacy path). + if use_staging { + for (type_name, table_key, batch, loaded_count) in prepared_nodes { + let (ds, full_path, table_branch) = db + .open_for_mutation_on_branch(branch, &table_key) + .await?; + let expected_version = ds.version().version; + staging.ensure_path( + &table_key, + full_path, + table_branch, + expected_version, + ); + let schema = batch.schema(); + staging.append_batch(&table_key, schema, pending_mode, batch)?; + result.nodes_loaded.insert(type_name, loaded_count); + } + } else { + let node_write_results = + write_batches_concurrently(db, branch, mode, prepared_nodes).await?; + for (type_name, table_key, loaded_count, state, table_branch) in node_write_results { + overwrite_updates.push(crate::db::SubTableUpdate { + table_key, + table_version: state.version, + table_branch, + row_count: state.row_count, + version_metadata: state.version_metadata, + }); + result.nodes_loaded.insert(type_name, loaded_count); + } } - // Phase 2b: Validate edge referential integrity — every src/dst must - // reference an existing node ID in the appropriate type. + // Phase 2c: Validate edge referential integrity — every src/dst must + // reference an existing node ID in the appropriate type. For staged + // loads, the lookup unions snapshot-committed IDs with the in-memory + // pending batches (which carry the just-staged node inserts). for (edge_name, rows) in &edge_rows { let edge_type = &catalog.edge_types[edge_name]; - let from_ids = collect_node_ids( - db, - branch, - &edge_type.from_type, - &node_rows, - &catalog, - &updates, - ) - .await?; - let to_ids = collect_node_ids( - db, - branch, - &edge_type.to_type, - &node_rows, - &catalog, - &updates, - ) - .await?; + let from_ids = if use_staging { + collect_node_ids_with_pending( + db, + branch, + &edge_type.from_type, + &staging, + ) + .await? + } else { + collect_node_ids( + db, + branch, + &edge_type.from_type, + &node_rows, + &catalog, + &overwrite_updates, + ) + .await? + }; + let to_ids = if use_staging { + collect_node_ids_with_pending( + db, + branch, + &edge_type.to_type, + &staging, + ) + .await? + } else { + collect_node_ids( + db, + branch, + &edge_type.to_type, + &node_rows, + &catalog, + &overwrite_updates, + ) + .await? + }; for (i, (src, dst, _)) in rows.iter().enumerate() { if !from_ids.contains(src.as_str()) { @@ -401,7 +453,7 @@ async fn load_jsonl_reader( } } - // Write edges (parallel per edge type, same pattern as nodes) + // Phase 2d: build edge batches. let mut prepared_edges: Vec<(String, String, RecordBatch, usize)> = Vec::with_capacity(edge_rows.len()); for (edge_name, rows) in &edge_rows { @@ -417,29 +469,61 @@ async fn load_jsonl_reader( let entry = snapshot .entry(&table_key) .ok_or_else(|| OmniError::manifest(format!("no manifest entry for {}", table_key)))?; - expected_table_versions.insert(table_key.clone(), entry.table_version); + if !use_staging { + overwrite_expected.insert(table_key.clone(), entry.table_version); + } prepared_edges.push((edge_name.clone(), table_key, batch, loaded_count)); } - let edge_write_results = write_batches_concurrently(db, branch, mode, prepared_edges).await?; - - for (edge_name, table_key, loaded_count, state, table_branch) in edge_write_results { - updates.push(crate::db::SubTableUpdate { - table_key, - table_version: state.version, - table_branch, - row_count: state.row_count, - version_metadata: state.version_metadata, - }); - result.edges_loaded.insert(edge_name, loaded_count); + // Phase 2e: write every edge type. Same dispatch as Phase 2b. + if use_staging { + for (edge_name, table_key, batch, loaded_count) in prepared_edges { + let (ds, full_path, table_branch) = db + .open_for_mutation_on_branch(branch, &table_key) + .await?; + let expected_version = ds.version().version; + staging.ensure_path( + &table_key, + full_path, + table_branch, + expected_version, + ); + let schema = batch.schema(); + staging.append_batch(&table_key, schema, pending_mode, batch)?; + result.edges_loaded.insert(edge_name, loaded_count); + } + } else { + let edge_write_results = + write_batches_concurrently(db, branch, mode, prepared_edges).await?; + for (edge_name, table_key, loaded_count, state, table_branch) in edge_write_results { + overwrite_updates.push(crate::db::SubTableUpdate { + table_key, + table_version: state.version, + table_branch, + row_count: state.row_count, + version_metadata: state.version_metadata, + }); + result.edges_loaded.insert(edge_name, loaded_count); + } } - // Phase 3: Validate edge cardinality constraints (before commit — invalid - // data must not be committed). Opens edge sub-tables at their just-written - // versions, not through the snapshot (which still pins to pre-write state). + // Phase 3: Validate edge cardinality constraints (before commit — + // invalid data must not be committed). Staged path scans committed + // edges via Lance + iterates pending edges in-memory. Overwrite path + // opens the just-written version (legacy behavior). for (edge_name, _) in &edge_rows { + let edge_type = &catalog.edge_types[edge_name]; let table_key = format!("edge:{}", edge_name); - if let Some(update) = updates.iter().find(|u| u.table_key == table_key) { + if use_staging { + validate_edge_cardinality_with_pending_loader( + db, + branch, + edge_type, + &table_key, + &staging, + ) + .await?; + } else if let Some(update) = overwrite_updates.iter().find(|u| u.table_key == table_key) { validate_edge_cardinality( db, branch, @@ -452,8 +536,18 @@ async fn load_jsonl_reader( } // Phase 4: Atomic manifest commit with publisher-level OCC. - db.commit_updates_on_branch_with_expected(branch, &updates, &expected_table_versions) + if use_staging { + let (updates, expected_versions) = staging.finalize(db, branch).await?; + db.commit_updates_on_branch_with_expected(branch, &updates, &expected_versions) + .await?; + } else { + db.commit_updates_on_branch_with_expected( + branch, + &overwrite_updates, + &overwrite_expected, + ) .await?; + } Ok(result) } @@ -1456,6 +1550,155 @@ pub(crate) async fn validate_edge_cardinality( Ok(()) } +/// Validate edge `@card` cardinality with in-memory pending edges visible. +/// +/// Loader-level analog to `exec::mutation::validate_edge_cardinality_with_pending`: +/// scans committed edges via Lance and unions counts with the pending +/// edge batches accumulated by the staged loader. Used by Append/Merge +/// loads (the Overwrite path uses `validate_edge_cardinality` which +/// opens the just-written Lance version). +async fn validate_edge_cardinality_with_pending_loader( + db: &Omnigraph, + branch: Option<&str>, + edge_type: &omnigraph_compiler::catalog::EdgeType, + table_key: &str, + staging: &MutationStaging, +) -> Result<()> { + if edge_type.cardinality.is_default() { + return Ok(()); + } + + let mut counts: HashMap = HashMap::new(); + + // Committed side: open at pre-write version (the snapshot pinned at + // load entry; pending writes haven't committed yet). + let snapshot = db.snapshot_for_branch(branch).await?; + if let Some(entry) = snapshot.entry(table_key) { + let ds = db + .open_dataset_at_state( + &entry.table_path, + entry.table_branch.as_deref(), + entry.table_version, + ) + .await?; + let batches = db + .table_store() + .scan(&ds, Some(&["src"]), None, None) + .await?; + for batch in &batches { + let srcs = batch + .column_by_name("src") + .ok_or_else(|| OmniError::Lance("missing 'src' column".into()))? + .as_any() + .downcast_ref::() + .ok_or_else(|| OmniError::Lance("'src' column is not Utf8".into()))?; + for i in 0..srcs.len() { + if srcs.is_valid(i) { + *counts.entry(srcs.value(i).to_string()).or_insert(0) += 1; + } + } + } + } + + // Pending side: walk in-memory pending batches for `src`. + for batch in staging.pending_batches(table_key) { + let Some(col) = batch.column_by_name("src") else { + continue; + }; + let Some(srcs) = col.as_any().downcast_ref::() else { + continue; + }; + for i in 0..srcs.len() { + if srcs.is_valid(i) { + *counts.entry(srcs.value(i).to_string()).or_insert(0) += 1; + } + } + } + + let card = &edge_type.cardinality; + for (src, count) in &counts { + if let Some(max) = card.max { + if *count > max { + return Err(OmniError::manifest(format!( + "@card violation on edge {}: source '{}' has {} edges (max {})", + edge_type.name, src, count, max + ))); + } + } + if *count < card.min { + return Err(OmniError::manifest(format!( + "@card violation on edge {}: source '{}' has {} edges (min {})", + edge_type.name, src, count, card.min + ))); + } + } + + Ok(()) +} + +/// Collect all valid node IDs for a given type, with in-memory pending +/// node inserts visible. Used by the staged loader's Phase 2c +/// referential-integrity validation. +/// +/// Union of: +/// - IDs from the staged loader's pending batches (in-memory; just-staged +/// inserts of this type) +/// - IDs from the committed sub-table at the pre-load snapshot version +async fn collect_node_ids_with_pending( + db: &Omnigraph, + branch: Option<&str>, + type_name: &str, + staging: &MutationStaging, +) -> Result> { + let mut ids = HashSet::new(); + let table_key = format!("node:{}", type_name); + + // From staging.pending: walk the in-memory accumulator's id column. + for batch in staging.pending_batches(&table_key) { + if let Some(col) = batch.column_by_name("id") { + if let Some(arr) = col.as_any().downcast_ref::() { + for i in 0..arr.len() { + if arr.is_valid(i) { + ids.insert(arr.value(i).to_string()); + } + } + } + } + } + + // From the committed Lance sub-table at the pre-load snapshot version. + let snapshot = db.snapshot_for_branch(branch).await?; + let Some(entry) = snapshot.entry(&table_key) else { + return Ok(ids); + }; + let ds = db + .open_dataset_at_state( + &entry.table_path, + entry.table_branch.as_deref(), + entry.table_version, + ) + .await?; + + let batches = db + .table_store() + .scan(&ds, Some(&["id"]), None, None) + .await?; + + for batch in &batches { + let id_col = batch + .column_by_name("id") + .ok_or_else(|| OmniError::Lance("missing 'id' column".into()))? + .as_any() + .downcast_ref::() + .ok_or_else(|| OmniError::Lance("'id' column is not Utf8".into()))?; + for i in 0..batch.num_rows() { + ids.insert(id_col.value(i).to_string()); + } + } + + Ok(ids) +} + /// Collect all valid node IDs for a given type. Union of: /// - IDs from the just-loaded batch (in memory, from node_rows) /// - IDs from the sub-table at the just-written version (if it was updated)