mirror of
https://github.com/ModernRelay/omnigraph.git
synced 2026-06-30 02:49:39 +02:00
Merge pull request #16 from ModernRelay/tin-epoch
Fix join alignment for traversal-introduced bindings
This commit is contained in:
commit
063be3ddc7
4 changed files with 1032 additions and 139 deletions
|
|
@ -645,6 +645,7 @@ fn execute_pipeline<'a>(
|
|||
dst_type,
|
||||
min_hops,
|
||||
max_hops,
|
||||
dst_filters,
|
||||
} => {
|
||||
let gi = graph_index.ok_or_else(|| {
|
||||
OmniError::manifest("graph index required for traversal".to_string())
|
||||
|
|
@ -652,7 +653,7 @@ fn execute_pipeline<'a>(
|
|||
if let Some(batch) = wide.as_mut() {
|
||||
execute_expand(
|
||||
batch, gi, snapshot, catalog, src_var, dst_var, edge_type, *direction,
|
||||
dst_type, *min_hops, *max_hops,
|
||||
dst_type, *min_hops, *max_hops, dst_filters, params,
|
||||
)
|
||||
.await?;
|
||||
}
|
||||
|
|
@ -683,6 +684,8 @@ async fn execute_expand(
|
|||
dst_type: &str,
|
||||
min_hops: u32,
|
||||
max_hops: Option<u32>,
|
||||
dst_filters: &[IRFilter],
|
||||
params: &ParamMap,
|
||||
) -> Result<()> {
|
||||
let src_id_col_name = format!("{}.id", src_var);
|
||||
let src_ids = wide
|
||||
|
|
@ -766,8 +769,15 @@ async fn execute_expand(
|
|||
}
|
||||
}
|
||||
|
||||
// Hydrate destination nodes from the snapshot
|
||||
let dst_batch = hydrate_nodes(snapshot, catalog, dst_type, &dst_id_list).await?;
|
||||
// Split dst_filters: SQL-pushable go to Lance, the rest applied post-hconcat
|
||||
let pushdown_sql = build_lance_filter(dst_filters, params);
|
||||
let non_pushable: Vec<&IRFilter> = dst_filters
|
||||
.iter()
|
||||
.filter(|f| ir_filter_to_sql(f, params).is_none())
|
||||
.collect();
|
||||
|
||||
// Hydrate destination nodes from the snapshot (with pushed-down filters)
|
||||
let dst_batch = hydrate_nodes(snapshot, catalog, dst_type, &dst_id_list, pushdown_sql.as_deref()).await?;
|
||||
|
||||
// Build a mapping from dst_id to row index in dst_batch
|
||||
let dst_batch_id_col = dst_batch
|
||||
|
|
@ -796,15 +806,26 @@ async fn execute_expand(
|
|||
let dst_prefixed = prefix_batch(&dst_batch, dst_var)?;
|
||||
let aligned_dst = take_batch(&dst_prefixed, &dst_take)?;
|
||||
*wide = hconcat_batches(&expanded_wide, &aligned_dst)?;
|
||||
|
||||
// Apply any non-pushable destination filters (e.g. list-contains) in memory
|
||||
for f in &non_pushable {
|
||||
apply_filter(wide, f, params)?;
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Load full node rows for a set of IDs from a snapshot.
|
||||
///
|
||||
/// When `extra_filter_sql` is provided (from deferred destination-binding
|
||||
/// filters), it is ANDed with the `id IN (...)` clause so that Lance can
|
||||
/// skip non-matching rows at the storage level.
|
||||
async fn hydrate_nodes(
|
||||
snapshot: &Snapshot,
|
||||
catalog: &Catalog,
|
||||
type_name: &str,
|
||||
ids: &[String],
|
||||
extra_filter_sql: Option<&str>,
|
||||
) -> Result<RecordBatch> {
|
||||
let node_type = catalog
|
||||
.node_types
|
||||
|
|
@ -823,7 +844,10 @@ async fn hydrate_nodes(
|
|||
.iter()
|
||||
.map(|id| format!("'{}'", id.replace('\'', "''")))
|
||||
.collect();
|
||||
let filter_sql = format!("id IN ({})", escaped.join(", "));
|
||||
let mut filter_sql = format!("id IN ({})", escaped.join(", "));
|
||||
if let Some(extra) = extra_filter_sql {
|
||||
filter_sql = format!("({}) AND ({})", filter_sql, extra);
|
||||
}
|
||||
let has_blobs = !node_type.blob_properties.is_empty();
|
||||
let non_blob_cols: Vec<&str> = node_type
|
||||
.arrow_schema
|
||||
|
|
@ -877,6 +901,7 @@ fn try_bulk_anti_join_mask(
|
|||
src_var,
|
||||
edge_type,
|
||||
direction,
|
||||
dst_filters,
|
||||
..
|
||||
} = &inner_pipeline[0]
|
||||
else {
|
||||
|
|
@ -885,6 +910,11 @@ fn try_bulk_anti_join_mask(
|
|||
if src_var != outer_var {
|
||||
return None;
|
||||
}
|
||||
// Bulk CSR check only tests neighbor existence, not destination
|
||||
// properties. Fall back to the slow path when dst_filters are present.
|
||||
if !dst_filters.is_empty() {
|
||||
return None;
|
||||
}
|
||||
let gi = graph_index?;
|
||||
let edge_def = catalog.edge_types.get(edge_type.as_str())?;
|
||||
|
||||
|
|
|
|||
|
|
@ -396,3 +396,318 @@ query insert_no_name($age: I32) {
|
|||
|
||||
assert!(result.is_err(), "insert without @key property should fail");
|
||||
}
|
||||
|
||||
// ─── Join alignment: traversal + destination binding ───────────────────────
|
||||
|
||||
/// Traversal with destination binding filter constrains the source.
|
||||
/// Regression: previously over-returned because the lowering created a
|
||||
/// cross-join followed by cycle-closing instead of Expand + post-filter.
|
||||
#[tokio::test]
|
||||
async fn traversal_destination_binding_constrains_source() {
|
||||
let dir = tempfile::tempdir().unwrap();
|
||||
let mut db = init_and_load(&dir).await;
|
||||
|
||||
// Only Alice works at Acme. The binding on $c must constrain $p.
|
||||
let queries = r#"
|
||||
query at_acme() {
|
||||
match {
|
||||
$p: Person
|
||||
$p worksAt $c
|
||||
$c: Company { name: "Acme" }
|
||||
}
|
||||
return { $p.name }
|
||||
}
|
||||
"#;
|
||||
let result = query_main(&mut db, queries, "at_acme", &ParamMap::new())
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
let batch = result.concat_batches().unwrap();
|
||||
let names = batch
|
||||
.column(0)
|
||||
.as_any()
|
||||
.downcast_ref::<StringArray>()
|
||||
.unwrap();
|
||||
assert_eq!(names.len(), 1);
|
||||
assert_eq!(names.value(0), "Alice");
|
||||
}
|
||||
|
||||
/// Multi-variable projection: columns from source and destination must be
|
||||
/// row-aligned. Previously this could fail with "all columns must have
|
||||
/// the same length" when variables had different cardinalities.
|
||||
#[tokio::test]
|
||||
async fn traversal_multi_variable_projection_aligned() {
|
||||
let dir = tempfile::tempdir().unwrap();
|
||||
let mut db = init_and_load(&dir).await;
|
||||
|
||||
let queries = r#"
|
||||
query employee_companies() {
|
||||
match {
|
||||
$p: Person
|
||||
$p worksAt $c
|
||||
$c: Company
|
||||
}
|
||||
return { $p.name, $c.name }
|
||||
}
|
||||
"#;
|
||||
let result = query_main(&mut db, queries, "employee_companies", &ParamMap::new())
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
let batch = result.concat_batches().unwrap();
|
||||
// Alice→Acme, Bob→Globex
|
||||
assert_eq!(batch.num_rows(), 2);
|
||||
let person_names = batch
|
||||
.column(0)
|
||||
.as_any()
|
||||
.downcast_ref::<StringArray>()
|
||||
.unwrap();
|
||||
let company_names = batch
|
||||
.column(1)
|
||||
.as_any()
|
||||
.downcast_ref::<StringArray>()
|
||||
.unwrap();
|
||||
|
||||
let mut pairs: Vec<(&str, &str)> = (0..batch.num_rows())
|
||||
.map(|i| (person_names.value(i), company_names.value(i)))
|
||||
.collect();
|
||||
pairs.sort();
|
||||
assert_eq!(pairs, vec![("Alice", "Acme"), ("Bob", "Globex")]);
|
||||
}
|
||||
|
||||
/// Multi-hop projection: all three variables must be row-aligned.
|
||||
#[tokio::test]
|
||||
async fn multi_hop_projection_aligned() {
|
||||
let dir = tempfile::tempdir().unwrap();
|
||||
let mut db = init_and_load(&dir).await;
|
||||
|
||||
// Alice knows Bob, Bob knows Diana.
|
||||
// Alice→Bob→Diana is the only 2-hop path.
|
||||
let queries = r#"
|
||||
query fof_chain($name: String) {
|
||||
match {
|
||||
$p: Person { name: $name }
|
||||
$p knows $mid
|
||||
$mid knows $fof
|
||||
}
|
||||
return { $p.name, $mid.name, $fof.name }
|
||||
}
|
||||
"#;
|
||||
let result = query_main(
|
||||
&mut db,
|
||||
queries,
|
||||
"fof_chain",
|
||||
¶ms(&[("$name", "Alice")]),
|
||||
)
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
let batch = result.concat_batches().unwrap();
|
||||
assert_eq!(batch.num_rows(), 1);
|
||||
let col0 = batch.column(0).as_any().downcast_ref::<StringArray>().unwrap();
|
||||
let col1 = batch.column(1).as_any().downcast_ref::<StringArray>().unwrap();
|
||||
let col2 = batch.column(2).as_any().downcast_ref::<StringArray>().unwrap();
|
||||
assert_eq!(col0.value(0), "Alice");
|
||||
assert_eq!(col1.value(0), "Bob");
|
||||
assert_eq!(col2.value(0), "Diana");
|
||||
}
|
||||
|
||||
/// Multi-hop with destination binding filters at each hop.
|
||||
#[tokio::test]
|
||||
async fn multi_hop_with_intermediate_binding_filters() {
|
||||
let dir = tempfile::tempdir().unwrap();
|
||||
let mut db = init_and_load(&dir).await;
|
||||
|
||||
// Alice knows Bob and Charlie.
|
||||
// Bob knows Diana. Charlie knows nobody.
|
||||
// Filter $mid to only "Bob" → only Alice→Bob→Diana survives.
|
||||
let queries = r#"
|
||||
query fof_via($name: String, $mid_name: String) {
|
||||
match {
|
||||
$p: Person { name: $name }
|
||||
$p knows $mid
|
||||
$mid: Person { name: $mid_name }
|
||||
$mid knows $fof
|
||||
}
|
||||
return { $fof.name }
|
||||
}
|
||||
"#;
|
||||
let result = query_main(
|
||||
&mut db,
|
||||
queries,
|
||||
"fof_via",
|
||||
¶ms(&[("$name", "Alice"), ("$mid_name", "Bob")]),
|
||||
)
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
let batch = result.concat_batches().unwrap();
|
||||
let names = batch
|
||||
.column(0)
|
||||
.as_any()
|
||||
.downcast_ref::<StringArray>()
|
||||
.unwrap();
|
||||
assert_eq!(names.len(), 1);
|
||||
assert_eq!(names.value(0), "Diana");
|
||||
}
|
||||
|
||||
/// Destination binding with filter + multi-variable return: the classic
|
||||
/// "join across a traversal" scenario that triggers the bug.
|
||||
#[tokio::test]
|
||||
async fn traversal_destination_filter_with_multi_return() {
|
||||
let dir = tempfile::tempdir().unwrap();
|
||||
let mut db = init_and_load(&dir).await;
|
||||
|
||||
let queries = r#"
|
||||
query at_acme_named() {
|
||||
match {
|
||||
$p: Person
|
||||
$p worksAt $c
|
||||
$c: Company { name: "Acme" }
|
||||
}
|
||||
return { $p.name, $c.name }
|
||||
}
|
||||
"#;
|
||||
let result = query_main(&mut db, queries, "at_acme_named", &ParamMap::new())
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
let batch = result.concat_batches().unwrap();
|
||||
assert_eq!(batch.num_rows(), 1);
|
||||
let person = batch.column(0).as_any().downcast_ref::<StringArray>().unwrap();
|
||||
let company = batch.column(1).as_any().downcast_ref::<StringArray>().unwrap();
|
||||
assert_eq!(person.value(0), "Alice");
|
||||
assert_eq!(company.value(0), "Acme");
|
||||
}
|
||||
|
||||
/// Parameterized destination filter exercises param resolution through the
|
||||
/// Lance SQL pushdown path (params are resolved to literals in ir_expr_to_sql).
|
||||
#[tokio::test]
|
||||
async fn traversal_destination_filter_pushdown_with_param() {
|
||||
let dir = tempfile::tempdir().unwrap();
|
||||
let mut db = init_and_load(&dir).await;
|
||||
|
||||
let queries = r#"
|
||||
query at_company($company: String) {
|
||||
match {
|
||||
$p: Person
|
||||
$p worksAt $c
|
||||
$c: Company { name: $company }
|
||||
}
|
||||
return { $p.name, $c.name }
|
||||
}
|
||||
"#;
|
||||
let result = query_main(
|
||||
&mut db,
|
||||
queries,
|
||||
"at_company",
|
||||
¶ms(&[("$company", "Globex")]),
|
||||
)
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
let batch = result.concat_batches().unwrap();
|
||||
assert_eq!(batch.num_rows(), 1);
|
||||
let person = batch.column(0).as_any().downcast_ref::<StringArray>().unwrap();
|
||||
let company = batch.column(1).as_any().downcast_ref::<StringArray>().unwrap();
|
||||
assert_eq!(person.value(0), "Bob");
|
||||
assert_eq!(company.value(0), "Globex");
|
||||
}
|
||||
|
||||
/// Fan-out: one source expanded to two different destination types.
|
||||
/// Each (friend, company) pair should be a cross-product per source row.
|
||||
#[tokio::test]
|
||||
async fn fan_out_two_destinations() {
|
||||
let dir = tempfile::tempdir().unwrap();
|
||||
let mut db = init_and_load(&dir).await;
|
||||
|
||||
let queries = r#"
|
||||
query fan_out($name: String) {
|
||||
match {
|
||||
$p: Person { name: $name }
|
||||
$p knows $f
|
||||
$p worksAt $c
|
||||
}
|
||||
return { $f.name, $c.name }
|
||||
}
|
||||
"#;
|
||||
// Alice knows Bob and Charlie, works at Acme.
|
||||
// Each friend paired with her company → 2 rows.
|
||||
let result = query_main(
|
||||
&mut db,
|
||||
queries,
|
||||
"fan_out",
|
||||
¶ms(&[("$name", "Alice")]),
|
||||
)
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
let batch = result.concat_batches().unwrap();
|
||||
assert_eq!(batch.num_rows(), 2);
|
||||
let friends = batch.column(0).as_any().downcast_ref::<StringArray>().unwrap();
|
||||
let companies = batch.column(1).as_any().downcast_ref::<StringArray>().unwrap();
|
||||
|
||||
let mut pairs: Vec<(&str, &str)> = (0..batch.num_rows())
|
||||
.map(|i| (friends.value(i), companies.value(i)))
|
||||
.collect();
|
||||
pairs.sort();
|
||||
assert_eq!(pairs, vec![("Bob", "Acme"), ("Charlie", "Acme")]);
|
||||
}
|
||||
|
||||
/// Deferred destination filter that matches nothing → empty result.
|
||||
#[tokio::test]
|
||||
async fn traversal_destination_filter_no_match() {
|
||||
let dir = tempfile::tempdir().unwrap();
|
||||
let mut db = init_and_load(&dir).await;
|
||||
|
||||
let queries = r#"
|
||||
query at_phantom() {
|
||||
match {
|
||||
$p: Person
|
||||
$p worksAt $c
|
||||
$c: Company { name: "NonExistent" }
|
||||
}
|
||||
return { $p.name }
|
||||
}
|
||||
"#;
|
||||
let result = query_main(&mut db, queries, "at_phantom", &ParamMap::new())
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
assert_eq!(result.num_rows(), 0);
|
||||
}
|
||||
|
||||
/// Negation with inner destination binding filter.
|
||||
/// "People who do NOT work at Acme" — uses binding syntax inside negation.
|
||||
#[tokio::test]
|
||||
async fn negation_with_inner_destination_binding() {
|
||||
let dir = tempfile::tempdir().unwrap();
|
||||
let mut db = init_and_load(&dir).await;
|
||||
|
||||
let queries = r#"
|
||||
query not_at_acme_binding() {
|
||||
match {
|
||||
$p: Person
|
||||
not {
|
||||
$p worksAt $c
|
||||
$c: Company { name: "Acme" }
|
||||
}
|
||||
}
|
||||
return { $p.name }
|
||||
}
|
||||
"#;
|
||||
// Alice→Acme. Everyone else should be returned.
|
||||
let result = query_main(&mut db, queries, "not_at_acme_binding", &ParamMap::new())
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
let batch = result.concat_batches().unwrap();
|
||||
let names = batch
|
||||
.column(0)
|
||||
.as_any()
|
||||
.downcast_ref::<StringArray>()
|
||||
.unwrap();
|
||||
let mut names_vec: Vec<&str> = (0..names.len()).map(|i| names.value(i)).collect();
|
||||
names_vec.sort();
|
||||
assert_eq!(names_vec, vec!["Bob", "Charlie", "Diana"]);
|
||||
}
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue