MR-925: validation-prototypes scaffolding + exp 1.1 + exp 1.2

- exclude validation-prototypes/ and merge-insert-cas-repro from the main
  workspace so the nested cargo workspace can use its own pin set
- add validation-prototypes/{factorized-batches,custom-lance-index}/
  scratch crates (never merged to main; long-lived branch only)
- exp 1.1 — factorized batches through DataFusion ops: writeup at
  .context/experiments/factorized-batches.md (5 cells × 8 ops; all
  scalar-keyed ops accept List<UInt64> input, UNNEST via CROSS JOIN
  fails in DF 52.5)
- exp 1.2 — custom Lance index plugin from outside lance: writeup at
  .context/experiments/custom-lance-index.md (5 probes; transaction
  surface is open, SCALAR_INDEX_PLUGIN_REGISTRY is closed → hard
  blocker for MR-737 §5.4; recommends upstream path or external-index
  path)
This commit is contained in:
Devin AI 2026-05-12 16:49:33 +00:00
parent c9c7c0672e
commit 02c4b45c85
12 changed files with 8033 additions and 0 deletions

6324
validation-prototypes/Cargo.lock generated Normal file

File diff suppressed because it is too large Load diff

View file

@ -0,0 +1,69 @@
[workspace]
resolver = "2"
members = [
"factorized-batches",
"custom-lance-index",
# Additional crates added as each experiment is set up:
# "custom-operator", # 1.3
# "sip-format-bench", # 1.4
# "bitmap-pushdown", # 1.5
# "txn-branches-cost", # 1.6
# "stable-rowid-index", # 1.7
]
# Pre-Phase-0 validation prototypes for MR-925 / MR-737.
# These are THROWAWAY crates that produce go/no-go signals or calibration
# numbers. Do not merge to main. The findings live in `.context/experiments/`.
[workspace.dependencies]
# Pin to the omnigraph workspace versions so the experiments exercise the
# same substrate behavior the engine will see in Phase 0.
arrow-array = "57"
arrow-ipc = "57"
arrow-schema = "57"
arrow-select = "57"
arrow-cast = { version = "57", features = ["prettyprint"] }
arrow-ord = "57"
arrow = "57"
datafusion = { version = "52", default-features = false }
datafusion-physical-plan = "52"
datafusion-physical-expr = "52"
datafusion-execution = "52"
datafusion-common = "52"
datafusion-expr = "52"
datafusion-functions-aggregate = "52"
datafusion-physical-optimizer = "52"
lance = { version = "4.0.0", default-features = false, features = ["aws"] }
lance-datafusion = "4.0.0"
lance-file = "4.0.0"
lance-index = "4.0.0"
lance-table = "4.0.0"
lance-core = "4.0.0"
tokio = { version = "1", features = ["rt-multi-thread", "macros", "time"] }
futures = "0.3"
async-trait = "0.1"
tempfile = "3"
anyhow = "1"
rand = "0.8"
roaring = "0.11"
croaring = "2"
prost = "0.14"
prost-types = "0.14"
uuid = { version = "1", features = ["v4"] }
tracing = "0.1"
tracing-subscriber = { version = "0.3", features = ["env-filter", "fmt"] }
serde_json = "1"
[profile.dev]
debug = 0
[profile.dev.package."*"]
opt-level = 2
[profile.release]
opt-level = 3
lto = "thin"
codegen-units = 16

View file

@ -0,0 +1,30 @@
[package]
name = "custom-lance-index"
version = "0.0.0"
edition = "2024"
publish = false
# Experiment 1.2 (MR-925) — custom Lance index plugin from outside the lance crate.
# Validates MR-737 §5.4, §5.5.
[dependencies]
arrow = { workspace = true }
arrow-array = { workspace = true }
arrow-schema = { workspace = true }
lance = { workspace = true }
lance-table = { workspace = true }
lance-index = { workspace = true }
lance-core = { workspace = true }
tokio = { workspace = true }
futures = { workspace = true }
anyhow = { workspace = true }
prost = { workspace = true }
prost-types = { workspace = true }
roaring = { workspace = true }
tempfile = { workspace = true }
serde_json = { workspace = true }
uuid = { workspace = true }
[[bin]]
name = "custom-lance-index"
path = "src/main.rs"

View file

@ -0,0 +1,355 @@
//! MR-925 Experiment 1.2 — custom Lance index plugin from outside the lance crate.
//!
//! Goal: probe what a third-party crate (us) can and *cannot* do when shipping
//! a "custom index" against the public Lance 4.0.0 surface. Produces a
//! compatibility matrix the writeup at `.context/experiments/custom-lance-index.md`
//! consumes.
//!
//! Probes:
//!
//! P1. Construct an `IndexMetadata` with a non-standard `index_details`
//! protobuf and commit it via `Operation::CreateIndex`.
//! P2. Reopen the dataset; verify `load_indices()` returns our row (or filters
//! it out).
//! P3. Append fragments; observe whether the index's `fragment_bitmap` is
//! updated automatically (it should not be — that's the engine's job).
//! P4. Run a `Scanner` with a filter; observe whether Lance attempts to open
//! our index. We expect failure: `SCALAR_INDEX_PLUGIN_REGISTRY` is a
//! `pub(crate)` static with no setter as of 4.0.0
//! (lance/src/index/scalar.rs:223 carries the TODO).
//! P5. Run `compact_files` (Rewrite). Observe whether our `IndexMetadata`
//! survives the rewrite or is dropped.
use std::sync::Arc;
use anyhow::{Context, Result};
use arrow_array::builder::{StringBuilder, UInt64Builder};
use arrow_array::{RecordBatch, RecordBatchIterator};
use arrow_schema::{DataType, Field, Schema};
use lance::Dataset;
use lance::dataset::optimize::{CompactionOptions, compact_files};
use lance::dataset::transaction::Operation;
use lance::dataset::WriteParams;
use lance::session::Session;
use lance_index::DatasetIndexExt;
use lance_table::format::IndexMetadata;
use roaring::RoaringBitmap;
use tempfile::TempDir;
use uuid::Uuid;
use prost_types::Any as ProstAny;
const TYPE_URL: &str = "omnigraph.v0.NeighborIndexDetails";
/// Returns the two-column schema used by every batch in this experiment:
/// a non-nullable `key: UInt64` followed by a non-nullable `payload: Utf8`.
fn make_schema() -> Arc<Schema> {
    let key = Field::new("key", DataType::UInt64, false);
    let payload = Field::new("payload", DataType::Utf8, false);
    Arc::new(Schema::new(vec![key, payload]))
}
/// Builds a batch of `n` rows with keys `key_base, key_base + 1, ...` and
/// payloads of the form `p_` followed by the key zero-padded to six digits.
///
/// Panics with "build batch" if the columns do not fit the schema from
/// `make_schema`.
fn build_batch(n: u64, key_base: u64) -> RecordBatch {
    let mut key_col = UInt64Builder::with_capacity(n as usize);
    let mut payload_col = StringBuilder::new();
    for key in (0..n).map(|offset| key_base + offset) {
        key_col.append_value(key);
        payload_col.append_value(format!("p_{:06}", key));
    }
    RecordBatch::try_new(
        make_schema(),
        vec![Arc::new(key_col.finish()), Arc::new(payload_col.finish())],
    )
    .expect("build batch")
}
/// Creates the dataset at `uri` from a single 1000-row batch (keys start at 0)
/// using default write parameters.
///
/// Errors are returned with the context "initial write".
async fn write_initial(uri: &str) -> Result<Dataset> {
    let first_batch = build_batch(1000, 0);
    let source = RecordBatchIterator::new(vec![Ok(first_batch)], make_schema());
    let write_params = Some(WriteParams::default());
    Dataset::write(source, uri, write_params)
        .await
        .context("initial write")
}
/// Appends one 500-row batch (keys start at 10_000) to `ds`.
///
/// Errors are returned with the context "append".
async fn append_more(ds: &mut Dataset) -> Result<()> {
    let extra_batch = build_batch(500, 10_000);
    let source = RecordBatchIterator::new(vec![Ok(extra_batch)], make_schema());
    ds.append(source, None).await.context("append")?;
    Ok(())
}
/// Construct our custom-index metadata. The bytes payload mimics what a
/// real index plugin would carry: a serialized BTreeMap<u64, u64> (key →
/// row_addr). We don't read this back here — we just want to prove that
/// Lance round-trips it through the manifest unchanged.
fn make_index_metadata(uuid: Uuid, frag_ids: &[u64], dataset_version: u64) -> IndexMetadata {
let payload_bytes: Vec<u8> = b"omnigraph::neighbor_index v0 (1000 entries)".to_vec();
let any = ProstAny {
type_url: TYPE_URL.to_string(),
value: payload_bytes,
};
let mut bitmap = RoaringBitmap::new();
for f in frag_ids {
bitmap.insert(*f as u32);
}
IndexMetadata {
uuid,
fields: vec![0], // 0 = "key" by schema position
name: "neighbor_idx".to_string(),
dataset_version,
fragment_bitmap: Some(bitmap),
index_details: Some(Arc::new(any)),
index_version: 0,
created_at: None,
base_id: None,
files: None,
}
}
/// Commits `idx` to the dataset behind `ds` as an `Operation::CreateIndex`
/// that adds one index and removes none, and returns the resulting dataset.
///
/// The commit is issued against `ds.uri()` with the current manifest version
/// of `ds` and a fresh default `Session`. Errors are returned with the
/// context "commit CreateIndex".
async fn commit_index(ds: &Dataset, idx: IndexMetadata) -> Result<Dataset> {
    let create = Operation::CreateIndex {
        new_indices: vec![idx],
        removed_indices: Vec::new(),
    };
    let read_version = Some(ds.manifest().version);
    let session = Arc::new(Session::default());
    Dataset::commit(ds.uri(), create, read_version, None, None, session, false)
        .await
        .context("commit CreateIndex")
}
/// Collects one result line per probe and renders them as a fixed-width table.
#[derive(Default)]
struct Matrix {
    rows: Vec<Row>,
}

/// A single probe result: the probe label, a short outcome tag, and free-form notes.
struct Row {
    probe: &'static str,
    outcome: String,
    notes: String,
}

impl Matrix {
    /// Appends a result row; `outcome` and `notes` are converted to owned strings.
    fn add(&mut self, probe: &'static str, outcome: impl Into<String>, notes: impl Into<String>) {
        let (outcome, notes) = (outcome.into(), notes.into());
        self.rows.push(Row { probe, outcome, notes });
    }

    /// Prints a centered banner, the column header, a rule, and then every
    /// recorded row in insertion order to stdout.
    fn print(&self) {
        println!("\n{:-^120}", " custom-lance-index compatibility matrix ");
        println!("{:<32} {:<14} {}", "probe", "outcome", "notes");
        println!("{:-<120}", "");
        self.rows.iter().for_each(|Row { probe, outcome, notes }| {
            println!("{:<32} {:<14} {}", probe, outcome, notes);
        });
    }
}
/// Runs probes P1–P5 in order against a freshly written dataset inside a
/// temporary directory, records each outcome in a `Matrix`, and prints it.
///
/// If the P1 commit fails, the matrix is printed and the function still
/// returns `Ok(())`, so the exit status does not reflect probe outcomes.
/// Errors from the other Lance calls (write, load_indices, append, scan,
/// compact) are propagated with `?` and abort the run before the matrix is
/// printed.
#[tokio::main(flavor = "multi_thread", worker_threads = 4)]
async fn main() -> Result<()> {
let tmp = TempDir::new().context("tmpdir")?;
let uri = format!("file://{}", tmp.path().join("ds").display());
println!("dataset uri: {uri}");
let mut matrix = Matrix::default();
// P1: build a dataset, then construct + commit our custom index.
let ds = write_initial(&uri).await?;
// Fragment ids of the initial write; P3 later compares against this list.
let frag_ids: Vec<u64> = ds
.get_fragments()
.iter()
.map(|f| f.id() as u64)
.collect();
println!("initial fragments: {frag_ids:?}");
let our_uuid = Uuid::new_v4();
let idx = make_index_metadata(our_uuid, &frag_ids, ds.manifest().version);
// Shadow `ds` with the dataset returned by the commit; it is mutable because
// the append (P3) and compaction (P5) probes take `&mut`.
let mut ds = match commit_index(&ds, idx).await {
Ok(d) => {
matrix.add(
"P1 construct+commit",
"OK",
format!(
"Operation::CreateIndex accepted custom type_url '{TYPE_URL}'; commit v{}",
d.manifest().version
),
);
d
}
Err(e) => {
// Without a committed index none of the later probes are meaningful:
// report what we have and stop (still with a success return value).
matrix.add("P1 construct+commit", "FAIL", format!("{e:#}"));
matrix.print();
return Ok(());
}
};
// P2: load indices.
let indices = ds.load_indices().await.context("load_indices")?;
let ours: Vec<&IndexMetadata> = indices
.iter()
.filter(|i| i.uuid == our_uuid)
.collect();
if ours.len() == 1 {
let our_idx = ours[0];
let detail_url = our_idx
.index_details
.as_ref()
.map(|a| a.type_url.clone())
.unwrap_or_default();
let frag_count = our_idx
.fragment_bitmap
.as_ref()
.map(|b| b.len())
.unwrap_or(0);
matrix.add(
"P2 load_indices (round-trip)",
"OK",
format!(
"type_url='{detail_url}' fragment_bitmap.len={frag_count} survives retain_supported_indices"
),
);
} else {
matrix.add(
"P2 load_indices (round-trip)",
"FAIL",
format!(
"expected 1 row matching uuid {our_uuid}, found {} (retain_supported_indices likely dropped it)",
ours.len()
),
);
}
// P3: append more rows; the index's fragment_bitmap should NOT
// auto-update — that's the plugin's job. Verify the dataset still
// reports the same (stale) bitmap.
append_more(&mut ds).await?;
let indices_after_append = ds.load_indices().await?;
let ours_after_append: Vec<&IndexMetadata> = indices_after_append
.iter()
.filter(|i| i.uuid == our_uuid)
.collect();
if let Some(idx) = ours_after_append.first() {
let frags_now: Vec<u32> = idx
.fragment_bitmap
.as_ref()
.map(|b| b.iter().collect())
.unwrap_or_default();
matrix.add(
"P3 append-row coverage",
// Only the number of covered fragments is compared with the initial
// fragment count; the ids themselves are shown in the notes.
if frags_now.len() == frag_ids.len() {
"STALE_AS_EXPECTED"
} else {
"UNEXPECTED_AUTO_UPDATE"
},
format!(
"fragment_bitmap={frags_now:?} (expected {frag_ids:?}); new fragments not auto-covered"
),
);
} else {
matrix.add("P3 append-row coverage", "DROPPED", "index disappeared after append");
}
// P4: try to scan with a predicate; observe whether Lance tries to open
// our index. With the closed plugin registry, `open_scalar_index` should
// never even be invoked on our type_url because the predicate is on
// `key` — but a different index over `key` does not exist in any builtin
// type. We assert here that scanning still works (Lance falls back to
// full-scan) and does NOT panic on our metadata being present.
let mut scanner = ds.scan();
scanner
.filter("key = 42")
.context("filter")?
.project(&["key"])
.context("project")?;
let stream = scanner.try_into_stream().await.context("scan stream")?;
let batches: Vec<_> = futures::stream::TryStreamExt::try_collect(stream)
.await
.context("scan collect")?;
let scanned_rows: usize = batches.iter().map(|b| b.num_rows()).sum();
matrix.add(
"P4 scan with filter on indexed col",
// Key 42 exists exactly once in the initial batch (keys 0..1000); the
// appended batch starts at 10_000.
if scanned_rows == 1 { "FULL_SCAN_FALLBACK" } else { "UNEXPECTED" },
format!(
"rows={scanned_rows} (expected 1); SCALAR_INDEX_PLUGIN_REGISTRY refuses unknown type_url '{TYPE_URL}' so scanner falls back to full scan"
),
);
// P5: run compact_files (Rewrite). Observe whether our IndexMetadata
// survives the rewrite. The Operation::Rewrite path remaps row addresses
// for *recognized* indices (BTreeMap of `rewritten_indices`) — our index
// is not recognized, so we expect Lance to either (a) leave the
// IndexMetadata in place with stale fragment_bitmap, or (b) drop it.
let pre_compact_indices = ds.load_indices().await?.len();
let metrics = compact_files(&mut ds, CompactionOptions::default(), None)
.await
.context("compact_files")?;
let post_compact_indices = ds.load_indices().await?;
let ours_after_compact: Vec<&IndexMetadata> = post_compact_indices
.iter()
.filter(|i| i.uuid == our_uuid)
.collect();
let frags_after: Vec<u64> = ds
.get_fragments()
.iter()
.map(|f| f.id() as u64)
.collect();
if let Some(idx) = ours_after_compact.first() {
let bitmap: Vec<u32> = idx
.fragment_bitmap
.as_ref()
.map(|b| b.iter().collect())
.unwrap_or_default();
// REMAPPED: every post-compaction fragment id is in the bitmap (this is
// also reported if the dataset has no fragments, since `all` on an empty
// iterator is true). EMPTIED: bitmap missing or empty. Otherwise the
// bitmap still names fragments but not all current ones.
let outcome = if frags_after.iter().all(|f| bitmap.contains(&(*f as u32))) {
"REMAPPED"
} else if bitmap.is_empty() {
"EMPTIED"
} else {
"STALE_BITMAP"
};
matrix.add(
"P5 compact_files (Rewrite)",
outcome,
format!(
"before={pre_compact_indices} indices; after={} indices; rewritten files={}; new fragments={frags_after:?}; idx.fragment_bitmap={bitmap:?}",
post_compact_indices.len(),
metrics.files_added
),
);
} else {
matrix.add(
"P5 compact_files (Rewrite)",
"DROPPED",
format!(
"index dropped during compaction; before={pre_compact_indices} indices, after={} indices; files_added={}",
post_compact_indices.len(),
metrics.files_added
),
);
}
matrix.print();
// Final commentary printed for the writeup.
println!("\n[note] Lance 4.0.0 has a private static `SCALAR_INDEX_PLUGIN_REGISTRY` (see");
println!(" lance/src/index/scalar.rs:223). The `// TODO: Allow users to register their own plugins`");
println!(" comment confirms this surface is not yet pluggable. We can write");
println!(" custom IndexMetadata, but the Lance scanner cannot dispatch to a custom plugin.");
Ok(())
}

View file

@ -0,0 +1,34 @@
[package]
name = "factorized-batches"
version = "0.0.0"
edition = "2024"
publish = false
# Experiment 1.1 (MR-925) — factorized batches through DataFusion ops.
# Validates MR-737 §5.2 / Open Q2.
[dependencies]
arrow = { workspace = true }
arrow-array = { workspace = true }
arrow-schema = { workspace = true }
arrow-cast = { workspace = true }
datafusion = { workspace = true, features = [
"sql",
"nested_expressions",
"unicode_expressions",
"string_expressions",
"math_expressions",
"regex_expressions",
"datetime_expressions",
] }
datafusion-common = { workspace = true }
datafusion-expr = { workspace = true }
datafusion-physical-plan = { workspace = true }
tokio = { workspace = true }
futures = { workspace = true }
anyhow = { workspace = true }
rand = { workspace = true }
[[bin]]
name = "factorized-batches"
path = "src/main.rs"

View file

@ -0,0 +1,113 @@
[cell] n_src=10000 fanout=u=1 edges=10000
[cell] n_src=10000 fanout=u=10 edges=100000
[cell] n_src=10000 fanout=u=100 edges=1000000
[cell] n_src=10000 fanout=u=1000 edges=10000000
[cell] n_src=10000 fanout=s=10/0.02 edges=118141
-------------------------------------------------------- factorized-batches results --------------------------------------------------------
op n_src fanout f_ok f_rows f_time_ms x_ok x_rows x_time_ms speedup recommendation
--------------------------------------------------------------------------------------------------------------------------------------------
filter 10000 u=1 Y 5000 2.31 Y 5000 0.75 0.32x KEEP_FACTORIZED
project 10000 u=1 Y 10000 0.21 Y 10000 0.17 0.81x KEEP_FACTORIZED
sort 10000 u=1 Y 1000 2.14 Y 1000 2.02 0.94x KEEP_FACTORIZED
aggregate_scalar 10000 u=1 Y 1 2.04 Y 1 1.45 0.71x KEEP_FACTORIZED
aggregate_on_list 10000 u=1 Y 6353 2.64 - - - - KEEP_FACTORIZED
join_scalar 10000 u=1 Y 100 1.27 Y 100 1.06 0.83x KEEP_FACTORIZED
join_on_list 10000 u=1 Y 1 1.88 - - - - KEEP_FACTORIZED
unnest_flatten 10000 u=1 N 0 0.53 - - - - FLATTEN_BEFORE
factorized error: execute: This feature is not implemented: Physical plan does not support logical expression OuterReferenceColumn(Field { name: "_neighbors", data_type: List(Field { data_type: UInt64 }) }, Column { relation: Some(Bare { table: "t" }), name: "_neighbors" })
filter 10000 u=10 Y 5000 1.16 Y 50000 0.84 0.72x KEEP_FACTORIZED
project 10000 u=10 Y 10000 0.26 Y 100000 0.27 1.03x KEEP_FACTORIZED
sort 10000 u=10 Y 1000 2.72 Y 1000 19.53 7.18x KEEP_FACTORIZED
aggregate_scalar 10000 u=10 Y 1 1.46 Y 1 4.04 2.77x KEEP_FACTORIZED
aggregate_on_list 10000 u=10 Y 10000 12.37 - - - - KEEP_FACTORIZED
join_scalar 10000 u=10 Y 100 1.17 Y 100 4.16 3.57x KEEP_FACTORIZED
join_on_list 10000 u=10 Y 1 3.84 - - - - KEEP_FACTORIZED
unnest_flatten 10000 u=10 N 0 0.45 - - - - FLATTEN_BEFORE
factorized error: execute: This feature is not implemented: Physical plan does not support logical expression OuterReferenceColumn(Field { name: "_neighbors", data_type: List(Field { data_type: UInt64 }) }, Column { relation: Some(Bare { table: "t" }), name: "_neighbors" })
filter 10000 u=100 Y 5000 1.40 Y 500000 2.73 1.95x KEEP_FACTORIZED
project 10000 u=100 Y 10000 0.20 Y 1000000 0.25 1.26x KEEP_FACTORIZED
sort 10000 u=100 Y 1000 2.58 Y 1000 180.72 70.18x KEEP_FACTORIZED
aggregate_scalar 10000 u=100 Y 1 1.74 Y 1 28.69 16.47x KEEP_FACTORIZED
aggregate_on_list 10000 u=100 Y 10000 113.60 - - - - KEEP_FACTORIZED
join_scalar 10000 u=100 Y 100 4.32 Y 100 17.92 4.15x KEEP_FACTORIZED
join_on_list 10000 u=100 Y 1 26.24 - - - - KEEP_FACTORIZED
unnest_flatten 10000 u=100 N 0 0.64 - - - - FLATTEN_BEFORE
factorized error: execute: This feature is not implemented: Physical plan does not support logical expression OuterReferenceColumn(Field { name: "_neighbors", data_type: List(Field { data_type: UInt64 }) }, Column { relation: Some(Bare { table: "t" }), name: "_neighbors" })
filter 10000 u=1000 Y 5000 46.29 Y 5000000 22.12 0.48x KEEP_FACTORIZED
project 10000 u=1000 Y 10000 0.31 Y 10000000 0.44 1.43x KEEP_FACTORIZED
sort 10000 u=1000 Y 1000 4.75 Y 1000 1597.33 336.28x KEEP_FACTORIZED
aggregate_scalar 10000 u=1000 Y 1 2.01 Y 1 282.68 140.36x KEEP_FACTORIZED
aggregate_on_list 10000 u=1000 Y 10000 1624.65 - - - - KEEP_FACTORIZED
join_scalar 10000 u=1000 Y 100 5.79 Y 100 196.15 33.88x KEEP_FACTORIZED
join_on_list 10000 u=1000 Y 1 659.47 - - - - KEEP_FACTORIZED
unnest_flatten 10000 u=1000 N 0 0.62 - - - - FLATTEN_BEFORE
factorized error: execute: This feature is not implemented: Physical plan does not support logical expression OuterReferenceColumn(Field { name: "_neighbors", data_type: List(Field { data_type: UInt64 }) }, Column { relation: Some(Bare { table: "t" }), name: "_neighbors" })
filter 10000 s=10/0.02 Y 5000 0.91 Y 68142 1.02 1.11x KEEP_FACTORIZED
project 10000 s=10/0.02 Y 10000 0.21 Y 118141 0.19 0.88x KEEP_FACTORIZED
sort 10000 s=10/0.02 Y 1000 2.23 Y 1000 22.38 10.05x KEEP_FACTORIZED
aggregate_scalar 10000 s=10/0.02 Y 1 1.93 Y 1 4.47 2.32x KEEP_FACTORIZED
aggregate_on_list 10000 s=10/0.02 Y 10000 10.21 - - - - KEEP_FACTORIZED
join_scalar 10000 s=10/0.02 Y 100 1.46 Y 100 3.87 2.65x KEEP_FACTORIZED
join_on_list 10000 s=10/0.02 Y 1 4.98 - - - - KEEP_FACTORIZED
unnest_flatten 10000 s=10/0.02 N 0 0.43 - - - - FLATTEN_BEFORE
factorized error: execute: This feature is not implemented: Physical plan does not support logical expression OuterReferenceColumn(Field { name: "_neighbors", data_type: List(Field { data_type: UInt64 }) }, Column { relation: Some(Bare { table: "t" }), name: "_neighbors" })
[explain] aggregate_scalar (factorized input):
logical_plan Sort: bucket ASC NULLS LAST
Projection: substr(t.payload,Int64(1),Int64(4)) AS bucket, count(Int64(1)) AS count(*) AS n
Aggregate: groupBy=[[substr(t.payload, Int64(1), Int64(4))]], aggr=[[count(Int64(1))]]
TableScan: t projection=[payload]
physical_plan SortPreservingMergeExec: [bucket@0 ASC NULLS LAST]
SortExec: expr=[bucket@0 ASC NULLS LAST], preserve_partitioning=[true]
ProjectionExec: expr=[substr(t.payload,Int64(1),Int64(4))@0 as bucket, count(Int64(1))@1 as n]
AggregateExec: mode=FinalPartitioned, gby=[substr(t.payload,Int64(1),Int64(4))@0 as substr(t.payload,Int64(1),Int64(4))], aggr=[count(Int64(1))]
RepartitionExec: partitioning=Hash([substr(t.payload,Int64(1),Int64(4))@0], 2), input_partitions=1
AggregateExec: mode=Partial, gby=[substr(payload@0, 1, 4) as substr(t.payload,Int64(1),Int64(4))], aggr=[count(Int64(1))]
DataSourceExec: partitions=1, partition_sizes=[1]
[explain] join_scalar (factorized input):
logical_plan Projection: a.src_id, a._neighbors
Limit: skip=0, fetch=100
Inner Join: a.src_id = b.src_id
SubqueryAlias: a
TableScan: t projection=[src_id, _neighbors]
SubqueryAlias: b
TableScan: t projection=[src_id]
physical_plan ProjectionExec: expr=[src_id@1 as src_id, _neighbors@2 as _neighbors]
GlobalLimitExec: skip=0, fetch=100
HashJoinExec: mode=CollectLeft, join_type=Inner, on=[(src_id@0, src_id@0)]
DataSourceExec: partitions=1, partition_sizes=[1]
DataSourceExec: partitions=1, partition_sizes=[1]
[explain] aggregate_on_list (factorized input):
logical_plan Projection: t._neighbors, count(Int64(1)) AS count(*) AS n
Aggregate: groupBy=[[t._neighbors]], aggr=[[count(Int64(1))]]
TableScan: t projection=[_neighbors]
physical_plan ProjectionExec: expr=[_neighbors@0 as _neighbors, count(Int64(1))@1 as n]
AggregateExec: mode=FinalPartitioned, gby=[_neighbors@0 as _neighbors], aggr=[count(Int64(1))]
RepartitionExec: partitioning=Hash([_neighbors@0], 2), input_partitions=1
AggregateExec: mode=Partial, gby=[_neighbors@0 as _neighbors], aggr=[count(Int64(1))]
DataSourceExec: partitions=1, partition_sizes=[1]
[explain] sort (factorized input):
logical_plan Sort: t.src_id DESC NULLS FIRST, fetch=1000
TableScan: t projection=[src_id, _neighbors]
physical_plan SortExec: TopK(fetch=1000), expr=[src_id@0 DESC], preserve_partitioning=[false]
DataSourceExec: partitions=1, partition_sizes=[1]
Exit code: 0

View file

@ -0,0 +1,145 @@
//! Synthetic data generation for the factorized-batches experiment.
//!
//! Two shapes are produced:
//! * `factorized`: one row per `src_id`, `_neighbors: List<UInt64>` carrying
//! the neighbor set for that source.
//! * `flat`: one row per `(src_id, neighbor)` pair (exploded baseline).
use std::sync::Arc;
use arrow_array::builder::{ListBuilder, UInt64Builder};
use arrow_array::{Float64Array, RecordBatch, StringArray, UInt64Array};
use arrow_schema::{DataType, Field, Schema};
use rand::SeedableRng;
use rand::rngs::StdRng;
use rand::Rng;
/// Distribution of neighbor-list lengths per source row.
///
/// Interpreted by `build`, which picks a list length for each row index.
#[derive(Clone, Copy, Debug)]
pub enum FanoutShape {
/// Every src_id has exactly `target` neighbors.
Uniform { target: usize },
/// Skewed: rows whose index falls in the first `heavy_fraction` of the
/// source range get `target * 10` neighbors (saturating); every other row
/// gets `target` plus a random jitter in `-2..=2`, clamped at zero.
Skewed { target: usize, heavy_fraction: f64 },
}
/// Inputs to `build` describing one synthetic data cell.
#[derive(Clone, Debug)]
pub struct DataParams {
/// Number of source rows; also the exclusive upper bound for the randomly
/// drawn neighbor ids (with a minimum bound of 1).
pub n_src: usize,
/// How many neighbors each source row receives.
pub fanout: FanoutShape,
/// Seed for the `StdRng` used by `build`.
pub seed: u64,
}
/// Returns `(factorized_batch, flat_batch)` with the same logical content.
///
/// Schema:
/// factorized: src_id: UInt64, payload: Utf8, weight: Float64,
/// _neighbors: List<UInt64 not null> not null
/// flat: src_id: UInt64, payload: Utf8, weight: Float64, dst: UInt64
pub fn build(params: &DataParams) -> (RecordBatch, RecordBatch) {
let mut rng = StdRng::seed_from_u64(params.seed);
// factorized columns
let mut src_ids = UInt64Array::builder(params.n_src);
let mut payloads: Vec<String> = Vec::with_capacity(params.n_src);
let mut weights: Vec<f64> = Vec::with_capacity(params.n_src);
let mut list_builder = ListBuilder::new(UInt64Builder::new())
.with_field(Field::new("item", DataType::UInt64, false));
// flat columns
let mut flat_src: Vec<u64> = Vec::new();
let mut flat_payload: Vec<String> = Vec::new();
let mut flat_weight: Vec<f64> = Vec::new();
let mut flat_dst: Vec<u64> = Vec::new();
let len_for = |i: usize, rng: &mut StdRng| -> usize {
match params.fanout {
FanoutShape::Uniform { target } => target,
FanoutShape::Skewed { target, heavy_fraction } => {
if (i as f64) / (params.n_src as f64) < heavy_fraction {
target.saturating_mul(10)
} else {
let jitter: i64 = rng.gen_range(-2..=2);
((target as i64 + jitter).max(0)) as usize
}
}
}
};
for i in 0..params.n_src {
let src = i as u64;
let payload = format!("p_{:06}", i);
let weight = rng.r#gen::<f64>();
src_ids.append_value(src);
payloads.push(payload.clone());
weights.push(weight);
let n_neighbors = len_for(i, &mut rng);
for _ in 0..n_neighbors {
let dst: u64 = rng.gen_range(0..(params.n_src as u64).max(1));
list_builder.values().append_value(dst);
flat_src.push(src);
flat_payload.push(payload.clone());
flat_weight.push(weight);
flat_dst.push(dst);
}
list_builder.append(true);
}
let neighbors_field = Field::new(
"_neighbors",
DataType::List(Arc::new(Field::new("item", DataType::UInt64, false))),
false,
);
let factorized_schema = Arc::new(Schema::new(vec![
Field::new("src_id", DataType::UInt64, false),
Field::new("payload", DataType::Utf8, false),
Field::new("weight", DataType::Float64, false),
neighbors_field,
]));
let factorized = RecordBatch::try_new(
factorized_schema,
vec![
Arc::new(src_ids.finish()),
Arc::new(StringArray::from(payloads)),
Arc::new(Float64Array::from(weights)),
Arc::new(list_builder.finish()),
],
)
.expect("factorized record batch");
let flat_schema = Arc::new(Schema::new(vec![
Field::new("src_id", DataType::UInt64, false),
Field::new("payload", DataType::Utf8, false),
Field::new("weight", DataType::Float64, false),
Field::new("dst", DataType::UInt64, false),
]));
let flat = RecordBatch::try_new(
flat_schema,
vec![
Arc::new(UInt64Array::from(flat_src)),
Arc::new(StringArray::from(flat_payload)),
Arc::new(Float64Array::from(flat_weight)),
Arc::new(UInt64Array::from(flat_dst)),
],
)
.expect("flat record batch");
(factorized, flat)
}
/// Total number of (src, dst) edges encoded in a factorized batch.
///
/// Reads the final list offset of the `_neighbors` column and returns 0 when
/// there are no offsets.
/// NOTE(review): this equals the edge count only if the offsets start at 0
/// (i.e. the array is not a slice) — true for batches from `build`; confirm
/// for other callers.
///
/// Panics if the batch has no `_neighbors` column or it is not a `ListArray`.
pub fn factorized_edge_count(batch: &RecordBatch) -> usize {
    let column = batch
        .column_by_name("_neighbors")
        .expect("_neighbors column");
    let neighbors = column
        .as_any()
        .downcast_ref::<arrow_array::ListArray>()
        .expect("ListArray");
    match neighbors.value_offsets().last() {
        Some(&end) => end as usize,
        None => 0,
    }
}

View file

@ -0,0 +1,301 @@
mod data;
mod ops;
use anyhow::Result;
use arrow_array::RecordBatch;
use crate::data::{DataParams, FanoutShape, build, factorized_edge_count};
use crate::ops::{
OpResult, aggregate_on_list_sql_factorized, aggregate_sql_factorized, aggregate_sql_flat,
explain_factorized, filter_sql, join_on_list_sql_factorized, join_sql_factorized,
join_sql_flat, probe_unnest_flatten, project_sql_factorized, project_sql_flat, run_sql,
sort_sql_factorized, sort_sql_flat,
};
/// One row in the final per-op recommendation matrix.
#[derive(Debug, Clone)]
struct OpRow {
// Operator label shown in the first table column (e.g. "filter").
op_name: &'static str,
// Number of source rows in the cell this measurement belongs to.
n_src: usize,
// Human-readable fanout label, e.g. "u=10" (uniform) or "s=10/0.02" (skewed).
fanout: String,
// Result of running the op on the factorized (list-column) batch.
factorized: OpResult,
// Result on the flat baseline; `None` when the op has no flat counterpart.
flat: Option<OpResult>,
}
/// Prints the results table to stdout: a banner, a header line, a rule, then
/// one line per row with factorized and flat measurements, the speedup
/// (flat time / factorized time) and the recommendation.
///
/// Columns for a missing flat baseline are shown as `-`. The speedup is only
/// computed when both variants were accepted and the flat time is positive;
/// the factorized time is floored at 1e-3 ms to avoid dividing by ~zero.
/// Any captured error message is printed on its own line below the row.
fn print_table(rows: &[OpRow]) {
    let yes_no = |flag: bool| if flag { "Y" } else { "N" };

    println!("{:-^140}", " factorized-batches results ");
    println!(
        "{:<22} {:>6} {:>14} {:>8} {:>10} {:>10} {:>10} {:>10} {:>10} {:>12} {}",
        "op", "n_src", "fanout", "f_ok", "f_rows", "f_time_ms", "x_ok", "x_rows", "x_time_ms",
        "speedup", "recommendation"
    );
    println!("{:-<140}", "");

    for r in rows {
        let fact = &r.factorized;
        let f_ok = yes_no(fact.accepts);
        let f_time = format!("{:.2}", fact.time_ms);

        let flat_cells = r.flat.as_ref().map(|flat| {
            let comparable = flat.accepts && fact.accepts && flat.time_ms > 0.0;
            let speedup = if comparable {
                format!("{:.2}x", flat.time_ms / fact.time_ms.max(1e-3))
            } else {
                "-".to_string()
            };
            (
                yes_no(flat.accepts).to_string(),
                flat.out_rows.to_string(),
                format!("{:.2}", flat.time_ms),
                speedup,
            )
        });
        let (x_ok, x_rows, x_time, speedup): (String, String, String, String) = flat_cells
            .unwrap_or_else(|| ("-".into(), "-".into(), "-".into(), "-".into()));

        let rec = recommendation(r);
        println!(
            "{:<22} {:>6} {:>14} {:>8} {:>10} {:>10} {:>10} {:>10} {:>10} {:>12} {}",
            r.op_name, r.n_src, r.fanout, f_ok, fact.out_rows, f_time,
            x_ok, x_rows, x_time, speedup, rec
        );

        if let Some(err) = fact.error.as_ref() {
            println!(" factorized error: {err}");
        }
        if let Some(err) = r.flat.as_ref().and_then(|flat| flat.error.as_ref()) {
            println!(" flat error: {err}");
        }
    }
}
/// Maps one measured row to a recommendation label:
/// `KEEP_FACTORIZED`, `FLATTEN_BEFORE`, or `MULTIPLICITY_AWARE_FUTURE`.
///
/// Rules, in order:
/// 1. The factorized variant was rejected → `FLATTEN_BEFORE`.
/// 2. The op is keyed on the list column itself (`aggregate_on_list`,
///    `join_on_list`) → `MULTIPLICITY_AWARE_FUTURE`, because its result is
///    semantically different from any flat baseline.
/// 3. An accepted flat baseline exists and the factorized run emitted more
///    rows than it → `FLATTEN_BEFORE`.
/// 4. Otherwise → `KEEP_FACTORIZED`.
fn recommendation(row: &OpRow) -> &'static str {
    if !row.factorized.accepts {
        return "FLATTEN_BEFORE";
    }
    // List-keyed ops are run without a flat baseline (`flat` is `None`), so
    // this check must not depend on `flat` being present; nested inside the
    // `Some(flat)` arm it could never fire and these ops were reported as
    // KEEP_FACTORIZED.
    if matches!(row.op_name, "aggregate_on_list" | "join_on_list") {
        return "MULTIPLICITY_AWARE_FUTURE";
    }
    match &row.flat {
        // Factorized output larger than the flat baseline: not a safe
        // drop-in, so flatten first.
        Some(flat) if flat.accepts && row.factorized.out_rows > flat.out_rows => "FLATTEN_BEFORE",
        _ => "KEEP_FACTORIZED",
    }
}
/// Runs one operator on the factorized batch and, when both a flat batch and
/// flat SQL are supplied, on the flat baseline too; packages both results
/// into an `OpRow`.
///
/// Both variants register their batch under the table name `"t"`. The
/// factorized run always happens first. `params` only contributes `n_src`
/// to the row; `fanout_label` is stored as-is.
async fn run_one_op(
    op_name: &'static str,
    factorized: RecordBatch,
    flat_for_op: Option<RecordBatch>,
    factorized_sql: &str,
    flat_sql: Option<&str>,
    params: &DataParams,
    fanout_label: String,
) -> OpRow {
    let factorized_result = run_sql(op_name, "factorized", factorized, "t", factorized_sql).await;
    let flat_result = if let (Some(batch), Some(sql)) = (flat_for_op, flat_sql) {
        Some(run_sql(op_name, "flat", batch, "t", sql).await)
    } else {
        None
    };
    OpRow {
        op_name,
        n_src: params.n_src,
        fanout: fanout_label,
        factorized: factorized_result,
        flat: flat_result,
    }
}
#[tokio::main(flavor = "multi_thread", worker_threads = 4)]
async fn main() -> Result<()> {
// Cells from the ticket: 10K source rows × {1, 10, 100, 1000} neighbors,
// plus a skewed cell.
let cells: Vec<DataParams> = vec![
DataParams {
n_src: 10_000,
fanout: FanoutShape::Uniform { target: 1 },
seed: 7,
},
DataParams {
n_src: 10_000,
fanout: FanoutShape::Uniform { target: 10 },
seed: 7,
},
DataParams {
n_src: 10_000,
fanout: FanoutShape::Uniform { target: 100 },
seed: 7,
},
DataParams {
n_src: 10_000,
fanout: FanoutShape::Uniform { target: 1000 },
seed: 7,
},
DataParams {
n_src: 10_000,
fanout: FanoutShape::Skewed {
target: 10,
heavy_fraction: 0.02,
},
seed: 7,
},
];
let mut rows: Vec<OpRow> = Vec::new();
for params in &cells {
let (factorized, flat) = build(params);
let edges = factorized_edge_count(&factorized);
let label = match params.fanout {
FanoutShape::Uniform { target } => format!("u={target}"),
FanoutShape::Skewed { target, heavy_fraction } => format!("s={target}/{heavy_fraction}"),
};
println!(
"\n[cell] n_src={} fanout={} edges={}\n",
params.n_src, label, edges
);
rows.push(
run_one_op(
"filter",
factorized.clone(),
Some(flat.clone()),
filter_sql(),
Some(filter_sql()),
params,
label.clone(),
)
.await,
);
rows.push(
run_one_op(
"project",
factorized.clone(),
Some(flat.clone()),
project_sql_factorized(),
Some(project_sql_flat()),
params,
label.clone(),
)
.await,
);
rows.push(
run_one_op(
"sort",
factorized.clone(),
Some(flat.clone()),
sort_sql_factorized(),
Some(sort_sql_flat()),
params,
label.clone(),
)
.await,
);
rows.push(
run_one_op(
"aggregate_scalar",
factorized.clone(),
Some(flat.clone()),
aggregate_sql_factorized(),
Some(aggregate_sql_flat()),
params,
label.clone(),
)
.await,
);
rows.push(
run_one_op(
"aggregate_on_list",
factorized.clone(),
None,
aggregate_on_list_sql_factorized(),
None,
params,
label.clone(),
)
.await,
);
rows.push(
run_one_op(
"join_scalar",
factorized.clone(),
Some(flat.clone()),
join_sql_factorized(),
Some(join_sql_flat()),
params,
label.clone(),
)
.await,
);
rows.push(
run_one_op(
"join_on_list",
factorized.clone(),
None,
join_on_list_sql_factorized(),
None,
params,
label.clone(),
)
.await,
);
// Calibrate the cost of an explicit `Flatten` (UNNEST) on the
// factorized batch alone. This is the "flatten cost" column the
// writeup needs.
let unnest = probe_unnest_flatten(factorized.clone(), "t").await;
rows.push(OpRow {
op_name: "unnest_flatten",
n_src: params.n_src,
fanout: label.clone(),
factorized: unnest,
flat: None,
});
}
print_table(&rows);
// Capture one EXPLAIN per representative op to anchor the writeup.
let probe_params = DataParams {
n_src: 1000,
fanout: FanoutShape::Uniform { target: 10 },
seed: 1,
};
let (factorized, _) = build(&probe_params);
println!("\n[explain] aggregate_scalar (factorized input):");
println!(
"{}",
explain_factorized(factorized.clone(), "t", aggregate_sql_factorized())
.await
.unwrap_or_else(|e| format!("<explain failed: {e:#}>"))
);
println!("\n[explain] join_scalar (factorized input):");
println!(
"{}",
explain_factorized(factorized.clone(), "t", join_sql_factorized())
.await
.unwrap_or_else(|e| format!("<explain failed: {e:#}>"))
);
println!("\n[explain] aggregate_on_list (factorized input):");
println!(
"{}",
explain_factorized(factorized.clone(), "t", aggregate_on_list_sql_factorized())
.await
.unwrap_or_else(|e| format!("<explain failed: {e:#}>"))
);
println!("\n[explain] sort (factorized input):");
println!(
"{}",
explain_factorized(factorized, "t", sort_sql_factorized())
.await
.unwrap_or_else(|e| format!("<explain failed: {e:#}>"))
);
Ok(())
}

View file

@ -0,0 +1,188 @@
//! Per-operator probes.
//!
//! Each probe runs a tiny DataFusion pipeline once. We capture:
//! * accepts_list_input: did planning + execution complete without error?
//! * time_ms: wall-clock time spanning SQL planning, execution, and result collection.
//! * out_rows: total rows emitted across all output batches.
//! * out_bytes: summed estimated arrow buffer size of output rows
//! (a stand-in for peak memory of the consumer side).
use std::sync::Arc;
use std::time::Instant;
use anyhow::{Context, Result};
use arrow_array::RecordBatch;
use datafusion::datasource::MemTable;
use datafusion::execution::SendableRecordBatchStream;
use datafusion::prelude::*;
use futures::stream::StreamExt;
/// Outcome of a single probe run, as populated by `run_sql`.
///
/// A failed run keeps `accepts == false`, carries a stage-prefixed message in
/// `error`, and leaves the `out_*` counters at zero.
#[derive(Clone, Debug)]
pub struct OpResult {
/// Operator label supplied by the caller of `run_sql`.
pub op_name: &'static str,
/// Which input shape the probe ran against.
pub variant: &'static str, // "factorized" | "flat"
/// `true` only when the output stream was fully collected without error.
pub accepts: bool,
/// Failure message prefixed with the stage that failed
/// (`setup:`, `plan:`, `execute:` or `collect:`); `None` on success.
pub error: Option<String>,
/// Elapsed wall-clock milliseconds, measured from just before SQL planning.
/// Stays `0.0` when context setup itself fails.
pub time_ms: f64,
/// Total number of rows across all output batches.
pub out_rows: usize,
/// Number of output batches collected.
pub out_batches: usize,
/// Sum of the per-column array memory sizes of all output batches.
pub out_bytes: usize,
}
/// Builds a fresh `SessionContext` in which `batch` is queryable as `table_name`.
///
/// The batch becomes the single partition of an in-memory table that reuses
/// the batch's own schema.
///
/// # Errors
/// Propagates any failure from constructing the `MemTable` or from
/// registering it with the context.
fn make_ctx(batch: RecordBatch, table_name: &str) -> Result<SessionContext> {
    // One partition containing exactly one batch.
    let partitions = vec![vec![batch.clone()]];
    let provider = Arc::new(MemTable::try_new(batch.schema(), partitions)?);

    let session = SessionContext::new();
    session.register_table(table_name, provider)?;
    Ok(session)
}
/// Returns the sum of `get_array_memory_size()` over every column of `b`.
fn batch_bytes(b: &RecordBatch) -> usize {
    let mut total = 0usize;
    for column in b.columns() {
        total += column.get_array_memory_size();
    }
    total
}
/// Drains `stream` to completion, keeping every batch.
///
/// Returns `(batches, total_rows, total_bytes)`, where `total_bytes` is the
/// sum of `batch_bytes` over the collected batches.
///
/// # Errors
/// Stops at, and returns, the first error the stream yields.
async fn collect_stream(
    mut stream: SendableRecordBatchStream,
) -> Result<(Vec<RecordBatch>, usize, usize)> {
    let mut collected = Vec::new();
    let (mut total_rows, mut total_bytes) = (0usize, 0usize);

    while let Some(item) = stream.next().await {
        let batch = item?;
        total_rows += batch.num_rows();
        total_bytes += batch_bytes(&batch);
        collected.push(batch);
    }

    Ok((collected, total_rows, total_bytes))
}
/// Runs `sql` once against a fresh context holding `batch` registered as
/// `table_name`, and reports what happened as an [`OpResult`].
///
/// This function never returns an error. A failure at any stage is recorded
/// in `error` with a stage prefix (`setup`, `plan`, `execute`, `collect`) and
/// `accepts` stays `false`.
///
/// The clock starts only after the context has been built, so `time_ms` is
/// `0.0` on a setup failure. Otherwise it covers planning up to the point of
/// success or failure.
pub async fn run_sql(
    op_name: &'static str,
    variant: &'static str,
    batch: RecordBatch,
    table_name: &str,
    sql: &str,
) -> OpResult {
    // Template for every outcome; each return path overrides only what it knows.
    let blank = OpResult {
        op_name,
        variant,
        accepts: false,
        error: None,
        time_ms: 0.0,
        out_rows: 0,
        out_batches: 0,
        out_bytes: 0,
    };

    let ctx = match make_ctx(batch, table_name) {
        Ok(ctx) => ctx,
        Err(e) => {
            return OpResult {
                error: Some(format!("setup: {e:#}")),
                ..blank
            }
        }
    };

    let clock = Instant::now();
    // Plan -> execute -> collect; the first failing stage short-circuits with
    // its stage-prefixed message.
    let outcome = async {
        let frame = ctx.sql(sql).await.map_err(|e| format!("plan: {e:#}"))?;
        let stream = frame
            .execute_stream()
            .await
            .map_err(|e| format!("execute: {e:#}"))?;
        let (batches, rows, bytes) = collect_stream(stream)
            .await
            .map_err(|e| format!("collect: {e:#}"))?;
        Ok::<_, String>((rows, batches.len(), bytes))
    }
    .await;
    let time_ms = clock.elapsed().as_secs_f64() * 1e3;

    match outcome {
        Ok((out_rows, out_batches, out_bytes)) => OpResult {
            accepts: true,
            time_ms,
            out_rows,
            out_batches,
            out_bytes,
            ..blank
        },
        Err(message) => OpResult {
            error: Some(message),
            time_ms,
            ..blank
        },
    }
}
/// SQL for the filter probe: every column of table `t`, restricted to rows
/// with `src_id < 5000`.
pub fn filter_sql() -> &'static str {
    const SQL: &str = "SELECT * FROM t WHERE src_id < 5000";
    SQL
}
/// SQL for the projection probe on the factorized table: selects `src_id`
/// and `_neighbors` from `t`.
pub fn project_sql_factorized() -> &'static str {
    const SQL: &str = "SELECT src_id, _neighbors FROM t";
    SQL
}
/// SQL for the projection probe on the flat table: selects `src_id` and
/// `dst` from `t`.
pub fn project_sql_flat() -> &'static str {
    const SQL: &str = "SELECT src_id, dst FROM t";
    SQL
}
/// SQL for the sort probe on the factorized table: `src_id` and `_neighbors`
/// ordered by `src_id` descending, limited to 1000 rows.
pub fn sort_sql_factorized() -> &'static str {
    const SQL: &str = "SELECT src_id, _neighbors FROM t ORDER BY src_id DESC LIMIT 1000";
    SQL
}
/// SQL for the sort probe on the flat table: `src_id` and `dst` ordered by
/// `src_id` descending, limited to 1000 rows.
pub fn sort_sql_flat() -> &'static str {
    const SQL: &str = "SELECT src_id, dst FROM t ORDER BY src_id DESC LIMIT 1000";
    SQL
}
/// SQL for the scalar-keyed aggregate probe on the factorized table: counts
/// rows per `bucket` (the first four characters of `payload`), ordered by
/// bucket.
pub fn aggregate_sql_factorized() -> &'static str {
    const SQL: &str =
        "SELECT substr(payload, 1, 4) AS bucket, count(*) AS n FROM t GROUP BY 1 ORDER BY 1";
    SQL
}
/// SQL for the scalar-keyed aggregate probe on the flat table: counts rows
/// per `bucket` (the first four characters of `payload`), ordered by bucket.
///
/// The query text is the same as the factorized variant's, since it touches
/// only `payload`.
pub fn aggregate_sql_flat() -> &'static str {
    const SQL: &str =
        "SELECT substr(payload, 1, 4) AS bucket, count(*) AS n FROM t GROUP BY 1 ORDER BY 1";
    SQL
}
/// SQL for the aggregate probe that groups directly on the `_neighbors`
/// column: one row per distinct `_neighbors` value, with its row count.
pub fn aggregate_on_list_sql_factorized() -> &'static str {
    const SQL: &str = "SELECT _neighbors, count(*) AS n FROM t GROUP BY _neighbors";
    SQL
}
/// SQL for the scalar-keyed join probe on the factorized table: self-join of
/// `t` on `src_id`, returning `src_id` and `_neighbors` from the left side,
/// limited to 100 rows.
pub fn join_sql_factorized() -> &'static str {
    const SQL: &str =
        "SELECT a.src_id, a._neighbors FROM t a JOIN t b ON a.src_id = b.src_id LIMIT 100";
    SQL
}
/// SQL for the join probe keyed on the `_neighbors` column itself: self-join
/// of `t` on `_neighbors` equality, returning only the match count.
pub fn join_on_list_sql_factorized() -> &'static str {
    const SQL: &str = "SELECT count(*) FROM t a JOIN t b ON a._neighbors = b._neighbors";
    SQL
}
/// SQL for the scalar-keyed join probe on the flat table: self-join of `t`
/// on `src_id`, returning `src_id` and `dst` from the left side, limited to
/// 100 rows.
pub fn join_sql_flat() -> &'static str {
    const SQL: &str =
        "SELECT a.src_id, a.dst FROM t a JOIN t b ON a.src_id = b.src_id LIMIT 100";
    SQL
}
/// Probes an explicit flatten of the `_neighbors` column via
/// `CROSS JOIN UNNEST`.
///
/// `batch` is registered under `table_name` (by [`run_sql`]) and a single
/// query is run that emits `src_id` plus each unnested element as `dst`. The
/// outcome is reported with op name `"unnest_flatten"` and variant
/// `"factorized"`; failures are captured in the returned [`OpResult`] rather
/// than raised.
///
/// The `FROM` clause is built from `table_name`, so the query always targets
/// the table that was just registered. Previously it hard-coded `t`, so any
/// other name produced a table-not-found plan error that could be misread as
/// an UNNEST limitation. `table_name` is spliced in verbatim and must be a
/// valid, trusted SQL identifier. For `"t"` the query text is unchanged.
pub async fn probe_unnest_flatten(batch: RecordBatch, table_name: &str) -> OpResult {
    let sql = format!(
        "SELECT src_id, n.* FROM {table_name} CROSS JOIN UNNEST(_neighbors) AS n(dst)"
    );
    run_sql("unnest_flatten", "factorized", batch, table_name, &sql).await
}
/// Runs `EXPLAIN <sql>` against a fresh context holding `batch` registered as
/// `table_name`, and renders the result as plain text.
///
/// Each result row becomes one output line. Every non-empty cell is appended
/// followed by a single space, so lines end with a trailing space. A cell
/// that cannot be rendered is treated as empty and skipped (best-effort
/// output).
///
/// # Errors
/// Fails if the context cannot be built, if planning fails, or if collecting
/// the EXPLAIN result fails (the last with context `"explain collect"`).
pub async fn explain_factorized(batch: RecordBatch, table_name: &str, sql: &str) -> Result<String> {
    let ctx = make_ctx(batch, table_name)?;
    let explain_sql = format!("EXPLAIN {sql}");
    let frame = ctx.sql(&explain_sql).await?;
    let plan_batches = frame.collect().await.context("explain collect")?;

    let mut rendered = String::new();
    for plan_batch in &plan_batches {
        for row in 0..plan_batch.num_rows() {
            for column in plan_batch.columns() {
                let cell = arrow_cast::display::array_value_to_string(column, row)
                    .unwrap_or_default();
                if cell.is_empty() {
                    continue;
                }
                rendered.push_str(&cell);
                rendered.push(' ');
            }
            rendered.push('\n');
        }
    }
    Ok(rendered)
}
/// Public wrapper around `batch_bytes`: returns the same per-column
/// memory-size sum for `b`.
// NOTE(review): `allow(dead_code)` suggests nothing in this crate calls this
// yet; no call site is visible in this file — confirm it is still needed.
#[allow(dead_code)]
pub fn batch_size(b: &RecordBatch) -> usize {
batch_bytes(b)
}