MR-794 step 2: address Cursor Bugbot follow-ups on commits 3223b51 + 052b6e6

Four code/doc fixes from the latest Cursor Bugbot pass:

* **Misplaced doc comment in table_store.rs (Medium):** the doc block
  intended for `scan_pending_batches` was, after my earlier edit,
  attached to `collect_string_column_values` because the new helper
  was inserted between the original docblock and `scan_pending_batches`.
  Move the docblock back onto its function and add a note about the
  shared SQL-dialect contract with the Lance scanner (the predicate
  goes to both, which is fine for `predicate_to_sql`'s plain comparison
  shapes today; future Lance-specific scanner extensions in the filter
  would need translation).

* **Missing null check on committed `id` column (Low):** the
  committed-side loop in `collect_node_ids_with_pending` (and the
  parallel non-pending `collect_node_ids`) read `id_col.value(i)`
  without `is_valid(i)` first. `id` is the @key column on every node
  type and non-nullable by schema, so this is unreachable today, but
  the inconsistency with the pending-side `is_valid` guard is worth
  closing for symmetry / defense.

* **Misleading comment in count_pending_src_with_dedupe (Low):** the
  comment claimed "fall back to naive counting" but the code did
  `continue`, i.e. it skipped the batch entirely. The branch is
  unreachable in practice (the pending-side schema always contains the
  key when the caller passes one), so it now returns a typed error
  instead: failing loudly if it ever does fire is correct, because
  silently skipping the batch would let `@card` violations slip past
  validation.

* **PendingTable.schema mismatch surfaces too late (Medium):**
  PendingTable captures the schema from the first batch and never
  updates it. On a blob-bearing table, `insert` produces a full-schema
  batch and `update` (without assigning every blob) produces a
  subset-schema batch. Pre-fix the mismatch surfaced inside
  finalize/MemTable construction — distant from the offending op.
  Post-fix `MutationStaging::append_batch` validates the new batch's
  schema against the existing accumulator's schema and returns a
  typed error directing the caller to split the mutation. Error
  fires at the offending op, not at end-of-query. New helper
  `schemas_compatible` compares field name + data_type pairs;
  nullability and field metadata differences stay tolerated (downstream
  concat already permits those).

Cubic Cursor Bugbot finding #5 (cascade delete edge re-open) self-resolved
in the bot's own analysis ("logic appears sound on re-examination") —
no action.

New test in tests/runs.rs:

* append_batch_rejects_mismatched_schema_in_blob_table_at_offending_op
  — pins the early-error path. Builds a blob-bearing schema, runs an
  `insert + update` query where the update doesn't assign the blob,
  asserts the error fires at the second op with the "Split the
  mutation" message and the manifest is unchanged.

Local: tests/runs.rs 24/24 passed.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
Ragnor Comerford 2026-05-01 21:50:13 +02:00
parent 675568ce85
commit ea16c74329
No known key found for this signature in database
4 changed files with 173 additions and 11 deletions

View file

@ -133,6 +133,33 @@ impl MutationStaging {
// be appended.
return Ok(());
}
// If we've already accumulated a batch on this table, the new
// batch's schema MUST match the existing accumulator's schema.
// The mismatch case in practice is a blob-bearing table that
// sees an `insert` (full schema, blob columns included) and
// then an `update` whose `apply_assignments` output omits
// unassigned blob columns (subset schema). Concat-time and
// MemTable-construction errors would catch this later, but
// surfacing it at the offending `append_batch` call gives the
// caller a clearer point of failure attached to the specific
// op that introduced the drift.
if let Some(existing) = self.pending.get(table_key) {
if !schemas_compatible(&existing.schema, &batch.schema()) {
return Err(OmniError::manifest(format!(
"table '{}' accumulated mutation batches with mismatched schemas: \
prior batches have {} columns, this batch has {}. \
This typically happens on a blob-bearing table when one \
op uses the full schema (e.g. an `insert`) and another \
omits unassigned blob columns (e.g. an `update` that \
doesn't set every blob property). Split the mutation \
into two queries: one for the inserts, one for the \
updates.",
table_key,
existing.schema.fields().len(),
batch.schema().fields().len(),
)));
}
}
let entry = self
.pending
.entry(table_key.to_string())
@ -305,6 +332,24 @@ impl MutationStaging {
/// arbitrary results on duplicates).
///
/// `batches` must be non-empty and all share `schema` (caller enforces).
/// Decide whether two schemas may share one `MutationStaging::append_batch`
/// accumulator.
///
/// Returns `true` only when both schemas have the same number of fields
/// and, position by position, every pair of fields agrees on name and
/// data type. Field order therefore matters.
///
/// Nullability and field metadata are intentionally not compared; the
/// accumulator's downstream `concat_batches` is expected to tolerate
/// differences in those (not verifiable from this function alone).
// NOTE(review): this helper appears to sit between the tail of the
// `dedupe_merge_batches_by_id` docblock and that function's `fn` line, so
// rustdoc would attach that docblock to this helper instead. Confirm
// against the full file and move this helper above that docblock.
fn schemas_compatible(a: &SchemaRef, b: &SchemaRef) -> bool {
    let (left, right) = (a.fields(), b.fields());
    left.len() == right.len()
        && left
            .iter()
            .zip(right.iter())
            .all(|(lhs, rhs)| lhs.name() == rhs.name() && lhs.data_type() == rhs.data_type())
}
fn dedupe_merge_batches_by_id(
schema: &SchemaRef,
batches: Vec<RecordBatch>,
@ -536,10 +581,19 @@ fn count_pending_src_with_dedupe(
let mut kept_srcs: Vec<String> = Vec::new();
for batch in pending_batches.iter().rev() {
let Some(key_col) = batch.column_by_name(dedupe_key_column) else {
// Pending batch is missing the key column — fall back to naive
// counting for this batch (caller's contract was about merge
// semantics; if the column isn't there we can't dedupe).
continue;
// Pending batch is missing the key column. By construction
// this is unreachable: callers in dedupe mode always push
// batches whose schema contains the key (loader Merge mode
// builds via build_edge_batch which always emits `id`; the
// append_batch schema-compatibility check at the call site
// would also reject a heterogeneous mix). If it ever fires
// it's a programmer error — fail loudly rather than skip
// counting (which would let `@card` violations slip).
return Err(OmniError::manifest_internal(format!(
"count_pending_src_with_dedupe: pending batch missing dedup key column '{}' \
(schema-compat check at append_batch should have rejected this)",
dedupe_key_column
)));
};
let key_arr = key_col.as_any().downcast_ref::<StringArray>().ok_or_else(|| {
OmniError::Lance(format!(

View file

@ -1655,7 +1655,13 @@ async fn collect_node_ids_with_pending(
.downcast_ref::<StringArray>()
.ok_or_else(|| OmniError::Lance("'id' column is not Utf8".into()))?;
for i in 0..batch.num_rows() {
ids.insert(id_col.value(i).to_string());
// Defensive: `id` is the @key column on every node type and
// is non-nullable by schema, but a committed-row corruption
// (or future schema change) could surface a NULL. Skip
// rather than insert "" — pending-side does the same.
if id_col.is_valid(i) {
ids.insert(id_col.value(i).to_string());
}
}
}
@ -1718,6 +1724,9 @@ async fn collect_node_ids(
.downcast_ref::<StringArray>()
.unwrap();
for i in 0..batch.num_rows() {
if !id_col.is_valid(i) {
continue;
}
ids.insert(id_col.value(i).to_string());
}
}

View file

@ -1168,12 +1168,6 @@ fn assign_row_id_meta(fragments: &mut [Fragment], start_row_id: u64) -> Result<(
Ok(())
}
/// Apply `projection` and `filter` to in-memory pending batches via a
/// fresh DataFusion `SessionContext`. Used by `scan_with_pending` for
/// the read-your-writes side of MR-794's in-memory accumulator.
///
/// `pending_batches` must be non-empty (the caller short-circuits on
/// empty).
/// Collect the set of values in a Utf8 column across multiple batches.
/// Used by `scan_with_pending`'s merge-semantic path to identify
/// committed rows that are shadowed by pending writes. NULL values are
@ -1255,6 +1249,21 @@ fn filter_out_rows_where_string_in(
Ok(out)
}
/// Apply `projection` and `filter` to in-memory pending batches via a
/// fresh DataFusion `SessionContext`. Used by `scan_with_pending` for
/// the read-your-writes side of MR-794's in-memory accumulator.
///
/// `pending_batches` must be non-empty (the caller short-circuits on
/// empty).
///
/// **SQL dialect contract.** `filter` is also passed to Lance's scanner
/// on the committed side. Lance and DataFusion both accept standard
/// SQL comparison predicates (`col op literal`) and OmniGraph's
/// `predicate_to_sql` only emits those shapes today (`=`, `!=`, `>`,
/// `<`, `>=`, `<=`). If a future caller introduces a Lance-specific
/// scanner extension (vector search, FTS, `_rowid` references) into
/// the filter, this function will need explicit translation — DataFusion
/// won't recognize those operators against the in-memory `MemTable`.
async fn scan_pending_batches(
pending_batches: &[RecordBatch],
pending_schema: Option<SchemaRef>,

View file

@ -1274,3 +1274,93 @@ async fn scan_with_pending_rejects_key_column_missing_from_projection() {
"merge-shadow should drop committed 'a' and surface pending 'a' + committed 'b'"
);
}
/// Regression test for the Cursor Bugbot "Medium" follow-up on commit
/// 052b6e6: a schema mismatch between accumulated mutation batches must
/// be reported by the op that introduces it.
///
/// Background: `PendingTable.schema` is taken from the first
/// `append_batch` call and is not refreshed afterwards. On a blob-bearing
/// table an `insert` yields a full-schema batch (blob columns included),
/// while an `update` that leaves a blob unassigned yields a subset-schema
/// batch. Before the fix, mixing the two in one query only failed later,
/// at `concat_batches`/MemTable construction inside finalize — far from
/// the op that caused it.
///
/// Expected behavior after the fix: the second op (the `update`) fails
/// with a typed manifest error whose message tells the caller to split
/// the mutation, and nothing from the failed query becomes visible.
#[tokio::test]
async fn append_batch_rejects_mismatched_schema_in_blob_table_at_offending_op() {
    use omnigraph::loader::{LoadMode, load_jsonl};

    // Node type with one optional blob property and one optional plain
    // property, so an update can assign `note` while leaving `content`
    // untouched.
    const BLOB_SCHEMA: &str = r#"
node Document {
title: String @key
content: Blob?
note: String?
}
"#;
    // A single query that inserts a row (assigning the blob) and then
    // updates the same title without assigning the blob.
    const BLOB_QUERIES: &str = r#"
query insert_then_update_note(
$title: String, $blob: String, $note: String
) {
insert Document { title: $title, content: $blob }
update Document set { note: $note } where title = $title
}
"#;

    let scratch = tempfile::tempdir().unwrap();
    let repo_uri = scratch.path().to_str().unwrap();
    let mut graph = Omnigraph::init(repo_uri, BLOB_SCHEMA).await.unwrap();

    // Seed one committed Document before running the mutation.
    // NOTE(review): the seed's title is "seed", while the update below
    // filters on "letter", so the update can only match the row inserted
    // earlier in the same query (the in-memory pending side). Presumably
    // the seed exists so the table has committed data — confirm.
    load_jsonl(
        &mut graph,
        r#"{"type":"Document","data":{"title":"seed","content":"base64:AQID"}}"#,
        LoadMode::Overwrite,
    )
    .await
    .unwrap();

    // Run the mixed insert + update; it must fail rather than commit.
    let err = graph
        .mutate(
            "main",
            BLOB_QUERIES,
            "insert_then_update_note",
            &params(&[
                ("$title", "letter"),
                ("$blob", "base64:BAUG"),
                ("$note", "draft 1"),
            ]),
        )
        .await
        .expect_err("blob-table mixed insert+update with non-fully-assigned blob must error early");

    // The failure has to be the typed manifest error, not some later
    // concat/MemTable error.
    let manifest_err = match err {
        OmniError::Manifest(inner) => inner,
        err => panic!("expected Manifest error, got {err:?}"),
    };
    let names_mismatch = manifest_err.message.contains("mismatched schemas");
    let advises_split = manifest_err.message.contains("Split the mutation");
    assert!(
        names_mismatch && advises_split,
        "error must direct user to split: {}",
        manifest_err.message,
    );

    // The early error must precede any commit: the inserted title must
    // not be readable on `main`.
    let lookup = graph
        .query(
            ReadTarget::branch("main"),
            r#"query get_doc($title: String) {
match { $d: Document { title: $title } }
return { $d.title }
}"#,
            "get_doc",
            &params(&[("$title", "letter")]),
        )
        .await
        .unwrap();
    assert_eq!(lookup.num_rows(), 0, "letter must not be visible after early error");
}