mirror of
https://github.com/ModernRelay/omnigraph.git
synced 2026-06-09 01:35:18 +02:00
Fix traversal ordering: process in dependency order, not declaration order
The iterative lowering now handles traversals declared in non-topological order (e.g. `$b worksAt $c` before `$a knows $b`). Each pass processes traversals that have at least one bound endpoint, repeating until all are consumed. Caught during self-review. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
parent
853691c70e
commit
88384476be
1 changed files with 152 additions and 91 deletions
|
|
@ -234,99 +234,119 @@ fn lower_clauses(
|
|||
bound_vars.insert(binding.variable.clone());
|
||||
}
|
||||
|
||||
// Lower traversals into Expand ops
|
||||
// Handle "cycle closing" — if both src and dst are already bound, use a filter
|
||||
for traversal in &traversals {
|
||||
let edge = catalog
|
||||
.lookup_edge_by_name(&traversal.edge_name)
|
||||
.ok_or_else(|| {
|
||||
crate::error::NanoError::Plan(format!(
|
||||
"lowering traversal referenced missing edge '{}' after typecheck",
|
||||
traversal.edge_name
|
||||
))
|
||||
})?;
|
||||
|
||||
// Determine direction from type context
|
||||
let direction = type_ctx
|
||||
.traversals
|
||||
.iter()
|
||||
.find(|rt| {
|
||||
rt.src == traversal.src && rt.dst == traversal.dst && rt.edge_type == edge.name
|
||||
})
|
||||
.map(|rt| rt.direction)
|
||||
.unwrap_or(Direction::Out);
|
||||
|
||||
let dst_type = match direction {
|
||||
Direction::Out => edge.to_type.clone(),
|
||||
Direction::In => edge.from_type.clone(),
|
||||
};
|
||||
|
||||
if bound_vars.contains(&traversal.src) && bound_vars.contains(&traversal.dst) {
|
||||
// Cycle closing: emit expand to a temp var, then filter temp.id = dst.id
|
||||
let temp_var = format!("__temp_{}", traversal.dst);
|
||||
pipeline.push(IROp::Expand {
|
||||
src_var: traversal.src.clone(),
|
||||
dst_var: temp_var.clone(),
|
||||
edge_type: edge.name.clone(),
|
||||
direction,
|
||||
dst_type,
|
||||
min_hops: traversal.min_hops,
|
||||
max_hops: traversal.max_hops,
|
||||
dst_filters: vec![],
|
||||
});
|
||||
pipeline.push(IROp::Filter(IRFilter {
|
||||
left: IRExpr::PropAccess {
|
||||
variable: temp_var,
|
||||
property: "id".to_string(),
|
||||
},
|
||||
op: CompOp::Eq,
|
||||
right: IRExpr::PropAccess {
|
||||
variable: traversal.dst.clone(),
|
||||
property: "id".to_string(),
|
||||
},
|
||||
}));
|
||||
} else if !bound_vars.contains(&traversal.src) && bound_vars.contains(&traversal.dst) {
|
||||
// Reverse expand: dst is bound, src is not.
|
||||
// Swap direction and expand from dst to discover src.
|
||||
let reverse_dir = match direction {
|
||||
Direction::Out => Direction::In,
|
||||
Direction::In => Direction::Out,
|
||||
};
|
||||
let src_type = match direction {
|
||||
Direction::Out => edge.from_type.clone(),
|
||||
Direction::In => edge.to_type.clone(),
|
||||
};
|
||||
let introduced_filters =
|
||||
deferred_filters.remove(&traversal.src).unwrap_or_default();
|
||||
pipeline.push(IROp::Expand {
|
||||
src_var: traversal.dst.clone(),
|
||||
dst_var: traversal.src.clone(),
|
||||
edge_type: edge.name.clone(),
|
||||
direction: reverse_dir,
|
||||
dst_type: src_type,
|
||||
min_hops: traversal.min_hops,
|
||||
max_hops: traversal.max_hops,
|
||||
dst_filters: introduced_filters,
|
||||
});
|
||||
if traversal.src != "_" {
|
||||
bound_vars.insert(traversal.src.clone());
|
||||
// Lower traversals into Expand ops.
|
||||
//
|
||||
// Traversals are processed iteratively rather than in a single pass
|
||||
// because deferred bindings mean a traversal's source might not be
|
||||
// bound until a prior traversal introduces it. Each pass processes
|
||||
// every traversal that has at least one bound endpoint; this repeats
|
||||
// until all traversals are consumed.
|
||||
let mut remaining: Vec<&Traversal> = traversals.to_vec();
|
||||
while !remaining.is_empty() {
|
||||
let before = remaining.len();
|
||||
remaining.retain(|traversal| {
|
||||
let src_bound = bound_vars.contains(&traversal.src);
|
||||
let dst_bound = bound_vars.contains(&traversal.dst);
|
||||
if !src_bound && !dst_bound {
|
||||
return true; // keep for next pass
|
||||
}
|
||||
} else {
|
||||
let introduced_filters =
|
||||
deferred_filters.remove(&traversal.dst).unwrap_or_default();
|
||||
pipeline.push(IROp::Expand {
|
||||
src_var: traversal.src.clone(),
|
||||
dst_var: traversal.dst.clone(),
|
||||
edge_type: edge.name.clone(),
|
||||
direction,
|
||||
dst_type,
|
||||
min_hops: traversal.min_hops,
|
||||
max_hops: traversal.max_hops,
|
||||
dst_filters: introduced_filters,
|
||||
});
|
||||
if traversal.dst != "_" {
|
||||
bound_vars.insert(traversal.dst.clone());
|
||||
// This traversal can be processed — emit the appropriate ops.
|
||||
// (errors inside retain are not ergonomic, so we rely on the
|
||||
// type-checker having validated edge names already.)
|
||||
let Some(edge) = catalog.lookup_edge_by_name(&traversal.edge_name) else {
|
||||
return true; // keep; will error on next pass or after loop
|
||||
};
|
||||
|
||||
let direction = type_ctx
|
||||
.traversals
|
||||
.iter()
|
||||
.find(|rt| {
|
||||
rt.src == traversal.src
|
||||
&& rt.dst == traversal.dst
|
||||
&& rt.edge_type == edge.name
|
||||
})
|
||||
.map(|rt| rt.direction)
|
||||
.unwrap_or(Direction::Out);
|
||||
|
||||
let dst_type = match direction {
|
||||
Direction::Out => edge.to_type.clone(),
|
||||
Direction::In => edge.from_type.clone(),
|
||||
};
|
||||
|
||||
if src_bound && dst_bound {
|
||||
// Cycle closing: expand to a temp var, then filter temp.id = dst.id
|
||||
let temp_var = format!("__temp_{}", traversal.dst);
|
||||
pipeline.push(IROp::Expand {
|
||||
src_var: traversal.src.clone(),
|
||||
dst_var: temp_var.clone(),
|
||||
edge_type: edge.name.clone(),
|
||||
direction,
|
||||
dst_type,
|
||||
min_hops: traversal.min_hops,
|
||||
max_hops: traversal.max_hops,
|
||||
dst_filters: vec![],
|
||||
});
|
||||
pipeline.push(IROp::Filter(IRFilter {
|
||||
left: IRExpr::PropAccess {
|
||||
variable: temp_var,
|
||||
property: "id".to_string(),
|
||||
},
|
||||
op: CompOp::Eq,
|
||||
right: IRExpr::PropAccess {
|
||||
variable: traversal.dst.clone(),
|
||||
property: "id".to_string(),
|
||||
},
|
||||
}));
|
||||
} else if !src_bound && dst_bound {
|
||||
// Reverse expand: dst is bound, src is not.
|
||||
let reverse_dir = match direction {
|
||||
Direction::Out => Direction::In,
|
||||
Direction::In => Direction::Out,
|
||||
};
|
||||
let src_type = match direction {
|
||||
Direction::Out => edge.from_type.clone(),
|
||||
Direction::In => edge.to_type.clone(),
|
||||
};
|
||||
let introduced_filters =
|
||||
deferred_filters.remove(&traversal.src).unwrap_or_default();
|
||||
pipeline.push(IROp::Expand {
|
||||
src_var: traversal.dst.clone(),
|
||||
dst_var: traversal.src.clone(),
|
||||
edge_type: edge.name.clone(),
|
||||
direction: reverse_dir,
|
||||
dst_type: src_type,
|
||||
min_hops: traversal.min_hops,
|
||||
max_hops: traversal.max_hops,
|
||||
dst_filters: introduced_filters,
|
||||
});
|
||||
if traversal.src != "_" {
|
||||
bound_vars.insert(traversal.src.clone());
|
||||
}
|
||||
} else {
|
||||
// Normal expand: src is bound, dst is not.
|
||||
let introduced_filters =
|
||||
deferred_filters.remove(&traversal.dst).unwrap_or_default();
|
||||
pipeline.push(IROp::Expand {
|
||||
src_var: traversal.src.clone(),
|
||||
dst_var: traversal.dst.clone(),
|
||||
edge_type: edge.name.clone(),
|
||||
direction,
|
||||
dst_type,
|
||||
min_hops: traversal.min_hops,
|
||||
max_hops: traversal.max_hops,
|
||||
dst_filters: introduced_filters,
|
||||
});
|
||||
if traversal.dst != "_" {
|
||||
bound_vars.insert(traversal.dst.clone());
|
||||
}
|
||||
}
|
||||
false // processed — remove from remaining
|
||||
});
|
||||
if remaining.len() == before {
|
||||
// No progress — remaining traversals reference no bound variables.
|
||||
// Fall through; the traversals will be skipped (this would be a
|
||||
// type-check error in practice).
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
|
|
@ -917,4 +937,45 @@ query q() {
|
|||
if src_var == "p" && dst_var == "c"
|
||||
));
|
||||
}
|
||||
|
||||
/// Traversals declared in non-topological order are reordered automatically.
|
||||
#[test]
|
||||
fn test_lower_out_of_order_traversals() {
|
||||
let catalog = setup();
|
||||
let qf = parse_query(
|
||||
r#"
|
||||
query q() {
|
||||
match {
|
||||
$p: Person
|
||||
$f worksAt $c
|
||||
$p knows $f
|
||||
$f: Person
|
||||
$c: Company { name: "Acme" }
|
||||
}
|
||||
return { $c.name }
|
||||
}
|
||||
"#,
|
||||
)
|
||||
.unwrap();
|
||||
let tc = typecheck_query(&catalog, &qf.queries[0]).unwrap();
|
||||
let ir = lower_query(&catalog, &qf.queries[0], &tc).unwrap();
|
||||
|
||||
// Even though "$f worksAt $c" is declared before "$p knows $f",
|
||||
// the iterative lowering processes "$p knows $f" first (because $p
|
||||
// is bound) and then "$f worksAt $c" (once $f is bound).
|
||||
assert_eq!(ir.pipeline.len(), 3);
|
||||
assert!(matches!(&ir.pipeline[0], IROp::NodeScan { variable, .. } if variable == "p"));
|
||||
// First expand: $p → $f (knows)
|
||||
assert!(matches!(
|
||||
&ir.pipeline[1],
|
||||
IROp::Expand { src_var, dst_var, .. }
|
||||
if src_var == "p" && dst_var == "f"
|
||||
));
|
||||
// Second expand: $f → $c (worksAt), with filter from $c binding
|
||||
assert!(matches!(
|
||||
&ir.pipeline[2],
|
||||
IROp::Expand { src_var, dst_var, dst_filters, .. }
|
||||
if src_var == "f" && dst_var == "c" && dst_filters.len() == 1
|
||||
));
|
||||
}
|
||||
}
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue