Experiment 1.5 — Extending DataFusion dynamic-filter-pushdown to bitmap shape (code-dive)
Ticket: MR-925 §1.5 (validates MR-737 §5.6, §5.7 / Open Q3). Type: Code-dive only (no prototype crate). Substrate pin: DataFusion 52.5. Date: 2026-05-12.
Question
The dynamic-filter-pushdown (DFP) feature in DataFusion 52.5 ships with three pushdown strategies for hash-join build sides:
- InList(ArrayRef): for small build sides (< 128 MB).
- HashTable(Arc<dyn JoinHashMapType>): for large build sides.
- Empty: no rows, do not push.
Can a third-party operator (e.g. our NeighborExpand from §1.3, or
the broader graph-engine BackJoin and NeighborSetIntersect from
MR-737 §5.3) extend the same machinery to push a roaring-bitmap-shaped
filter through DF's dynamic filter framework — without forking DF?
TL;DR
Yes, completely supported on DataFusion 52.5 as written. The
extension footprint is roughly 200 LoC: implement a custom
PhysicalExpr (e.g. BitmapMembershipExpr) and feed it to the
existing public DynamicFilterPhysicalExpr::update(...) API. No
fork, no pub(crate) work-around.
Findings
F1. DataFusion's DFP is expression-shaped, not enum-shaped.
The PushdownStrategy enum (InList | HashTable | Empty) is internal
to joins::hash_join::shared_bounds — it is the HashJoinExec's own
internal switch for selecting which physical-expr to construct for
its dynamic filter. The framework itself does not care:
// datafusion-physical-expr-52.5.0/src/expressions/dynamic_filters.rs
pub fn update(&self, new_expr: Arc<dyn PhysicalExpr>) -> Result<()>
The update method takes any Arc<dyn PhysicalExpr>. So an
operator outside the hash-join module is free to ship its own pushdown
strategy (e.g. BitmapMembership) and pass it to update.
F2. Three things must hold for a custom dynamic-filter expr to work.
From reading dynamic_filters.rs:
- Stable children at construction time. DynamicFilterPhysicalExpr::new(children, initial_expr) binds the column leaves at construction. Subsequent updates may only swap the expression, not introduce new column references. For BitmapMembershipExpr { column: Column::new("id", 0), bitmap_bytes: ... }, the only child is column, so this condition holds.
- Self-contained evaluate. The custom PhysicalExpr must implement fn evaluate(&self, batch: &RecordBatch) -> Result<ColumnarValue> and return a ColumnarValue wrapping a BooleanArray of the same length as the input batch. For a roaring bitmap this means: deserialize once on the first call, cache the result in a OnceCell, then for each batch compute out[i] = bitmap.contains(col[i]) for i in 0..n.
- Be a dyn PhysicalExpr: implement Debug, Display, data_type, nullable, evaluate, children, with_new_children, dyn_hash, dyn_eq. This is standard boilerplate, mirroring InListExpr.
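The "deserialize once, probe per row" requirement can be illustrated without DataFusion or the roaring crate. The following is a minimal std-only sketch, not the proposed implementation: BitmapMembership, serialize_ids, and the 8-bytes-per-id payload format are hypothetical stand-ins, a BTreeSet replaces the roaring bitmap, and std::sync::OnceLock plays the role of the OnceCell mentioned above.

```rust
use std::collections::BTreeSet;
use std::sync::OnceLock;

/// Hypothetical stand-in for `BitmapMembershipExpr`: an opaque payload plus
/// a lazily decoded set. A real version would decode a roaring bitmap.
pub struct BitmapMembership {
    /// Assumed wire format for this sketch: little-endian u64 ids, 8 bytes each.
    bitmap_bytes: Vec<u8>,
    decoded: OnceLock<BTreeSet<u64>>,
}

impl BitmapMembership {
    pub fn new(bitmap_bytes: Vec<u8>) -> Self {
        Self { bitmap_bytes, decoded: OnceLock::new() }
    }

    /// Decode the payload on first use; later calls reuse the cached set.
    fn set(&self) -> &BTreeSet<u64> {
        self.decoded.get_or_init(|| {
            self.bitmap_bytes
                .chunks_exact(8)
                .map(|chunk| {
                    let mut buf = [0u8; 8];
                    buf.copy_from_slice(chunk);
                    u64::from_le_bytes(buf)
                })
                .collect()
        })
    }

    /// Returns one boolean per input row, same length as the input column.
    pub fn evaluate(&self, column: &[u64]) -> Vec<bool> {
        let set = self.set();
        column.iter().map(|id| set.contains(id)).collect()
    }
}

/// Serialize ids into the hypothetical payload format used above.
pub fn serialize_ids(ids: &[u64]) -> Vec<u8> {
    ids.iter().flat_map(|id| id.to_le_bytes()).collect()
}
```

Because the cache lives behind a shared reference, evaluate can stay &self, which matches the shape of the PhysicalExpr trait method quoted in the list above.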
F3. Two pushdown paths exist; only one needs work for graph operators.
DataFusion 52.5 has two filter-pushdown phases (see
ExecutionPlan::gather_filters_for_pushdown and
ExecutionPlan::handle_child_pushdown_result in execution_plan.rs):
- Static pushdown (planning time): filters are pushed from FilterExec → HashJoinExec → DataSourceExec during the EnforceFilterPushdown physical optimizer rule.
- Dynamic pushdown (execution time): a DynamicFilterPhysicalExpr placeholder is left in the plan at planning time; at runtime, the producer operator calls update(new_expr) once its data is available (e.g. once the hash-join build side is materialized).
For graph operators that produce SIPs (NeighborExpand build phase,
SemiJoin build phase, etc.), the dynamic path is the natural one.
The producer pattern is:
// At plan-construction time (in our ExtensionPlanner):
let placeholder = lit(true);
let dynamic_filter = Arc::new(DynamicFilterPhysicalExpr::new(
    vec![Arc::new(Column::new("dst_id", 2))], // probe-side column refs
    placeholder,
));
// Wire dynamic_filter into the probe-side scan (as a filter) and
// store an Arc<DynamicFilterPhysicalExpr> on our build-side operator.

// At execute time, once our build side completes:
let bitmap = build_roaring_from(build_ids)?;
let bitmap_bytes = serialize_to_vec(bitmap);
let pushdown_expr = Arc::new(BitmapMembershipExpr {
    column: probe_side_column_ref,
    bitmap_bytes,
});
self.dynamic_filter.update(pushdown_expr)?;
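To make the placeholder-then-update handshake concrete, here is a std-only model of it. It is a sketch under stated assumptions rather than DataFusion's implementation: RowPredicate, AlwaysTrue, SortedIdMembership, and DynamicFilterSlot are hypothetical names, and a sorted id list stands in for the roaring bitmap.

```rust
use std::sync::{Arc, RwLock};

/// Row predicate over a single u64 column (hypothetical stand-in for `PhysicalExpr`).
pub trait RowPredicate: Send + Sync {
    fn evaluate(&self, column: &[u64]) -> Vec<bool>;
}

/// Placeholder that keeps every row, playing the role of `lit(true)`.
pub struct AlwaysTrue;

impl RowPredicate for AlwaysTrue {
    fn evaluate(&self, column: &[u64]) -> Vec<bool> {
        vec![true; column.len()]
    }
}

/// Keeps rows whose id appears in a sorted id list (stand-in for a bitmap probe).
pub struct SortedIdMembership {
    ids: Vec<u64>,
}

impl SortedIdMembership {
    pub fn new(mut ids: Vec<u64>) -> Self {
        ids.sort_unstable();
        ids.dedup();
        Self { ids }
    }
}

impl RowPredicate for SortedIdMembership {
    fn evaluate(&self, column: &[u64]) -> Vec<bool> {
        column.iter().map(|id| self.ids.binary_search(id).is_ok()).collect()
    }
}

/// Shared slot: the scan reads the current predicate, the producer swaps it.
pub struct DynamicFilterSlot {
    inner: RwLock<Arc<dyn RowPredicate>>,
}

impl DynamicFilterSlot {
    pub fn new(initial: Arc<dyn RowPredicate>) -> Self {
        Self { inner: RwLock::new(initial) }
    }

    /// Called by the producer once its build side is complete.
    pub fn update(&self, new_expr: Arc<dyn RowPredicate>) {
        *self.inner.write().expect("lock poisoned") = new_expr;
    }

    /// Called by the scan for every batch; uses whichever predicate is current.
    pub fn evaluate(&self, column: &[u64]) -> Vec<bool> {
        let current = Arc::clone(&*self.inner.read().expect("lock poisoned"));
        current.evaluate(column)
    }
}
```

Batches evaluated before the update pass through unfiltered, and batches evaluated afterwards are narrowed. That is safe in this model because the placeholder only ever admits a superset of the rows the final predicate admits.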
F4. The scan-side has two interception points.
A custom dynamic filter ends up at the scan. Two paths exist for the scan to consume it efficiently:
Path A. Generic predicate evaluation (works today, no DF fork).
BitmapMembershipExpr::evaluate(batch) is called once per batch and
invokes bitmap.contains(row.value) for each row. The roaring
crate's contains is O(log n) within a fragment-localized container
and was measured at <0.1 µs per call in §1.4. For a 1024-row batch,
this is at most ~100 µs of CPU (1024 × 0.1 µs ≈ 102 µs), which is
amortized against the I/O for that batch. This is sufficient for §5.6 as written.
Path B. Lance scan-level row-id mask (faster, needs Lance integration).
Lance's Scanner supports a RowIdMask that is applied at the scan
level before any predicate evaluation. If our BitmapMembershipExpr
targets a Lance row-ID column, we could extract the bitmap during
the scan's handle_child_pushdown_result and convert it into a
RowIdMask — completely bypassing the per-row predicate. This is
the same trick Lance's full-text search uses today (see Lance's
scalar.rs apply_full_text_search_index).
Path B requires changes to Lance's DataSourceExec or our wrapping
adapter; Path A is zero-change to DataFusion or Lance.
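The difference between the two paths can be modelled in a few lines of std-only Rust. This sketch does not use Lance's API: AllowMask, Fragment, and scan_with_mask are hypothetical names, and it assumes for simplicity that each fragment covers a contiguous row-id range. It shows the Path B idea of selecting rows by range lookup on the mask, so unmatched rows are never materialized and fragments without hits are skipped outright.

```rust
use std::collections::BTreeSet;

/// Hypothetical scan-level allow-list of row ids (stand-in for a row-id mask).
pub struct AllowMask {
    allowed: BTreeSet<u64>,
}

/// A fragment covering the contiguous row-id range [start, start + len).
/// Contiguity is a simplifying assumption of this sketch.
pub struct Fragment {
    pub start: u64,
    pub len: u64,
}

impl AllowMask {
    pub fn from_ids(ids: impl IntoIterator<Item = u64>) -> Self {
        Self { allowed: ids.into_iter().collect() }
    }

    /// Row ids of `frag` that survive the mask, found with a range query on
    /// the mask instead of a membership test per fragment row.
    pub fn select(&self, frag: &Fragment) -> Vec<u64> {
        self.allowed
            .range(frag.start..frag.start + frag.len)
            .copied()
            .collect()
    }
}

/// Returns the surviving row ids and the number of fragments skipped entirely.
pub fn scan_with_mask(fragments: &[Fragment], mask: &AllowMask) -> (Vec<u64>, usize) {
    let mut rows = Vec::new();
    let mut skipped = 0;
    for frag in fragments {
        let hits = mask.select(frag);
        if hits.is_empty() {
            skipped += 1; // no I/O would be issued for this fragment
            continue;
        }
        rows.extend(hits);
    }
    (rows, skipped)
}
```

In this model the work is proportional to the number of matching ids rather than to the number of scanned rows, which is the motivation for treating Path B as an optimization over Path A.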
F5. The static-pushdown phase passes through unrecognized exprs cleanly.
FilterDescription::all_unsupported(parent_filters, &children) is the
default for gather_filters_for_pushdown. Our custom BitmapMembershipExpr
is just an unrecognized expr — it will not be pushed past operators
that haven't opted into bitmap pushdown, and at the leaf DataSourceExec
it falls back to per-batch evaluation (Path A above). No silent
misbehavior, no crash, no need to teach DF about our expression shape.
F6. The framework does NOT merge N pushdown sources into one dynamic filter; the scan ANDs N separate filters instead.
A DynamicFilterPhysicalExpr wraps one inner expression at a time.
If two producers (e.g. Expand(a) and Expand(b)) both want to push
bitmap filters onto the same probe scan, each calls update on its
own dynamic filter; the scan must hold N separate dynamic filters
and AND them at evaluation time. The plumbing for this (multiple
Arc<DynamicFilterPhysicalExpr> on a scan) is standard BinaryExpr(AND)
wrapping. No framework gap.
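The AND-composition is simple enough to state as code. The sketch below is a std-only model with hypothetical names (IdFilter, evaluate_all); boxed closures stand in for the N dynamic filters held by the scan.

```rust
/// Boxed per-row predicate over a u64 id (stand-in for one dynamic filter).
pub type IdFilter = Box<dyn Fn(u64) -> bool + Send + Sync>;

/// AND-combine N filters: a row is kept only if every filter accepts it.
/// With zero filters every row is kept, matching a `true` placeholder.
pub fn evaluate_all(filters: &[IdFilter], column: &[u64]) -> Vec<bool> {
    column
        .iter()
        .map(|&id| filters.iter().all(|f| f(id)))
        .collect()
}
```

Since `all` short-circuits, ordering the most selective filter first reduces the number of probes per row. Whether the real scan can reorder its filters this way is not established by this code-dive.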
Concrete plan for §5.6 (RFC body delta)
The RFC §5.6 should specify:
- Bitmap-shaped SIPs are propagated via the standard DynamicFilterPhysicalExpr API. No custom side-channel; reuse the framework. The producer calls update(new_expr); the scan evaluates the resulting BooleanArray per batch.
- A new public BitmapMembershipExpr lives in our graph crate (not in the DF tree). It is constructed with a Column child and an opaque Vec<u8> payload (roaring serialized bytes). It implements PhysicalExpr::evaluate by deserializing the bitmap once into a OnceCell<RoaringTreemap> and probing it per row.
- Lance-aware scan adaptation is optional and incremental. Path A (per-batch evaluation) is the v1 implementation. Path B (scan-level RowIdMask) is a v2 optimization that requires a LanceDataSourceExec to special-case BitmapMembershipExpr in its handle_child_pushdown_result impl. The RFC should call out Path B as a follow-up, not a blocker.
- N producers to one scan: AND-wrap. The scan holds N Arc<DynamicFilterPhysicalExpr>. At plan-construction time, the probe filter wires them all in via BinaryExpr::new(a, Operator::And, Arc::new(BinaryExpr::new(b, Operator::And, c))). No bespoke "multi-source" data structure.
What does NOT need a DataFusion fork
- Custom dynamic-filter expression shapes. Public via Arc<dyn PhysicalExpr> + update.
- Custom dynamic-filter producers. Public via DynamicFilterPhysicalExpr::new.
- Custom dynamic-filter consumers. All scans evaluate via the standard evaluate interface.
- Composition with existing DFP (InList/HashTable). Wrap with BinaryExpr(AND).
What WOULD need a DataFusion fork or upstream contribution
- Bitmap-aware FilterPushdownPolicy. If we want the static-pushdown pass to recognize BitmapMembershipExpr and route it specially (e.g. drop the InList variant when a bitmap is available), we'd need a FilterPushdownPolicy extension point that doesn't exist today. However, this is a planner optimization, not a correctness or capability issue. The plan still works without it.
- A typed BitmapPushdownStrategy in SharedBuildAccumulator. This only matters if we want graph-side BackJoins to share the HashJoinExec's build-side accumulator. We don't: graph operators have their own build phases.
Decision impact on MR-737 §5.6 and §5.7
§5.6 (SIP propagation) is achievable on DF 52.5 as written. The
public DynamicFilterPhysicalExpr::update API is sufficient. No
upstream contribution required for v1.
§5.7 (cross-operator filter sharing) is achievable as AND-wrapping
of N dynamic filters on the same scan. No framework gap. The RFC
should clarify that the scan accumulates filters via standard
BinaryExpr composition, not via a bespoke multi-source channel.
Open Q3 ("can we share the SIP filter between operator stages?")
— answered yes. Confirmed by reading
datafusion-physical-expr-52.5.0/src/expressions/dynamic_filters.rs:227
(the update API) and datafusion-physical-plan-52.5.0/src/joins/hash_join/shared_bounds.rs:463
(the call site that proves the producer/consumer split works).
Caveats and follow-ups
- No prototype was built. Per the ticket, §1.5 is a code-dive only. The recommendation rests on reading DF source, not on a working end-to-end implementation. If RFC §5.6 lands with this plan, Phase 0 should include a smoke test that:
  - Wires a BitmapMembershipExpr into a DynamicFilterPhysicalExpr.
  - Runs a hash join with the bitmap as the dynamic filter.
  - Compares row-output and timing against an InListExpr-shaped baseline.

  Estimated work: 1 day, no DF fork.
- Lance scan-level RowIdMask support is the right v2 follow-up, but it may be gated on the plugin-registry blocker discussed in §1.2 for custom index types. The RowIdMask path uses a different mechanism (it is not a scalar index), so it may not be blocked the same way. A quick code-dive against lance/src/index/scalar.rs and lance/src/dataset/scanner.rs is worth doing to confirm before committing.
- DF 52.5 → 52.6 may rework parts of DFP. The PR thread for SharedBuildAccumulator shows active churn; pin to 52.5.x for now and re-validate when bumping to a 52.6+ minor.