MR-794 step 2: rewire loader for Append/Merge — Overwrite stays inline

load_jsonl_reader dispatches on mode: Append/Merge use the
MutationStaging accumulator (per-type batch staging, single stage_* +
commit_staged per touched table at end-of-load, publisher CAS).
Overwrite keeps the legacy concurrent inline-commit path
(truncate-then-append doesn't fit the staged shape; overwrite has no
in-flight read-your-writes requirement).

* New helpers collect_node_ids_with_pending and
  validate_edge_cardinality_with_pending_loader — loader analogs
  of the engine's pending-aware validators.
* Phase 2c (RI) and Phase 3 (cardinality) consult pending batches
  for Append/Merge so a mid-load failure aborts the load before any
  Lance write reaches HEAD.

A failed Append/Merge load no longer advances Lance HEAD on staged
tables — the next load on the same tables proceeds normally with no
ExpectedVersionMismatch. Overwrite mode's drift residual is unchanged
from today's behavior; documented in docs/runs.md.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
Ragnor Comerford 2026-05-01 10:42:47 +02:00
parent e6f48ba24d
commit bbf610ea9b
No known key found for this signature in database

View file

@ -21,6 +21,7 @@ use serde_json::Value as JsonValue;
use crate::db::Omnigraph;
use crate::error::{OmniError, Result};
use crate::exec::staging::{MutationStaging, PendingMode};
/// Result of a load operation.
#[derive(Debug, Clone, Default)]
@ -304,21 +305,29 @@ async fn load_jsonl_reader<R: BufRead>(
}
}
// Phase 2: Build per-type RecordBatches and write to Lance.
//
// Writes to different tables are independent in Lance (each table has its
// own manifest + fragments), so we parallelize across types with a bounded
// concurrency limit. Serial writes against S3 were the dominant cost of
// load — batching and parallelizing per-type cuts wall time by roughly
// `LOAD_WRITE_CONCURRENCY`× for wide schemas (see MR-677).
// Phase 2: Build per-type RecordBatches and accumulate into the
// staging pipeline. For Append/Merge, batches go into an in-memory
// accumulator and a single `stage_*` + `commit_staged` per touched
// table runs at end-of-load — a mid-load failure (RI / cardinality
// violation) leaves Lance HEAD untouched. For Overwrite, the legacy
// inline-commit path is preserved (truncate+append doesn't fit the
// staged shape cleanly, and overwrite has no in-flight read-your-writes
// requirement).
let mut updates = Vec::new();
let mut result = LoadResult::default();
let snapshot = db.snapshot_for_branch(branch).await?;
// Capture per-table manifest versions before any write so the publisher
// CAS at commit-time can detect concurrent writers landing between our
// read snapshot and our publish.
let mut expected_table_versions: HashMap<String, u64> = HashMap::new();
let use_staging = !matches!(mode, LoadMode::Overwrite);
let mut staging = MutationStaging::default();
let mut overwrite_updates: Vec<crate::db::SubTableUpdate> = Vec::new();
let mut overwrite_expected: HashMap<String, u64> = HashMap::new();
let pending_mode = match mode {
LoadMode::Merge => PendingMode::Merge,
// Append-mode loads accumulate as Append. Edge tables (no @key)
// and no-key node tables stay safe on the stage_append path. The
// Merge mode applies dedupe-by-id; Append assumes unique inputs.
LoadMode::Append => PendingMode::Append,
LoadMode::Overwrite => PendingMode::Append, // unused
};
// Phase 2a: build and validate every node batch up front. Cheap and
// synchronous — surfaces validation errors before any S3 traffic.
@ -338,46 +347,89 @@ async fn load_jsonl_reader<R: BufRead>(
let entry = snapshot
.entry(&table_key)
.ok_or_else(|| OmniError::manifest(format!("no manifest entry for {}", table_key)))?;
expected_table_versions.insert(table_key.clone(), entry.table_version);
if !use_staging {
overwrite_expected.insert(table_key.clone(), entry.table_version);
}
prepared_nodes.push((type_name.clone(), table_key, batch, loaded_count));
}
// Phase 2b: write every node type concurrently, bounded.
let node_write_results = write_batches_concurrently(db, branch, mode, prepared_nodes).await?;
for (type_name, table_key, loaded_count, state, table_branch) in node_write_results {
updates.push(crate::db::SubTableUpdate {
table_key,
table_version: state.version,
table_branch,
row_count: state.row_count,
version_metadata: state.version_metadata,
});
result.nodes_loaded.insert(type_name, loaded_count);
// Phase 2b: write every node type. Append/Merge → in-memory
// accumulator. Overwrite → concurrent inline-commit (legacy path).
if use_staging {
for (type_name, table_key, batch, loaded_count) in prepared_nodes {
let (ds, full_path, table_branch) = db
.open_for_mutation_on_branch(branch, &table_key)
.await?;
let expected_version = ds.version().version;
staging.ensure_path(
&table_key,
full_path,
table_branch,
expected_version,
);
let schema = batch.schema();
staging.append_batch(&table_key, schema, pending_mode, batch)?;
result.nodes_loaded.insert(type_name, loaded_count);
}
} else {
let node_write_results =
write_batches_concurrently(db, branch, mode, prepared_nodes).await?;
for (type_name, table_key, loaded_count, state, table_branch) in node_write_results {
overwrite_updates.push(crate::db::SubTableUpdate {
table_key,
table_version: state.version,
table_branch,
row_count: state.row_count,
version_metadata: state.version_metadata,
});
result.nodes_loaded.insert(type_name, loaded_count);
}
}
// Phase 2b: Validate edge referential integrity — every src/dst must
// reference an existing node ID in the appropriate type.
// Phase 2c: Validate edge referential integrity — every src/dst must
// reference an existing node ID in the appropriate type. For staged
// loads, the lookup unions snapshot-committed IDs with the in-memory
// pending batches (which carry the just-staged node inserts).
for (edge_name, rows) in &edge_rows {
let edge_type = &catalog.edge_types[edge_name];
let from_ids = collect_node_ids(
db,
branch,
&edge_type.from_type,
&node_rows,
&catalog,
&updates,
)
.await?;
let to_ids = collect_node_ids(
db,
branch,
&edge_type.to_type,
&node_rows,
&catalog,
&updates,
)
.await?;
let from_ids = if use_staging {
collect_node_ids_with_pending(
db,
branch,
&edge_type.from_type,
&staging,
)
.await?
} else {
collect_node_ids(
db,
branch,
&edge_type.from_type,
&node_rows,
&catalog,
&overwrite_updates,
)
.await?
};
let to_ids = if use_staging {
collect_node_ids_with_pending(
db,
branch,
&edge_type.to_type,
&staging,
)
.await?
} else {
collect_node_ids(
db,
branch,
&edge_type.to_type,
&node_rows,
&catalog,
&overwrite_updates,
)
.await?
};
for (i, (src, dst, _)) in rows.iter().enumerate() {
if !from_ids.contains(src.as_str()) {
@ -401,7 +453,7 @@ async fn load_jsonl_reader<R: BufRead>(
}
}
// Write edges (parallel per edge type, same pattern as nodes)
// Phase 2d: build edge batches.
let mut prepared_edges: Vec<(String, String, RecordBatch, usize)> =
Vec::with_capacity(edge_rows.len());
for (edge_name, rows) in &edge_rows {
@ -417,29 +469,61 @@ async fn load_jsonl_reader<R: BufRead>(
let entry = snapshot
.entry(&table_key)
.ok_or_else(|| OmniError::manifest(format!("no manifest entry for {}", table_key)))?;
expected_table_versions.insert(table_key.clone(), entry.table_version);
if !use_staging {
overwrite_expected.insert(table_key.clone(), entry.table_version);
}
prepared_edges.push((edge_name.clone(), table_key, batch, loaded_count));
}
let edge_write_results = write_batches_concurrently(db, branch, mode, prepared_edges).await?;
for (edge_name, table_key, loaded_count, state, table_branch) in edge_write_results {
updates.push(crate::db::SubTableUpdate {
table_key,
table_version: state.version,
table_branch,
row_count: state.row_count,
version_metadata: state.version_metadata,
});
result.edges_loaded.insert(edge_name, loaded_count);
// Phase 2e: write every edge type. Same dispatch as Phase 2b.
if use_staging {
for (edge_name, table_key, batch, loaded_count) in prepared_edges {
let (ds, full_path, table_branch) = db
.open_for_mutation_on_branch(branch, &table_key)
.await?;
let expected_version = ds.version().version;
staging.ensure_path(
&table_key,
full_path,
table_branch,
expected_version,
);
let schema = batch.schema();
staging.append_batch(&table_key, schema, pending_mode, batch)?;
result.edges_loaded.insert(edge_name, loaded_count);
}
} else {
let edge_write_results =
write_batches_concurrently(db, branch, mode, prepared_edges).await?;
for (edge_name, table_key, loaded_count, state, table_branch) in edge_write_results {
overwrite_updates.push(crate::db::SubTableUpdate {
table_key,
table_version: state.version,
table_branch,
row_count: state.row_count,
version_metadata: state.version_metadata,
});
result.edges_loaded.insert(edge_name, loaded_count);
}
}
// Phase 3: Validate edge cardinality constraints (before commit — invalid
// data must not be committed). Opens edge sub-tables at their just-written
// versions, not through the snapshot (which still pins to pre-write state).
// Phase 3: Validate edge cardinality constraints (before commit —
// invalid data must not be committed). Staged path scans committed
// edges via Lance + iterates pending edges in-memory. Overwrite path
// opens the just-written version (legacy behavior).
for (edge_name, _) in &edge_rows {
let edge_type = &catalog.edge_types[edge_name];
let table_key = format!("edge:{}", edge_name);
if let Some(update) = updates.iter().find(|u| u.table_key == table_key) {
if use_staging {
validate_edge_cardinality_with_pending_loader(
db,
branch,
edge_type,
&table_key,
&staging,
)
.await?;
} else if let Some(update) = overwrite_updates.iter().find(|u| u.table_key == table_key) {
validate_edge_cardinality(
db,
branch,
@ -452,8 +536,18 @@ async fn load_jsonl_reader<R: BufRead>(
}
// Phase 4: Atomic manifest commit with publisher-level OCC.
db.commit_updates_on_branch_with_expected(branch, &updates, &expected_table_versions)
if use_staging {
let (updates, expected_versions) = staging.finalize(db, branch).await?;
db.commit_updates_on_branch_with_expected(branch, &updates, &expected_versions)
.await?;
} else {
db.commit_updates_on_branch_with_expected(
branch,
&overwrite_updates,
&overwrite_expected,
)
.await?;
}
Ok(result)
}
@ -1456,6 +1550,155 @@ pub(crate) async fn validate_edge_cardinality(
Ok(())
}
/// Validate edge `@card` cardinality with in-memory pending edges visible.
///
/// Loader-level analog to `exec::mutation::validate_edge_cardinality_with_pending`:
/// scans committed edges via Lance and unions counts with the pending
/// edge batches accumulated by the staged loader. Used by Append/Merge
/// loads (the Overwrite path uses `validate_edge_cardinality` which
/// opens the just-written Lance version).
async fn validate_edge_cardinality_with_pending_loader(
db: &Omnigraph,
branch: Option<&str>,
edge_type: &omnigraph_compiler::catalog::EdgeType,
table_key: &str,
staging: &MutationStaging,
) -> Result<()> {
if edge_type.cardinality.is_default() {
return Ok(());
}
let mut counts: HashMap<String, u32> = HashMap::new();
// Committed side: open at pre-write version (the snapshot pinned at
// load entry; pending writes haven't committed yet).
let snapshot = db.snapshot_for_branch(branch).await?;
if let Some(entry) = snapshot.entry(table_key) {
let ds = db
.open_dataset_at_state(
&entry.table_path,
entry.table_branch.as_deref(),
entry.table_version,
)
.await?;
let batches = db
.table_store()
.scan(&ds, Some(&["src"]), None, None)
.await?;
for batch in &batches {
let srcs = batch
.column_by_name("src")
.ok_or_else(|| OmniError::Lance("missing 'src' column".into()))?
.as_any()
.downcast_ref::<StringArray>()
.ok_or_else(|| OmniError::Lance("'src' column is not Utf8".into()))?;
for i in 0..srcs.len() {
if srcs.is_valid(i) {
*counts.entry(srcs.value(i).to_string()).or_insert(0) += 1;
}
}
}
}
// Pending side: walk in-memory pending batches for `src`.
for batch in staging.pending_batches(table_key) {
let Some(col) = batch.column_by_name("src") else {
continue;
};
let Some(srcs) = col.as_any().downcast_ref::<StringArray>() else {
continue;
};
for i in 0..srcs.len() {
if srcs.is_valid(i) {
*counts.entry(srcs.value(i).to_string()).or_insert(0) += 1;
}
}
}
let card = &edge_type.cardinality;
for (src, count) in &counts {
if let Some(max) = card.max {
if *count > max {
return Err(OmniError::manifest(format!(
"@card violation on edge {}: source '{}' has {} edges (max {})",
edge_type.name, src, count, max
)));
}
}
if *count < card.min {
return Err(OmniError::manifest(format!(
"@card violation on edge {}: source '{}' has {} edges (min {})",
edge_type.name, src, count, card.min
)));
}
}
Ok(())
}
/// Collect all valid node IDs for a given type, with in-memory pending
/// node inserts visible. Used by the staged loader's Phase 2c
/// referential-integrity validation.
///
/// Union of:
/// - IDs from the staged loader's pending batches (in-memory; just-staged
/// inserts of this type)
/// - IDs from the committed sub-table at the pre-load snapshot version
async fn collect_node_ids_with_pending(
db: &Omnigraph,
branch: Option<&str>,
type_name: &str,
staging: &MutationStaging,
) -> Result<HashSet<String>> {
let mut ids = HashSet::new();
let table_key = format!("node:{}", type_name);
// From staging.pending: walk the in-memory accumulator's id column.
for batch in staging.pending_batches(&table_key) {
if let Some(col) = batch.column_by_name("id") {
if let Some(arr) = col.as_any().downcast_ref::<StringArray>() {
for i in 0..arr.len() {
if arr.is_valid(i) {
ids.insert(arr.value(i).to_string());
}
}
}
}
}
// From the committed Lance sub-table at the pre-load snapshot version.
let snapshot = db.snapshot_for_branch(branch).await?;
let Some(entry) = snapshot.entry(&table_key) else {
return Ok(ids);
};
let ds = db
.open_dataset_at_state(
&entry.table_path,
entry.table_branch.as_deref(),
entry.table_version,
)
.await?;
let batches = db
.table_store()
.scan(&ds, Some(&["id"]), None, None)
.await?;
for batch in &batches {
let id_col = batch
.column_by_name("id")
.ok_or_else(|| OmniError::Lance("missing 'id' column".into()))?
.as_any()
.downcast_ref::<StringArray>()
.ok_or_else(|| OmniError::Lance("'id' column is not Utf8".into()))?;
for i in 0..batch.num_rows() {
ids.insert(id_col.value(i).to_string());
}
}
Ok(ids)
}
/// Collect all valid node IDs for a given type. Union of:
/// - IDs from the just-loaded batch (in memory, from node_rows)
/// - IDs from the sub-table at the just-written version (if it was updated)