diff --git a/crates/omnigraph/src/exec/staging.rs b/crates/omnigraph/src/exec/staging.rs index 7695906..92ddfac 100644 --- a/crates/omnigraph/src/exec/staging.rs +++ b/crates/omnigraph/src/exec/staging.rs @@ -133,6 +133,33 @@ impl MutationStaging { // be appended. return Ok(()); } + // If we've already accumulated a batch on this table, the new + // batch's schema MUST match the existing accumulator's schema. + // The mismatch case in practice is a blob-bearing table that + // sees an `insert` (full schema, blob columns included) and + // then an `update` whose `apply_assignments` output omits + // unassigned blob columns (subset schema). Concat-time and + // MemTable-construction errors would catch this later, but + // surfacing it at the offending `append_batch` call gives the + // caller a clearer point of failure attached to the specific + // op that introduced the drift. + if let Some(existing) = self.pending.get(table_key) { + if !schemas_compatible(&existing.schema, &batch.schema()) { + return Err(OmniError::manifest(format!( + "table '{}' accumulated mutation batches with mismatched schemas: \ + prior batches have {} columns, this batch has {}. \ + This typically happens on a blob-bearing table when one \ + op uses the full schema (e.g. an `insert`) and another \ + omits unassigned blob columns (e.g. an `update` that \ + doesn't set every blob property). Split the mutation \ + into two queries: one for the inserts, one for the \ + updates.", + table_key, + existing.schema.fields().len(), + batch.schema().fields().len(), + ))); + } + } let entry = self .pending .entry(table_key.to_string()) @@ -305,6 +332,24 @@ impl MutationStaging { /// arbitrary results on duplicates). /// /// `batches` must be non-empty and all share `schema` (caller enforces). +/// Compare two schemas for the purposes of `MutationStaging::append_batch`'s +/// accumulator-compatibility check. 
We treat schemas as compatible if +/// they have the same field names and data types in the same order. +/// Nullability and field metadata differences are tolerated — Lance and +/// Arrow round-trip these freely and the accumulator's downstream +/// `concat_batches` is also permissive on those. +fn schemas_compatible(a: &SchemaRef, b: &SchemaRef) -> bool { + if a.fields().len() != b.fields().len() { + return false; + } + for (af, bf) in a.fields().iter().zip(b.fields().iter()) { + if af.name() != bf.name() || af.data_type() != bf.data_type() { + return false; + } + } + true +} + fn dedupe_merge_batches_by_id( schema: &SchemaRef, batches: Vec, @@ -536,10 +581,19 @@ fn count_pending_src_with_dedupe( let mut kept_srcs: Vec = Vec::new(); for batch in pending_batches.iter().rev() { let Some(key_col) = batch.column_by_name(dedupe_key_column) else { - // Pending batch is missing the key column — fall back to naive - // counting for this batch (caller's contract was about merge - // semantics; if the column isn't there we can't dedupe). - continue; + // Pending batch is missing the key column. By construction + // this is unreachable: callers in dedupe mode always push + // batches whose schema contains the key (loader Merge mode + // builds via build_edge_batch which always emits `id`; the + // append_batch schema-compatibility check at the call site + // would also reject a heterogeneous mix). If it ever fires + // it's a programmer error — fail loudly rather than skip + // counting (which would let `@card` violations slip). 
+ return Err(OmniError::manifest_internal(format!( + "count_pending_src_with_dedupe: pending batch missing dedup key column '{}' \ + (schema-compat check at append_batch should have rejected this)", + dedupe_key_column + ))); }; let key_arr = key_col.as_any().downcast_ref::().ok_or_else(|| { OmniError::Lance(format!( diff --git a/crates/omnigraph/src/loader/mod.rs b/crates/omnigraph/src/loader/mod.rs index a349a64..374adc9 100644 --- a/crates/omnigraph/src/loader/mod.rs +++ b/crates/omnigraph/src/loader/mod.rs @@ -1655,7 +1655,13 @@ async fn collect_node_ids_with_pending( .downcast_ref::() .ok_or_else(|| OmniError::Lance("'id' column is not Utf8".into()))?; for i in 0..batch.num_rows() { - ids.insert(id_col.value(i).to_string()); + // Defensive: `id` is the @key column on every node type and + // is non-nullable by schema, but a committed-row corruption + // (or future schema change) could surface a NULL. Skip + // rather than insert "" — pending-side does the same. + if id_col.is_valid(i) { + ids.insert(id_col.value(i).to_string()); + } } } @@ -1718,6 +1724,9 @@ async fn collect_node_ids( .downcast_ref::() .unwrap(); for i in 0..batch.num_rows() { + if !id_col.is_valid(i) { + continue; + } ids.insert(id_col.value(i).to_string()); } } diff --git a/crates/omnigraph/src/table_store.rs b/crates/omnigraph/src/table_store.rs index c460e51..b0ecc18 100644 --- a/crates/omnigraph/src/table_store.rs +++ b/crates/omnigraph/src/table_store.rs @@ -1168,12 +1168,6 @@ fn assign_row_id_meta(fragments: &mut [Fragment], start_row_id: u64) -> Result<( Ok(()) } -/// Apply `projection` and `filter` to in-memory pending batches via a -/// fresh DataFusion `SessionContext`. Used by `scan_with_pending` for -/// the read-your-writes side of MR-794's in-memory accumulator. -/// -/// `pending_batches` must be non-empty (the caller short-circuits on -/// empty). /// Collect the set of values in a Utf8 column across multiple batches. 
/// Used by `scan_with_pending`'s merge-semantic path to identify /// committed rows that are shadowed by pending writes. NULL values are @@ -1255,6 +1249,21 @@ fn filter_out_rows_where_string_in( Ok(out) } +/// Apply `projection` and `filter` to in-memory pending batches via a +/// fresh DataFusion `SessionContext`. Used by `scan_with_pending` for +/// the read-your-writes side of MR-794's in-memory accumulator. +/// +/// `pending_batches` must be non-empty (the caller short-circuits on +/// empty). +/// +/// **SQL dialect contract.** `filter` is also passed to Lance's scanner +/// on the committed side. Lance and DataFusion both accept standard +/// SQL comparison predicates (`col op literal`) and OmniGraph's +/// `predicate_to_sql` only emits those shapes today (`=`, `!=`, `>`, +/// `<`, `>=`, `<=`). If a future caller introduces a Lance-specific +/// scanner extension (vector search, FTS, `_rowid` references) into +/// the filter, this function will need explicit translation — DataFusion +/// won't recognize those operators against the in-memory `MemTable`. async fn scan_pending_batches( pending_batches: &[RecordBatch], pending_schema: Option, diff --git a/crates/omnigraph/tests/runs.rs b/crates/omnigraph/tests/runs.rs index 5353a2f..74bafc1 100644 --- a/crates/omnigraph/tests/runs.rs +++ b/crates/omnigraph/tests/runs.rs @@ -1274,3 +1274,93 @@ async fn scan_with_pending_rejects_key_column_missing_from_projection() { "merge-shadow should drop committed 'a' and surface pending 'a' + committed 'b'" ); } + +/// Cursor Bugbot Medium (follow-up on commit 052b6e6): the +/// `PendingTable.schema` field is captured from the first `append_batch` +/// call and never updated. On a blob-bearing table, an `insert` +/// produces a full-schema batch (blob columns included) and an `update` +/// that doesn't assign every blob produces a subset-schema batch. 
Mixed +/// in one query, the second `append_batch` would silently push an +/// incompatible batch — the mismatch surfaced eventually at +/// `concat_batches`/MemTable construction inside finalize, but the +/// failure point was distant from the offending op. +/// +/// Post-fix: `append_batch` validates the new batch's schema against +/// the existing accumulator's schema and returns a typed error +/// directing the caller to split the mutation. The error fires at the +/// second op (the update), not at end-of-query. +#[tokio::test] +async fn append_batch_rejects_mismatched_schema_in_blob_table_at_offending_op() { + use omnigraph::loader::{LoadMode, load_jsonl}; + + const BLOB_SCHEMA: &str = r#" +node Document { + title: String @key + content: Blob? + note: String? +} +"#; + const BLOB_QUERIES: &str = r#" +query insert_then_update_note( + $title: String, $blob: String, $note: String +) { + insert Document { title: $title, content: $blob } + update Document set { note: $note } where title = $title +} +"#; + + let dir = tempfile::tempdir().unwrap(); + let uri = dir.path().to_str().unwrap(); + let mut db = Omnigraph::init(uri, BLOB_SCHEMA).await.unwrap(); + + // Seed with a Document so the update has something to match (the + // mid-query case is the chained-update scenario where the update's + // predicate matches the just-inserted row, exercising the in-memory + // pending union). 
+ load_jsonl( + &mut db, + r#"{"type":"Document","data":{"title":"seed","content":"base64:AQID"}}"#, + LoadMode::Overwrite, + ) + .await + .unwrap(); + + let err = db + .mutate( + "main", + BLOB_QUERIES, + "insert_then_update_note", + &params(&[ + ("$title", "letter"), + ("$blob", "base64:BAUG"), + ("$note", "draft 1"), + ]), + ) + .await + .expect_err("blob-table mixed insert+update with non-fully-assigned blob must error early"); + let OmniError::Manifest(manifest_err) = err else { + panic!("expected Manifest error, got {err:?}"); + }; + assert!( + manifest_err.message.contains("mismatched schemas") + && manifest_err.message.contains("Split the mutation"), + "error must direct user to split: {}", + manifest_err.message, + ); + + // Confirm the manifest didn't advance — early error must be + // before any commit. + let qr = db + .query( + ReadTarget::branch("main"), + r#"query get_doc($title: String) { + match { $d: Document { title: $title } } + return { $d.title } + }"#, + "get_doc", + &params(&[("$title", "letter")]), + ) + .await + .unwrap(); + assert_eq!(qr.num_rows(), 0, "letter must not be visible after early error"); +}