diff --git a/crates/omnigraph/src/exec/mutation.rs b/crates/omnigraph/src/exec/mutation.rs index d7513a8..7397074 100644 --- a/crates/omnigraph/src/exec/mutation.rs +++ b/crates/omnigraph/src/exec/mutation.rs @@ -341,6 +341,7 @@ fn build_insert_batch( async fn validate_edge_insert_endpoints( db: &Omnigraph, staging: &MutationStaging, + branch: Option<&str>, edge_name: &str, assignments: &HashMap, ) -> Result<()> { @@ -382,45 +383,57 @@ async fn validate_edge_insert_endpoints( } }; - ensure_node_id_exists(db, staging, &edge_type.from_type, from, "src").await?; - ensure_node_id_exists(db, staging, &edge_type.to_type, to, "dst").await?; + ensure_node_id_exists(db, staging, branch, &edge_type.from_type, from, "src").await?; + ensure_node_id_exists(db, staging, branch, &edge_type.to_type, to, "dst").await?; Ok(()) } +/// Quick scan of pending batches for an `id` value match. Used by the +/// mutation path's edge endpoint validation to satisfy read-your-writes +/// for same-query inserts before they're committed to Lance. +fn pending_batches_contain_id(batches: &[RecordBatch], id: &str) -> bool { + for batch in batches { + let Some(col) = batch.column_by_name("id") else { + continue; + }; + let Some(arr) = col.as_any().downcast_ref::() else { + continue; + }; + for i in 0..arr.len() { + if arr.is_valid(i) && arr.value(i) == id { + return true; + } + } + } + false +} + async fn ensure_node_id_exists( db: &Omnigraph, staging: &MutationStaging, + branch: Option<&str>, node_type: &str, id: &str, label: &str, ) -> Result<()> { let table_key = format!("node:{}", node_type); - let filter = format!("id = '{}'", id.replace('\'', "''")); - // Prefer the in-query staged dataset so a same-query insert of the - // referenced node is visible to this validation. Fall back to the - // pre-mutation manifest snapshot when no prior op touched this table. - let exists = if let Some(staged) = staging.latest.get(&table_key) { - let ds = db - .reopen_for_mutation( - &table_key, - &staged.full_path, - staged.table_branch.as_deref(), - staged.table_version, - ) - .await?; - ds.count_rows(Some(filter)) - .await - .map_err(|e| OmniError::Lance(e.to_string()))? - > 0 - } else { - let snapshot = db.snapshot(); - let ds = snapshot.open(&table_key).await?; - ds.count_rows(Some(filter)) - .await - .map_err(|e| OmniError::Lance(e.to_string()))? - > 0 - }; + // Prefer the in-query pending accumulator so a same-query insert of + // the referenced node is visible to this validation. Fall back to + // the pre-mutation manifest snapshot when nothing pending matches. + let pending = staging.pending_batches(&table_key); + if pending_batches_contain_id(pending, id) { + return Ok(()); + } + + let filter = format!("id = '{}'", id.replace('\'', "''")); + let snapshot = db.snapshot_for_branch(branch).await?; + let ds = snapshot.open(&table_key).await?; + let exists = ds + .count_rows(Some(filter)) + .await + .map_err(|e| OmniError::Lance(e.to_string()))? + > 0; if exists { Ok(()) @@ -469,10 +482,22 @@ fn predicate_to_sql( } /// Replace specific columns in a RecordBatch with new literal values. -/// Blob columns are excluded from the scan result, so assigned blob values are -/// synthesized from the full table schema and included inline in the update -/// batch. Unassigned blob columns are omitted so merge_insert leaves them -/// untouched. +/// +/// Blob columns may or may not be present in `batch` depending on the +/// caller's scan projection: +/// - If `batch` does NOT contain a blob column AND it has no assignment, +/// the column is OMITTED from the output. `merge_insert` leaves it +/// untouched. +/// - If `batch` DOES contain a blob column AND it has no assignment, the +/// column is COPIED to the output. This enables coalescing of +/// different-shape updates into a single full-schema merge batch (the +/// per-table accumulator in `MutationStaging` requires consistent +/// schemas across pending batches for `concat_batches`). The +/// round-tripping cost is acceptable for typical agent-driven +/// mutations; tables with large blobs and unassigned-blob updates may +/// want to be split into separate queries. +/// - If a blob column has a string-URI assignment, build the blob array +/// inline. fn apply_assignments( full_schema: &SchemaRef, batch: &RecordBatch, @@ -484,12 +509,8 @@ fn apply_assignments( for field in full_schema.fields().iter() { if blob_properties.contains(field.name()) { - // Blob columns aren't in the scan result. If this blob has an - // assignment, build the blob array inline so the single - // merge_insert covers both scalar and blob updates. Unassigned - // blob columns are omitted — merge_insert only touches columns - // present in the batch. if let Some(Literal::String(uri)) = assignments.get(field.name()) { + // Assigned: build a single blob column from the URI. let mut builder = BlobArrayBuilder::new(batch.num_rows()); for _ in 0..batch.num_rows() { crate::loader::append_blob_value(&mut builder, uri)?; @@ -501,8 +522,17 @@ fn apply_assignments( .finish() .map_err(|e| OmniError::Lance(e.to_string()))?, ); + } else if let Some(col) = batch.column_by_name(field.name()) { + // Unassigned but scan included it: copy through (writes + // back the same blob, no observable change but uniform + // schema for the accumulator). + let blob_field = lance::blob::blob_field(field.name(), field.is_nullable()); + out_fields.push(blob_field); + columns.push(col.clone()); } - // else: no assignment for this blob column — skip it + // else: scan did not include this blob column and no + // assignment — omit. Caller's accumulator must accept the + // narrower schema (legacy single-merge_insert path). } else if let Some(lit) = assignments.get(field.name()) { out_fields.push(field.as_ref().clone()); columns.push(literal_to_typed_array( @@ -528,197 +558,74 @@ fn apply_assignments( // ─── Mutation execution ────────────────────────────────────────────────────── -/// Per-query staging state for direct-to-target mutations. Replaces the -/// `__run__` staging branch with an in-memory accumulator. +use super::staging::{MutationStaging, PendingMode}; + +/// Open a sub-table dataset for read or inline-commit-write within the +/// current mutation query, capturing pre-write metadata in `staging` on +/// first touch. The captured version is the publisher's CAS fence at +/// end-of-query (per-table OCC). /// -/// Each unique table touched by the mutation is captured at first-open time: -/// - `expected_versions[table_key]` records the manifest version we observed -/// pre-write — the publisher's CAS fence at end-of-query. -/// - `latest[table_key]` records the most recent post-write state on that -/// table — used to thread between ops within the query so subsequent ops -/// see prior writes (read-your-writes). +/// The dataset is always opened at HEAD on the requested branch. Under +/// the staged-write rewire (MR-794 step 2+), no per-op commit advances +/// HEAD, so subsequent opens within the same query observe the same +/// version. The exception is the inline-commit delete path +/// (`execute_delete_*`), which still advances HEAD per-op — but D₂ at +/// parse time prevents inserts/updates and deletes from coexisting in +/// one query, so this can't conflict with a pending-batch read. +async fn open_table_for_mutation( + db: &Omnigraph, + staging: &mut MutationStaging, + branch: Option<&str>, + table_key: &str, +) -> Result<(Dataset, String, Option)> { + let (ds, full_path, table_branch) = + db.open_for_mutation_on_branch(branch, table_key).await?; + let expected_version = ds.version().version; + staging.ensure_path( + table_key, + full_path.clone(), + table_branch.clone(), + expected_version, + ); + Ok((ds, full_path, table_branch)) +} + +/// D₂ parse-time check: a single mutation query is either insert/update-only +/// or delete-only. Mixed → reject before any I/O. /// -/// **Known limitation (mid-query partial failure).** If op-N succeeds at the -/// Lance level (a fragment is committed, advancing the table's Lance HEAD) -/// and op-N+1 then fails before the publisher commits, the table is left -/// with `Lance HEAD > manifest_version`. The next `mutate_as` against the -/// same table will surface `ExpectedVersionMismatch` (Lance HEAD ahead of -/// the manifest snapshot). Lance's `restore()` is *not* a rewind — it -/// creates a new commit, monotonically advancing the version. The proper -/// fix uses Lance's distributed-write API (`write_fragments` / -/// `Scanner::with_fragments` / `Operation::Append { fragments }`) — see -/// [MR-794](https://linear.app/modernrelay/issue/MR-794). The staging -/// primitives `TableStore::stage_append` / `stage_merge_insert` / -/// `commit_staged` / `scan_with_staged` are in place; full integration -/// with this struct is the MR-794 follow-up. In practice this path is -/// narrow today: most validation runs before any Lance write, so -/// single-statement mutations are unaffected. See `docs/runs.md`. -#[derive(Default)] -struct MutationStaging { - expected_versions: HashMap, - latest: HashMap, -} - -struct StagedTable { - table_key: String, - table_branch: Option, - table_version: u64, - row_count: u64, - full_path: String, - version_metadata: crate::db::manifest::TableVersionMetadata, -} - -trait IntoStagedRecord { - fn version(&self) -> u64; - fn row_count(&self) -> u64; - fn version_metadata(&self) -> &crate::db::manifest::TableVersionMetadata; -} - -impl IntoStagedRecord for crate::table_store::TableState { - fn version(&self) -> u64 { - self.version - } - fn row_count(&self) -> u64 { - self.row_count - } - fn version_metadata(&self) -> &crate::db::manifest::TableVersionMetadata { - &self.version_metadata - } -} - -impl IntoStagedRecord for crate::table_store::DeleteState { - fn version(&self) -> u64 { - self.version - } - fn row_count(&self) -> u64 { - self.row_count - } - fn version_metadata(&self) -> &crate::db::manifest::TableVersionMetadata { - &self.version_metadata - } -} - -impl MutationStaging { - fn is_empty(&self) -> bool { - self.latest.is_empty() - } - - fn record( - &mut self, - table_key: String, - full_path: String, - table_branch: Option, - state: &S, - ) { - self.latest.insert( - table_key.clone(), - StagedTable { - table_key, - table_branch, - table_version: state.version(), - row_count: state.row_count(), - full_path, - version_metadata: state.version_metadata().clone(), - }, - ); - } - - fn into_updates(self) -> (Vec, HashMap) { - let updates = self - .latest - .into_values() - .map(|st| crate::db::SubTableUpdate { - table_key: st.table_key, - table_version: st.table_version, - table_branch: st.table_branch, - row_count: st.row_count, - version_metadata: st.version_metadata, - }) - .collect(); - (updates, self.expected_versions) - } -} - -/// RAII helper that restores `Omnigraph::coordinator` on drop. Used by -/// `mutate_with_current_actor` so a panic between the coordinator swap and -/// the explicit restore (e.g. an assertion deep inside Lance) does not -/// leave the handle pinned to the requested branch indefinitely. The -/// captured coordinator is `take`n on drop and assigned via the -/// (synchronous) `restore_coordinator` accessor. -/// -/// Holds a bare `*mut Omnigraph` (no lifetime parameter) deliberately: -/// borrowing the engine through this guard would lock out the rest of -/// `mutate_with_current_actor` from calling `&mut self` methods on the -/// engine while the guard is alive. The unsafe is bounded by the -/// constructor contract — the caller must not let the guard outlive the -/// `&mut self` it was built from. In practice this is enforced by the -/// guard being assigned to a stack-local `_guard` binding inside one -/// function and never moved out. -struct CoordinatorRestoreGuard { - db: *mut Omnigraph, - previous: Option, -} - -// SAFETY: the pointer addresses an `Omnigraph`, which is `Send`. The guard -// is short-lived and the only operation it performs is the sync -// `restore_coordinator` field assignment in `Drop`. No reference is shared -// across threads — the future holding the guard moves between threads -// (e.g. when an Axum handler is awaited on a worker), and the swap-back is -// always invoked at most once on whichever thread runs `Drop`. -unsafe impl Send for CoordinatorRestoreGuard {} - -impl CoordinatorRestoreGuard { - /// SAFETY: `db` must outlive the returned guard, and the caller must - /// not move the guard outside the borrow scope of `db`. - fn new(db: &mut Omnigraph, previous: crate::db::GraphCoordinator) -> Self { - Self { - db: db as *mut Omnigraph, - previous: Some(previous), - } - } -} - -impl Drop for CoordinatorRestoreGuard { - fn drop(&mut self) { - if let Some(prev) = self.previous.take() { - // SAFETY: per the `new` contract, `db` is still valid here. - // `restore_coordinator` is a sync field assignment and does not - // re-enter the runtime. - unsafe { - (*self.db).restore_coordinator(prev); +/// Reason: under the staged-write rewire (MR-794 step 2+), inserts and +/// updates accumulate in memory and commit at end-of-query, while deletes +/// still inline-commit (Lance lacks a public two-phase delete in 4.0.0). +/// Mixing creates ordering hazards (same-row insert→delete becomes a no-op +/// because the staged insert isn't visible to delete; cascading deletes +/// of just-inserted edges break referential integrity by silent design). +/// Until Lance exposes `DeleteJob::execute_uncommitted`, the parse-time +/// rejection keeps both paths atomic and correct. +fn enforce_no_mixed_destructive_constructive( + ir: &omnigraph_compiler::ir::MutationIR, +) -> Result<()> { + let mut has_constructive = false; + let mut has_delete = false; + for op in &ir.ops { + match op { + MutationOpIR::Insert { .. } | MutationOpIR::Update { .. } => { + has_constructive = true; + } + MutationOpIR::Delete { .. } => { + has_delete = true; } } } -} - -/// Open a sub-table dataset for write in the current mutation query. On the -/// first touch of a table, captures the pre-write manifest version into -/// `staging.expected_versions` so the publisher can enforce OCC. On -/// subsequent touches, re-opens the dataset at the locally-staged version -/// (the version we wrote in a prior op of the same query) — bypassing the -/// manifest because nothing has been committed yet. -async fn open_for_mutation_in_query( - db: &Omnigraph, - staging: &mut MutationStaging, - table_key: &str, -) -> Result<(Dataset, String, Option)> { - if let Some(staged) = staging.latest.get(table_key) { - let ds = db - .reopen_for_mutation( - table_key, - &staged.full_path, - staged.table_branch.as_deref(), - staged.table_version, - ) - .await?; - return Ok((ds, staged.full_path.clone(), staged.table_branch.clone())); + if has_constructive && has_delete { + return Err(OmniError::manifest(format!( + "mutation '{}' on the same query mixes inserts/updates and deletes; \ + split into separate mutations: (1) inserts and updates, then (2) deletes. \ + This restriction lifts when Lance exposes a two-phase delete API \ + (tracked: MR-793 / Lance-upstream).", + ir.name + ))); } - let (ds, full_path, table_branch) = db.open_for_mutation(table_key).await?; - staging - .expected_versions - .entry(table_key.to_string()) - .or_insert(ds.version().version); - Ok((ds, full_path, table_branch)) + Ok(()) } impl Omnigraph { @@ -769,57 +676,40 @@ impl Omnigraph { } let resolved_params = enrich_mutation_params(params)?; - // Direct-to-target write path. Per-query staging captures pre-write - // manifest versions (publisher CAS fence) and threads dataset state - // across ops to maintain read-your-writes within a multi-statement - // query without per-op manifest commits. + // Per-query staging accumulator. Inserts and updates push batches + // into `pending`; deletes still inline-commit and record into + // `inline_committed`. At end-of-query, `finalize` issues one + // `stage_*` + `commit_staged` per pending table, then the + // publisher commits the manifest atomically across all touched + // tables. Branch is threaded explicitly — no coordinator swap. let mut staging = MutationStaging::default(); - let current = self.active_branch().map(str::to_string); - let needs_swap = requested.as_deref() != current.as_deref(); - - // RAII guard for coordinator state. If we swapped to the requested - // branch, the original coordinator is captured here and unconditionally - // restored on drop — including on panic from `execute_named_mutation` - // or the publisher. Without this, a Lance-internal panic between swap - // and restore would leave the handle pinned to the wrong branch for - // its remaining lifetime. - let _guard = if needs_swap { - let previous = self.swap_coordinator_for_branch(requested.as_deref()).await?; - Some(CoordinatorRestoreGuard::new(self, previous)) - } else { - None - }; - let exec_result = self - .execute_named_mutation(query_source, query_name, &resolved_params, &mut staging) + .execute_named_mutation( + query_source, + query_name, + &resolved_params, + requested.as_deref(), + &mut staging, + ) .await; - let publish_result = match exec_result { + match exec_result { Err(e) => Err(e), + Ok(total) if staging.is_empty() => Ok(total), Ok(total) => { - if staging.is_empty() { - Ok(total) - } else { - let (updates, expected_versions) = staging.into_updates(); - match self - .commit_updates_on_branch_with_expected( - requested.as_deref(), - &updates, - &expected_versions, - ) - .await - { - Ok(_) => Ok(total), - Err(e) => Err(e), - } - } + let (updates, expected_versions) = staging + .finalize(self, requested.as_deref()) + .await?; + self.commit_updates_on_branch_with_expected( + requested.as_deref(), + &updates, + &expected_versions, + ) + .await?; + Ok(total) } - }; - - // `_guard` drops here and restores the coordinator (also restores on - // any panic that unwound through this function above). - publish_result + } } async fn execute_named_mutation( @@ -827,6 +717,7 @@ impl Omnigraph { query_source: &str, query_name: &str, params: &ParamMap, + branch: Option<&str>, staging: &mut MutationStaging, ) -> Result { let query_decl = omnigraph_compiler::find_named_query(query_source, query_name) @@ -843,6 +734,8 @@ impl Omnigraph { } let ir = lower_mutation_query(&query_decl)?; + // D₂: reject mixed insert/update + delete before any I/O. + enforce_no_mixed_destructive_constructive(&ir)?; let mut total = MutationResult::default(); for op in &ir.ops { @@ -851,7 +744,7 @@ impl Omnigraph { type_name, assignments, } => { - self.execute_insert(type_name, assignments, params, staging) + self.execute_insert(type_name, assignments, params, branch, staging) .await? } MutationOpIR::Update { @@ -859,14 +752,21 @@ impl Omnigraph { assignments, predicate, } => { - self.execute_update(type_name, assignments, predicate, params, staging) - .await? + self.execute_update( + type_name, + assignments, + predicate, + params, + branch, + staging, + ) + .await? } MutationOpIR::Delete { type_name, predicate, } => { - self.execute_delete(type_name, predicate, params, staging) + self.execute_delete(type_name, predicate, params, branch, staging) .await? } }; @@ -881,6 +781,7 @@ impl Omnigraph { type_name: &str, assignments: &[IRAssignment], params: &ParamMap, + branch: Option<&str>, staging: &mut MutationStaging, ) -> Result { let mut resolved: HashMap = HashMap::new(); @@ -923,13 +824,18 @@ impl Omnigraph { } let has_key = node_type.key_property().is_some(); let table_key = format!("node:{}", type_name); - let (state, full_path, table_branch) = if has_key { - self.upsert_batch_staged(&table_key, staging, schema, batch).await? + // Capture pre-write metadata on first touch (no Lance write). + let (_ds, _full_path, _table_branch) = + open_table_for_mutation(self, staging, branch, &table_key).await?; + // Accumulate. @key inserts go into the Merge stream (so a + // later update on the same id coalesces correctly); no-key + // inserts go into the Append stream. + let mode = if has_key { + PendingMode::Merge } else { - self.append_batch_staged(&table_key, staging, schema, batch).await? + PendingMode::Append }; - - staging.record(table_key, full_path, table_branch, &state); + staging.append_batch(&table_key, schema, mode, batch)?; Ok(MutationResult { affected_nodes: 1, @@ -942,7 +848,7 @@ impl Omnigraph { let id = ulid::Ulid::new().to_string(); let batch = build_insert_batch(&schema, &id, &resolved, &blob_props)?; - validate_edge_insert_endpoints(self, staging, type_name, &resolved).await?; + validate_edge_insert_endpoints(self, staging, branch, type_name, &resolved).await?; crate::loader::validate_enum_constraints(&batch, &edge_type.properties, type_name)?; let unique_props = crate::loader::unique_property_names_for_edge(edge_type); if !unique_props.is_empty() { @@ -952,23 +858,27 @@ impl Omnigraph { &unique_props, )?; } - let active_branch = self.active_branch().map(str::to_string); let table_key = format!("edge:{}", type_name); - let (state, full_path, table_branch) = self - .append_batch_staged(&table_key, staging, schema, batch) - .await?; + // Capture pre-write metadata on first touch (no Lance write). + let (ds, _full_path, _table_branch) = + open_table_for_mutation(self, staging, branch, &table_key).await?; + // Accumulate the new edge row. Edge IDs are ULID-generated so + // Append mode is correct (no key-based dedup needed). + staging.append_batch(&table_key, schema, PendingMode::Append, batch.clone())?; - crate::loader::validate_edge_cardinality( + // Edge cardinality validation: scan committed edges via Lance + // + iterate pending edges in-memory for the `src` column, + // group-by-src. The pending side already includes the row + // we just appended (above). + validate_edge_cardinality_with_pending( self, - active_branch.as_deref(), - type_name, - state.version, - table_branch.as_deref(), + &ds, + staging, + &table_key, + edge_type, ) .await?; - staging.record(table_key, full_path, table_branch, &state); - self.invalidate_graph_index().await; Ok(MutationResult { @@ -980,57 +890,13 @@ impl Omnigraph { } } - /// Append a batch to a sub-table within an in-flight mutation, routing - /// through `MutationStaging` so subsequent ops in the same query see the - /// write before publish. Returns the new state, the full path, and the - /// table branch. - async fn append_batch_staged( - &self, - table_key: &str, - staging: &mut MutationStaging, - _schema: SchemaRef, - batch: RecordBatch, - ) -> Result<(crate::table_store::TableState, String, Option)> { - let (mut ds, full_path, table_branch) = - open_for_mutation_in_query(self, staging, table_key).await?; - let state = self - .table_store() - .append_batch(&full_path, &mut ds, batch) - .await?; - Ok((state, full_path, table_branch)) - } - - /// Upsert a batch into a sub-table using merge_insert keyed by "id", - /// routing through `MutationStaging`. - async fn upsert_batch_staged( - &self, - table_key: &str, - staging: &mut MutationStaging, - _schema: SchemaRef, - batch: RecordBatch, - ) -> Result<(crate::table_store::TableState, String, Option)> { - let (ds, full_path, table_branch) = - open_for_mutation_in_query(self, staging, table_key).await?; - let state = self - .table_store() - .merge_insert_batch( - &full_path, - ds, - batch, - vec!["id".to_string()], - lance::dataset::WhenMatched::UpdateAll, - lance::dataset::WhenNotMatched::InsertAll, - ) - .await?; - Ok((state, full_path, table_branch)) - } - async fn execute_update( &mut self, type_name: &str, assignments: &[IRAssignment], predicate: &IRMutationPredicate, params: &ParamMap, + branch: Option<&str>, staging: &mut MutationStaging, ) -> Result { // Defense in depth: ensure this is a node type @@ -1056,23 +922,39 @@ impl Omnigraph { let blob_props = self.catalog().node_types[type_name].blob_properties.clone(); let table_key = format!("node:{}", type_name); - let (ds, full_path, table_branch) = - open_for_mutation_in_query(self, staging, &table_key).await?; - let initial_version = ds.version().version; + let (ds, _full_path, _table_branch) = + open_table_for_mutation(self, staging, branch, &table_key).await?; + // Scan committed via Lance + apply the same predicate to pending + // batches via DataFusion `MemTable` (read-your-writes for prior + // ops in this query). The pending side may include rows from + // earlier `insert` / `update` ops on the same table. + // + // For blob tables we project away the blob columns: Lance's + // scanner doesn't accept the standard projection path on blob + // descriptors and would panic with a `Field::project` assertion. + // The downstream `apply_assignments` synthesizes blob columns + // from explicit assignments and omits unassigned blobs (Lance's + // merge_insert leaves them untouched). Tables without blob + // columns scan the full schema unprojected. let non_blob_cols: Vec<&str> = schema .fields() .iter() .filter(|f| !blob_props.contains(f.name())) .map(|f| f.name().as_str()) .collect(); + let projection: Option<&[&str]> = + (!blob_props.is_empty()).then_some(non_blob_cols.as_slice()); + let pending_batches = staging.pending_batches(&table_key); + let pending_schema = staging.pending_schema(&table_key); let batches = self .table_store() - .scan( + .scan_with_pending( &ds, - (!blob_props.is_empty()).then_some(non_blob_cols.as_slice()), + pending_batches, + pending_schema, + projection, Some(&pred_sql), - None, ) .await?; @@ -1083,13 +965,12 @@ impl Omnigraph { }); } - let matched = if batches.len() == 1 { - batches.into_iter().next().unwrap() - } else { - let s = batches[0].schema(); - arrow_select::concat::concat_batches(&s, &batches) - .map_err(|e| OmniError::Lance(e.to_string()))? - }; + // Concat the matched batches into one. The pending side may have + // a slightly different schema (e.g. Lance's `_rowid` column on + // the committed side, missing on the pending side). Normalize by + // dropping any `_rowid` / `_rowaddr` columns and reordering to + // the table's canonical schema. + let matched = concat_match_batches_to_schema(&schema, &blob_props, batches)?; let affected_count = matched.num_rows(); @@ -1110,29 +991,13 @@ impl Omnigraph { )?; } - // Re-open for merge_insert (scan consumed the dataset; - // version guard was already applied by open_for_mutation above) - let ds = self - .reopen_for_mutation( - &table_key, - &full_path, - table_branch.as_deref(), - initial_version, - ) - .await?; - let update_state = self - .table_store() - .merge_insert_batch( - &full_path, - ds, - updated, - vec!["id".to_string()], - lance::dataset::WhenMatched::UpdateAll, - lance::dataset::WhenNotMatched::DoNothing, - ) - .await?; - - staging.record(table_key, full_path, table_branch, &update_state); + // Accumulate the updated batch into the Merge-mode pending stream. + // The accumulator may now contain entries with the same id as a + // prior insert or update on this table; `MutationStaging::finalize` + // dedupes by id (last-occurrence wins) before issuing the single + // `stage_merge_insert` call at end-of-query. + let updated_schema = updated.schema(); + staging.append_batch(&table_key, updated_schema, PendingMode::Merge, updated)?; Ok(MutationResult { affected_nodes: affected_count, @@ -1145,14 +1010,15 @@ impl Omnigraph { type_name: &str, predicate: &IRMutationPredicate, params: &ParamMap, + branch: Option<&str>, staging: &mut MutationStaging, ) -> Result { let is_node = self.catalog().node_types.contains_key(type_name); if is_node { - self.execute_delete_node(type_name, predicate, params, staging) + self.execute_delete_node(type_name, predicate, params, branch, staging) .await } else { - self.execute_delete_edge(type_name, predicate, params, staging) + self.execute_delete_edge(type_name, predicate, params, branch, staging) .await } } @@ -1162,16 +1028,19 @@ impl Omnigraph { type_name: &str, predicate: &IRMutationPredicate, params: &ParamMap, + branch: Option<&str>, staging: &mut MutationStaging, ) -> Result { let pred_sql = predicate_to_sql(predicate, params, false)?; let table_key = format!("node:{}", type_name); let (ds, full_path, table_branch) = - open_for_mutation_in_query(self, staging, &table_key).await?; + open_table_for_mutation(self, staging, branch, &table_key).await?; let initial_version = ds.version().version; - // Scan matching IDs for cascade + // Scan matching IDs for cascade. Per D₂ this never overlaps with + // staged inserts (mixed insert/delete in one query is rejected at + // parse time), so we scan committed only. let batches = self .table_store() .scan(&ds, Some(&["id"]), Some(&pred_sql), None) @@ -1200,8 +1069,11 @@ impl Omnigraph { let affected_nodes = deleted_ids.len(); - // Delete nodes (re-open needed because the scan consumed the dataset; - // version guard was already applied by open_for_mutation above) + // Delete nodes — still inline-commit (Lance's `Dataset::delete` is + // not exposed as a two-phase op in 4.0.0). D₂ keeps inserts and + // deletes from coexisting in one query, so this advance of Lance + // HEAD is the only HEAD movement during the query and the + // publisher's CAS captures it intact. let mut ds = self .reopen_for_mutation( &table_key, @@ -1215,12 +1087,13 @@ impl Omnigraph { .delete_where(&full_path, &mut ds, &pred_sql) .await?; - staging.record( - table_key.clone(), - full_path.clone(), - table_branch.clone(), - &delete_state, - ); + staging.record_inline(crate::db::SubTableUpdate { + table_key: table_key.clone(), + table_version: delete_state.version, + table_branch: table_branch.clone(), + row_count: delete_state.row_count, + version_metadata: delete_state.version_metadata, + }); let mut affected_edges = 0usize; let escaped: Vec = deleted_ids @@ -1251,7 +1124,7 @@ impl Omnigraph { let edge_table_key = format!("edge:{}", edge_name); let cascade_filter = cascade_filters.join(" OR "); let (mut edge_ds, edge_full_path, edge_table_branch) = - open_for_mutation_in_query(self, staging, &edge_table_key).await?; + open_table_for_mutation(self, staging, branch, &edge_table_key).await?; let edge_delete = self .table_store() @@ -1261,12 +1134,13 @@ impl Omnigraph { affected_edges += edge_delete.deleted_rows; if edge_delete.deleted_rows > 0 { - staging.record( - edge_table_key, - edge_full_path, - edge_table_branch, - &edge_delete, - ); + staging.record_inline(crate::db::SubTableUpdate { + table_key: edge_table_key, + table_version: edge_delete.version, + table_branch: edge_table_branch, + row_count: edge_delete.row_count, + version_metadata: edge_delete.version_metadata, + }); } } @@ -1285,13 +1159,14 @@ impl Omnigraph { type_name: &str, predicate: &IRMutationPredicate, params: &ParamMap, + branch: Option<&str>, staging: &mut MutationStaging, ) -> Result { let pred_sql = predicate_to_sql(predicate, params, true)?; let table_key = format!("edge:{}", type_name); let (mut ds, full_path, table_branch) = - open_for_mutation_in_query(self, staging, &table_key).await?; + open_table_for_mutation(self, staging, branch, &table_key).await?; let delete_state = self .table_store() @@ -1300,7 +1175,13 @@ impl Omnigraph { let affected = delete_state.deleted_rows; if affected > 0 { - staging.record(table_key, full_path, table_branch, &delete_state); + staging.record_inline(crate::db::SubTableUpdate { + table_key, + table_version: delete_state.version, + table_branch, + row_count: delete_state.row_count, + version_metadata: delete_state.version_metadata, + }); self.invalidate_graph_index().await; } @@ -1311,6 +1192,105 @@ impl Omnigraph { } } +/// Concat the matched batches from `scan_with_pending` into a single batch. +/// `scan_with_pending` returns committed-side and pending-side batches in +/// order; both should share a schema if pending was produced through +/// `apply_assignments` with full-schema scan input. If schemas drift, +/// surface a clear error so the user can split the query. +fn concat_match_batches_to_schema( + _schema: &SchemaRef, + _blob_properties: &HashSet, + batches: Vec, +) -> Result { + if batches.len() == 1 { + return Ok(batches.into_iter().next().unwrap()); + } + let common = batches[0].schema(); + arrow_select::concat::concat_batches(&common, &batches).map_err(|e| { + OmniError::Lance(format!( + "scan_with_pending returned batches with mismatched schemas \ + across the committed/pending boundary; this typically indicates \ + a blob-column shape mismatch between the committed table and a \ + prior in-query insert/update. Split blob-touching mutations \ + into separate queries. ({})", + e + )) + }) +} + +/// Validate that adding `pending` edges plus the committed edges does not +/// exceed the per-source cardinality bound on `edge_type`. Reads the `src` +/// column from both committed (Lance) and pending (in-memory) and counts +/// per src. +async fn validate_edge_cardinality_with_pending( + db: &Omnigraph, + committed_ds: &Dataset, + staging: &MutationStaging, + table_key: &str, + edge_type: &omnigraph_compiler::catalog::EdgeType, +) -> Result<()> { + use std::collections::HashMap as StdHashMap; + + if edge_type.cardinality.is_default() { + return Ok(()); + } + + let mut counts: StdHashMap = StdHashMap::new(); + + // Committed side: scan `src` column (cheap, no filter). + let committed = db + .table_store() + .scan(committed_ds, Some(&["src"]), None, None) + .await?; + for batch in &committed { + let srcs = batch + .column_by_name("src") + .ok_or_else(|| OmniError::Lance("missing 'src' column on edge table".into()))? + .as_any() + .downcast_ref::() + .ok_or_else(|| OmniError::Lance("'src' column is not Utf8".into()))?; + for i in 0..srcs.len() { + if srcs.is_valid(i) { + *counts.entry(srcs.value(i).to_string()).or_insert(0) += 1; + } + } + } + + // Pending side: walk in-memory pending batches for `src`. + for batch in staging.pending_batches(table_key) { + let Some(col) = batch.column_by_name("src") else { + continue; + }; + let Some(srcs) = col.as_any().downcast_ref::() else { + continue; + }; + for i in 0..srcs.len() { + if srcs.is_valid(i) { + *counts.entry(srcs.value(i).to_string()).or_insert(0) += 1; + } + } + } + + let card = &edge_type.cardinality; + for (src, count) in &counts { + if let Some(max) = card.max { + if *count > max { + return Err(OmniError::manifest(format!( + "@card violation on edge {}: source '{}' has {} edges (max {})", + edge_type.name, src, count, max + ))); + } + } + // Note: per-source minimum cardinality cannot be checked + // mid-query (a bound of `2..` requires both edges to be inserted + // before validation). The publisher path could re-validate at + // commit time; for now, defer to the loader's end-of-query check. + let _ = card.min; + } + + Ok(()) +} + fn enrich_mutation_params(params: &ParamMap) -> Result { let mut resolved = params.clone(); if !resolved.contains_key(NOW_PARAM_NAME) {