mirror of
https://github.com/ModernRelay/omnigraph.git
synced 2026-06-21 02:28:07 +02:00
* perf(engine): route Expand node hydration through the id BTREE via structured filter
hydrate_nodes built an `id IN (...)` SQL string applied via Scanner::filter,
which DataFusion evaluates with InListEval (O(N×M)) rather than using the id
BTREE scalar index — measured at 72× the indexed cost on a 100k-node hop
(MR-376). Build the id IN-list as a structured DataFusion Expr, AND it with
the pushable destination filters, and apply via Scanner::filter_expr (the same
path execute_node_scan already uses); Lance then compiles it to
scalar-index-search -> take.
Destination-filter pushability is now decided by ir_filter_to_expr (structured)
instead of ir_filter_to_sql, so list-contains (array_has) pushes down too.
Removes the now-dead string-filter helpers build_lance_filter, ir_filter_to_sql,
and ir_expr_to_sql; literal_to_sql stays (still used by the mutation delete path).
* feat(engine): add TableStore::scan_edges_by_endpoint for indexed neighbor lookup
Static helper returning edge rows that match a set of endpoint keys on src/dst,
projected to [key_col, opposite_col], via a structured `key_col IN (keys)`
filter_expr. Lance routes it through the persisted BTREE on the endpoint column
(index-search -> take), so cost scales with the frontier size rather than |E|.
Unused until execute_expand's indexed mode lands; isolated in its own commit so
the storage-layer primitive is reviewable on its own.
* feat(engine): add BTREE-indexed Expand traversal path
Split execute_expand into a dispatcher over execute_expand_csr (the existing
in-memory CSR BFS, unchanged) and a new execute_expand_indexed that serves each
hop by batching the frontier into one scan_edges_by_endpoint call against the
persisted src/dst BTREE (index-search -> take), then fans out per source row.
Both share expand_hydrate_and_align — the destination hydration + alignment +
hconcat + in-memory non-pushable filters — which now aligns by string id (a
HashMap) instead of a dense row-id vec, so one tail serves both modes.
Mode selection is OMNIGRAPH_TRAVERSAL_MODE for now (default csr); the
frontier-size auto policy and lazy CSR build follow. AntiJoin stays on CSR.
tests/traversal_indexed.rs (its own #[serial] binary, so env writes never race a
reader) asserts the indexed path matches CSR for one-hop, multi-hop, cross-type,
and no-match cases, and that a freshly-appended unindexed edge is still found
(partial index coverage — fast_search=false unindexed-fragment scan).
* feat(engine): frontier-size Expand dispatcher + lazy CSR build
Replace the env-only mode switch with an auto policy: Expand uses the
BTREE-indexed path when the source frontier is small and the hop count bounded
(OMNIGRAPH_EXPAND_INDEXED_MAX_FRONTIER=1024, OMNIGRAPH_EXPAND_INDEXED_MAX_HOPS=6),
else the in-memory CSR. OMNIGRAPH_TRAVERSAL_MODE=indexed|csr still forces a mode.
Make the CSR index lazy: thread a GraphIndexHandle (memoizing OnceCell over a
Cached/Direct/None builder) through execute_query/execute_pipeline/
execute_rrf_query/execute_anti_join instead of a pre-built Option<&GraphIndex>.
A query served entirely by the indexed path with no AntiJoin never pays the
O(|E|) CSR build — the perf win of Tier 3. AntiJoin still realizes the index
(its negation uses CSR has_neighbors).
Net effect: selective traversals (the common case) skip the whole-graph CSR
build and resolve neighbors from the persisted, incrementally-maintained
src/dst BTREE. Existing traversal/aggregation/end_to_end/search suites now run
the indexed path by default and stay green.
Docs: constants.md (new env knobs), query-language.md (Expand dual path),
indexes.md (graph index is lazy + the indexed alternative).
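The lazy-build idea can be sketched with the standard library alone. This is a minimal illustration, not the engine's code: the `GraphIndex` shape, the `(u32, u32)` edge list, and the constructor are assumptions, and only the "memoizing OnceCell" behavior comes from the message above.

```rust
use std::cell::OnceCell;

/// Stand-in for the whole-graph CSR index (hypothetical shape).
struct GraphIndex {
    /// Outgoing neighbors per dense node id.
    out: Vec<Vec<u32>>,
}

/// Hypothetical handle: keeps the build inputs and builds the index at most once.
struct GraphIndexHandle {
    edges: Vec<(u32, u32)>,
    node_count: usize,
    cell: OnceCell<GraphIndex>,
}

impl GraphIndexHandle {
    fn new(node_count: usize, edges: Vec<(u32, u32)>) -> Self {
        Self { edges, node_count, cell: OnceCell::new() }
    }

    /// First call pays the O(|E|) build; later calls return the memoized index.
    fn get(&self) -> &GraphIndex {
        self.cell.get_or_init(|| {
            let mut out = vec![Vec::new(); self.node_count];
            for &(src, dst) in &self.edges {
                out[src as usize].push(dst);
            }
            GraphIndex { out }
        })
    }

    /// True once some operator has realized the index.
    fn is_built(&self) -> bool {
        self.cell.get().is_some()
    }
}
```

A query that never calls `get()` never pays for the build, which is the property the indexed path relies on.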
* test(engine): bench indexed vs CSR selective traversal
Add a selective single-source knows{1,2} comparison to bench_expand: per growing
|E|, time the cold query in csr vs indexed mode (fresh db each, so CSR pays its
O(|E|) build) and assert both modes return identical rows — a guard against the
scalar-index physical_rows silent fallback dropping unindexed-fragment rows. The
existing dense hop1/2/3 latency bench is unchanged.
* feat(engine): surface silent scalar-index fallback in indexed traversal (C6)
Add TableStore::key_column_index_coverage — a metadata-only check (no IO) of
whether a `key_col IN (...)` scan will be served by the persisted BTREE or
silently fall back to a full filtered scan, mirroring Lance's own decision:
no BTREE on the column, or any fragment missing physical_rows (which disables
scalar indices for the whole scan, lance dataset/scanner.rs create_filter_plan).
execute_expand_indexed calls it once per traversal and tracing::warn!s on
Degraded, so the perf cliff is observable instead of hidden behind a bench oracle.
Detection-only: results are correct either way (the scan returns all rows). Closes
the "no silent failures" gap the traversal best-practice audit flagged as the top
deviation, and adds an IndexCoverage value a future cost-based planner can consume.
* perf(engine): dense-id BFS on the indexed traversal path (C3)
execute_expand_indexed ran its per-source BFS in string space
(Vec<HashSet<String>>, HashMap<String,Vec<String>>, ~4 String clones per neighbor
occurrence). Intern node ids to u32 once via a per-traversal TypeIndex (no
GraphIndex/CSR build — laziness preserved) and run visited/seen/frontier/
neighbor-map in dense u32 space, mirroring the CSR path; de-intern only for the
per-hop IN-list and the emitted dst ids handed to the hydrate+align tail.
Behavior-preserving — the traversal_indexed CSR-vs-indexed equivalence tests are
the guard (results are identical, the key type just changes String -> u32).
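A rough sketch of the interning step and a BFS over dense ids, under stated assumptions: `Interner`, `bfs_dense`, and the `HashMap<u32, Vec<u32>>` neighbor map are illustrative names and shapes, not the engine's `TypeIndex` or its real data structures.

```rust
use std::collections::{HashMap, HashSet};

/// Hypothetical interner: maps string node ids to dense u32 ids and back.
#[derive(Default)]
struct Interner {
    ids: HashMap<String, u32>,
    names: Vec<String>,
}

impl Interner {
    /// Returns the existing dense id for `s`, or assigns the next one.
    fn intern(&mut self, s: &str) -> u32 {
        if let Some(&id) = self.ids.get(s) {
            return id;
        }
        let id = self.names.len() as u32;
        self.ids.insert(s.to_string(), id);
        self.names.push(s.to_string());
        id
    }

    /// De-interns a dense id back to its string.
    fn resolve(&self, id: u32) -> &str {
        &self.names[id as usize]
    }
}

/// BFS in dense-id space. Emits each node first reached at a hop in
/// `min_hops..=max_hops`; the seeded source is never re-emitted.
fn bfs_dense(adj: &HashMap<u32, Vec<u32>>, source: u32, min_hops: u32, max_hops: u32) -> Vec<u32> {
    let mut visited = HashSet::new();
    visited.insert(source);
    let mut frontier = vec![source];
    let mut out = Vec::new();
    for hop in 1..=max_hops {
        let mut next = Vec::new();
        for node in &frontier {
            for &nbr in adj.get(node).map(Vec::as_slice).unwrap_or(&[]) {
                // `insert` returns false for already-visited nodes, so cycles terminate.
                if visited.insert(nbr) {
                    next.push(nbr);
                    if hop >= min_hops {
                        out.push(nbr);
                    }
                }
            }
        }
        if next.is_empty() {
            break;
        }
        frontier = next;
    }
    out
}
```

Only the emitted ids need `resolve`; the visited set and frontier never touch a `String`.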
* refactor(engine): thread the opened edge dataset into indexed Expand
Hoist the edge-dataset open and the C6 index-coverage warning out of
execute_expand_indexed into execute_expand, threading the opened dataset in
as a parameter so it is opened exactly once. Extract the endpoint-column
mapping (endpoint_columns) and the coverage warning (warn_on_degraded_coverage)
as helpers.
Behavior-preserving: same dataset, same warning, same dispatch decision. This
only relocates the open so the upcoming cost-based chooser can consult index
coverage before dispatch without opening the dataset twice.
* feat(engine): cost-based Expand dispatch chooser (C5)
Replace the fixed frontier<=1024 && hops<=6 dispatch threshold with a pure,
IO-free cost model. choose_expand_mode compares the indexed path's
frontier-relative work (hops * frontier * fanout, or hops * |E| when BTREE
coverage is degraded) against the cost of building the whole-graph CSR
(BUILD_FACTOR * |E|), from cheap manifest row counts. Under good coverage this
reduces to a selectivity ratio independent of |E|, preserving the flat-in-|E|
indexed win for selective traversals while routing dense / deep / high-fanout
or degraded-and-expensive traversals to CSR.
execute_expand decides cardinality-first and only opens the edge dataset to
confirm coverage when it leans indexed (no open on a clearly-CSR traversal).
The two env knobs become hard ceilings layered on the model; the
OMNIGRAPH_TRAVERSAL_MODE override still forces a path; the chosen mode is
traced. Results are unchanged across modes — only the path differs.
Adds inline crossover unit tests and extends the traversal_indexed both_modes
harness with an auto pass asserting the chooser is result-preserving across
every traversal shape. Documents the new flag semantics in
docs/user/{constants,query-language}.md.
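The comparison described above can be sketched as a pure function. Everything beyond the two formulas is an assumption made for illustration: the `CostInputs` fields, estimating fanout as |E| / |V|, the tie going to the indexed path, and `BUILD_FACTOR = 1.0` (the real constant is not stated here).

```rust
#[derive(Debug, PartialEq, Clone, Copy)]
enum ExpandMode {
    Indexed,
    Csr,
}

#[derive(Debug, PartialEq, Clone, Copy)]
enum Coverage {
    Indexed,
    Degraded,
}

/// Assumed value, for illustration only.
const BUILD_FACTOR: f64 = 1.0;

/// Hypothetical inputs, all obtainable from row counts without IO.
struct CostInputs {
    hops: u32,
    frontier: u64,
    edges: u64,
    nodes: u64,
    coverage: Coverage,
}

fn choose_expand_mode(c: &CostInputs) -> ExpandMode {
    let edges = c.edges as f64;
    // Assumption: average fanout is estimated as |E| / |V|.
    let fanout = if c.nodes == 0 { 0.0 } else { edges / c.nodes as f64 };
    let indexed_cost = match c.coverage {
        // Good coverage: work scales with the frontier, not with |E|.
        Coverage::Indexed => c.hops as f64 * c.frontier as f64 * fanout,
        // Degraded coverage: every hop is effectively a full edge scan.
        Coverage::Degraded => c.hops as f64 * edges,
    };
    let csr_cost = BUILD_FACTOR * edges;
    if indexed_cost <= csr_cost {
        ExpandMode::Indexed
    } else {
        ExpandMode::Csr
    }
}
```

With this fanout estimate, the good-coverage comparison simplifies to `hops * frontier / nodes <= BUILD_FACTOR`, so |E| cancels out, matching the "selectivity ratio independent of |E|" claim.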
* test(engine): pin Lance scalar-index coverage + system-column/deletion-metadata surface
Add three Lance surface guards de-risking a future persisted-adjacency cache:
- a compile-only guard pinning the fragment physical_rows + index-detail
surface that key_column_index_coverage mirrors (the C6 fallback);
- a runtime probe confirming a scalar BTREE on the system column
_row_last_updated_at_version is not buildable via the normal create-index
path (the column is not in the user schema), so a version-column range delta
is not viable as drafted;
- a runtime probe confirming per-fragment deletion metadata
(deletion_file.num_deleted_rows) is available as cheap O(fragments) metadata,
the primitive a fragment-coverage delete model would rely on.
The probes turn the two largest substrate assumptions into green/red CI facts
before any cache work begins.
* test(engine): regression for cross-type id-collision in indexed traversal
A node id is unique only within a type, so a Person and a Company can share an
id string. A variable-length traversal over a cross-type edge (WorksAt) must
structurally stop after one hop. This test builds a graph where 'shared' is both
a Person and a Company id and asserts worksAt{1,2} returns only the one-hop
company. It fails today: the indexed path's single string interner de-interns
the hop-1 Company id back to the colliding Person id and runs a hop-2 scan that
matches that Person's edges, emitting a spurious second-hop company (indexed
["other","shared"] vs csr ["shared"]).
* fix(engine): structurally cap cross-type Expand at one hop
A cross-type edge cannot chain (e.g. a Company is not a WorksAt source), so a
variable-length traversal over one is structurally single-hop. Both traversal
paths now enforce this by capping max hops at 1 when from_type != to_type,
instead of relying on the hop-2 scan returning empty.
That reliance was a correctness hole on the indexed path: it interns every
endpoint string into one dense id space, so a cross-type id-string collision (a
Person and a Company sharing an id) let hop 2 de-intern a destination id back to
the colliding source-type id and match its edges, emitting rows the CSR path
never produces. With the cap the cross-type second-hop scan never runs, so the
shared interner can no longer alias across types. Turns the regression test
green (indexed == csr == ["shared"]).
* perf(engine): set-oriented filtered anti-join, remove per-row dispatch
execute_anti_join's filtered slow path sliced the outer batch to one row at a
time and re-ran the inner pipeline per row, so each 1-row inner Expand dispatched
to the indexed path — one Lance scan per outer row, while the CSR realized up
front sat unused.
Replace it with a set-oriented anti-semi-join: tag each outer row with a
synthetic index column, run the inner pipeline once over the whole frontier (the
tag survives Expand's hconcat and Filter's row-drop), then exclude outer rows
whose tag survived. The inner Expand now runs as a single set-at-a-time traversal
over the full frontier; config is read once per operator, not per row (the env
nit is mooted). A produced-but-untagged inner batch fails loudly rather than
silently keeping every row. Results are unchanged (the predicated-negation tests
exercise the path over a multi-row outer with dst-filters).
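The tag-and-exclude shape of the set-oriented anti-join, reduced to plain vectors. This is a sketch under assumptions: the real operator works on Arrow batches with a synthetic column, whereas here the tag is a tuple field and the inner pipeline is a hypothetical closure that returns the tags of surviving rows.

```rust
use std::collections::HashSet;

/// Keeps the outer rows for which the inner pipeline produced no match.
/// `inner` runs once over the whole tagged frontier and returns the tags of
/// every row that survived it (a tag may repeat when a row fans out).
fn anti_join_set_oriented<T: Clone>(
    outer: &[T],
    inner: impl Fn(&[(usize, T)]) -> Vec<usize>,
) -> Vec<T> {
    // Tag each outer row with its position.
    let tagged: Vec<(usize, T)> = outer.iter().cloned().enumerate().collect();
    // One inner run for the whole set, not one per row.
    let matched: HashSet<usize> = inner(&tagged).into_iter().collect();
    // Exclude every outer row whose tag survived the inner pipeline.
    tagged
        .into_iter()
        .filter(|(tag, _)| !matched.contains(tag))
        .map(|(_, row)| row)
        .collect()
}
```

Fan-out is harmless here because surviving tags are collected into a set before exclusion.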
* test(engine): drop flaky wall-clock budget from the merge truth table
The 30s wall-clock assertion in merge_pair_truth_table flakes under parallel
test load: it tripped at ~31s in the full --test-threads=4 gate while passing at
~20s in isolation. A fixed time budget in a correctness test depends on machine
and parallelism, not correctness; elapsed is still logged for visibility, and a
real merge-perf regression belongs in a bench. The cell-count correctness
assertions (81 / 36 / 45) are unchanged.
* fix(engine): total deterministic ORDER via entity-key tie-break + NULL contract
apply_ordering used an unstable lexsort with no tie-break, so rows with equal
user-sort keys came out in a run-dependent order (the input order depends on
scan parallelism / upstream hashing) — making ORDER ... LIMIT non-deterministic,
a latent deny-list violation (no nondeterministic result ordering).
Append the bound entities' key columns (<var>.id, unique per row) in canonical
name-sorted order as ascending tie-breaks, giving a total, reproducible order
(and a deterministic top-N when ties straddle the LIMIT cutoff). NULL placement
(nulls_first = !descending) is unchanged and now documented as the contract.
New tests/ordering.rs locks descending, multi-key precedence, the deterministic
key tie-break (data loaded in a different order than the expected output, so it
proves the tie sorts by key not by load order), and NULL placement under ASC/DESC.
docs/user/query-language.md documents the total-order + NULL contract.
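The ordering contract can be illustrated on a plain struct. This is a sketch, not `apply_ordering` itself: the `Row` type and the single nullable `age` sort key are assumptions, while the `nulls_first = !descending` rule and the ascending key tie-break follow the message above.

```rust
use std::cmp::Ordering;

#[derive(Debug, Clone, PartialEq)]
struct Row {
    id: String,
    age: Option<i32>,
}

/// Compares the user sort key with NULL placement `nulls_first = !descending`.
fn cmp_key(a: Option<i32>, b: Option<i32>, descending: bool) -> Ordering {
    let nulls_first = !descending;
    match (a, b) {
        (None, None) => Ordering::Equal,
        (None, Some(_)) => if nulls_first { Ordering::Less } else { Ordering::Greater },
        (Some(_), None) => if nulls_first { Ordering::Greater } else { Ordering::Less },
        (Some(x), Some(y)) => if descending { y.cmp(&x) } else { x.cmp(&y) },
    }
}

/// Total order: user key first, then the unique entity key, always ascending.
fn order_rows(rows: &mut [Row], descending: bool) {
    rows.sort_by(|a, b| cmp_key(a.age, b.age, descending).then_with(|| a.id.cmp(&b.id)));
}
```

Because `id` is unique per row, no two rows ever compare equal, so the result does not depend on input order.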
* test(engine): property-based query-correctness invariants over generated graphs
Adds a proptest harness (new dev-dep) that generates small graphs whose Person
and Company keys are drawn from a shared 5-key alphabet, so cross-type id
collisions, cycles, and self-loops arise by search rather than from one
hand-built fixture. Three invariants:
- prop_expand_indexed_eq_csr: csr == indexed == auto over knows{1,3} (same-type,
cycles) and worksAt{1,2} (cross-type, collision-prone) from every start.
- prop_results_subset_of_existing_nodes: no phantom rows (catches over-emission
even if both modes are wrong identically).
- prop_antijoin_partitions_persons: not{worksAt} and its complement are disjoint
and cover all persons.
Verified the guard bites: neutering the cross-type hop cap makes
prop_expand_indexed_eq_csr fail and proptest shrinks it to persons["c","e"] /
companies["b","c"] — the cross-type collision class the hand-built fixture
only sampled once. Tests are sync + #[serial] (per-case runtime; the mode test
writes OMNIGRAPH_TRAVERSAL_MODE).
* test(engine): cover cycle/self-loop termination + nested anti-join (C5 edge cases)
- variable_hops_terminate_and_dedup_on_cycle: a 3-cycle a->b->c->a traversed with
knows{1,5} (ceiling above the cycle length) terminates and emits each node once
(the c->a back-edge hits the seeded source); both_modes confirms indexed == csr.
Uses a bounded range deliberately — unbounded {1,} is a typecheck error, not a
runtime path.
- variable_hops_handle_self_loop: a->a self-loop does not loop forever and does
not re-emit the seeded source.
- nested_anti_join_double_negation: not { worksAt; not { name = Acme } } recurses
through execute_pipeline, yielding [Alice,Charlie,Diana] (people with no non-Acme
employer) — distinct from plain unemployed [Charlie,Diana].
* test(engine): execution goldens for typed-literal filters (C4 gap #4)
New literal_filters.rs covers filtering by F64/F32/Bool/Date/DateTime LITERALS
across both arms: standalone comparisons ($m.score > 1.5, $m.ratio <= 0.25,
$m.active = true, $m.born >= date(...), $m.seen < datetime(...)) exercise the
in-memory comparison path, and inline bindings (Metric { active: true },
Metric { score: 3.0 }) exercise Lance filter_expr pushdown. Seeds partition each
predicate so a dropped/miscast filter returns all rows. (Param-bound scalars and
list-column contains are covered elsewhere.)
* test(engine): full rank-order goldens for nearest + bm25 (gap #2)
Existing search tests stopped at top-1 (nearest) or non-empty (bm25), so a
regression corrupting ranks 2..k or reversing the sort direction passed CI
silently. Pin the FULL ordered slug list: nearest([0.1,0.2,0.3,0.4]) ->
[ml-intro, nlp-guide, rl-intro] (ml-intro exact at dist 0, rest by ascending
L2); bm25(Learning) -> [rl-intro, ml-intro, dl-basics] (descending score).
nearest/bm25 skip apply_ordering (is_search_ordered) and return Lance native
order, so result_slugs row order == rank order; values resolved by running and
confirmed stable across runs.
* test(engine): search fuzzy/match_text characterization + RRF non-default pairings
- match_text_matches_exact_set_excludes_unrelated: match_text(body,'neural') ==
[dl-basics] exactly (not just contains).
- fuzzy_does_not_match_under_default_tokenizer: characterizes that fuzzy() is
inert with the default tokenizer here (search/match_text work, fuzzy returns
nothing); turns red — to be promoted to a real golden — if fuzzy starts matching.
- rrf_fuses_two_fts_fields / rrf_fuses_two_vector_queries: RRF fuses arms other
than the default nearest+bm25 (bm25 title+body; two vector queries), proving
primary_var resolves and fusion runs. New fixtures/search.gq queries +
two_vector_params helper. Orders resolved by running, confirmed stable.
* test(engine): anti-join fast-vs-slow path equivalence harness
anti_join_fast_and_slow_paths_agree: the CSR has_neighbors fast path
(not { $p worksAt $_ }) and the set-oriented inner-pipeline replay (same
negation forced slow by an always-true $c.name != "" dst filter) must produce
the same result ([Charlie, Diana]). Closes the second real engine fork explicitly.
* test(engine): regression for nested slow-path anti-join tag collision
A nested not { ... not { ... } } where both levels hit the set-oriented slow
path collides on the fixed __antijoin_outer_row correlation column: the inner
call appends a duplicate, and column_by_name reads the OUTER tag. Fan-out (p1
works at two companies) makes inner row indices diverge from outer tags, so the
bug returns the wrong person set. Fails on current code (left ["p2","p4"] vs
right ["p3","p4"]).
* fix(engine): collision-free anti-join correlation tag for nested negation
The set-oriented anti-join tagged the outer batch with a fixed column name and
read it back by name. Under a nested slow-path anti-join the enclosing tag rides
through the inner pipeline, so the inner call produced a duplicate field; Arrow
permits duplicate names and column_by_name returns the first, so the inner
negation mis-correlated against the outer row indices.
Choose a tag name not already present in the batch (suffix-incremented), so each
nesting level reads its own correlation column. Turns the fan-out regression
green; the existing nested/fast-vs-slow/proptest anti-join invariants still pass.
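One way to pick a tag name that is not already in the batch, shown on a list of column names. The helper name and the `_N` suffix format are assumptions; the message above only says the name is suffix-incremented.

```rust
/// Returns `base` if no existing column uses it, otherwise the first
/// `base_N` (N = 1, 2, ...) that is free. Hypothetical helper.
fn unique_tag_name(existing: &[&str], base: &str) -> String {
    if !existing.iter().any(|c| *c == base) {
        return base.to_string();
    }
    let mut n = 1usize;
    loop {
        let candidate = format!("{base}_{n}");
        if !existing.iter().any(|c| *c == candidate.as_str()) {
            return candidate;
        }
        n += 1;
    }
}
```

Each nesting level calls this against the columns it actually sees, so an enclosing tag that rides through the inner pipeline can never be mistaken for the inner one.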
* fix(engine): cap cross-type hops in the Expand cost model
gather_cost_inputs fed the requested max_hops into choose_expand_mode even though
execute_expand_indexed runs at most one hop for a cross-type edge. So a cross-type
variable-length expand (e.g. worksAt{1,5}) had its indexed cost scaled by 5 while
only one hop runs, skewing the chooser toward CSR (an unnecessary whole-graph
build) near the crossover. Results were unaffected (modes are equivalent); this
is a plan-accuracy fix.
Add cost_effective_hops(requested, same_type) — caps to 1 for cross-type — and
apply it in gather_cost_inputs so the estimate matches what executes. Unit test
covers the cap and the crossover consequence (capped 1 hop stays indexed where
the requested 5 would have flipped to CSR).
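The cap itself is small enough to show whole. The signature follows the message above; the `u32` type is an assumption.

```rust
/// Hops the cost model should price: a cross-type edge cannot chain,
/// so at most one hop ever executes regardless of the requested maximum.
fn cost_effective_hops(requested: u32, same_type: bool) -> u32 {
    if same_type {
        requested
    } else {
        requested.min(1)
    }
}
```

For `worksAt{1,5}` this prices one hop rather than five, so the indexed estimate is no longer inflated fivefold.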
* perf(engine): realize anti-join CSR lazily + reuse a warm CSR in the chooser
Two CSR build/reuse fixes flagged on the set-oriented anti-join work (results
unchanged — plan/perf accuracy):
- execute_anti_join called graph_index.get() (the O(|E|) whole-graph CSR build)
unconditionally, but only the bulk fast path consumes it; a filtered/nested
slow-path anti-join's inner Expand picks its own access path. Gate the build on
a pure shape predicate (bulk_anti_join_applies) so a selective anti-join over a
large graph no longer pays a build it won't use.
- gather_cost_inputs hardcoded csr_cached=false, so once an earlier op realized
the CSR, later Expands still cost it as a cold build and could pick per-hop
indexed scans over reusing the warm in-memory CSR. Add GraphIndexHandle::
is_built() and thread it through so the chooser reuses a materialized CSR.
Anti-join, cross-type, proptest-equivalence, and chooser unit tests stay green.
* test(engine): RAII traversal-mode guard in proptest equivalence
prop_expand_indexed_eq_csr set/cleared OMNIGRAPH_TRAVERSAL_MODE manually; a panic
between set and clear (e.g. a query unwrap on a generated case) would leak the
forced mode into proptest's shrink/subsequent cases and mask the divergence under
test. Replace with a ModeGuard that clears on drop (including on unwind), scoping
the forced mode to a single query.
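A minimal sketch of such a guard, assuming it simply removes the variable on drop (the real `ModeGuard` may restore a previous value instead). The `unsafe` blocks are required on the 2024 edition and redundant on earlier ones.

```rust
use std::env;

const MODE_VAR: &str = "OMNIGRAPH_TRAVERSAL_MODE";

/// Forces a traversal mode for as long as the guard is alive.
struct ModeGuard;

impl ModeGuard {
    #[allow(unused_unsafe)]
    fn set(mode: &str) -> Self {
        // SAFETY: callers must not touch the environment concurrently
        // (the tests in the log are #[serial] for this reason).
        unsafe {
            env::set_var(MODE_VAR, mode);
        }
        ModeGuard
    }
}

impl Drop for ModeGuard {
    #[allow(unused_unsafe)]
    fn drop(&mut self) {
        // Runs on normal scope exit and during panic unwinding alike.
        unsafe {
            env::remove_var(MODE_VAR);
        }
    }
}
```

Binding the guard to a named variable (`let _guard = ...`) matters: `let _ = ModeGuard::set(..)` would drop it immediately and clear the mode before the query runs.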
* test(engine): regression for multi-hop anti-join hop bounds
The bulk anti-join fast path answers via has_neighbors (one-hop existence), so
not { $p knows{2,2} $x } wrongly drops a node with a 1-hop neighbor but no
2-hop path. On a->b (sink) and c->d->e, only c has a 2-hop path; the query should
keep [a,b,d,e]. Fails on current code (left ["b","e"] — only the sinks).
* fix(engine): restrict anti-join bulk fast path to one-hop expands
bulk_anti_join_applies accepted any single Expand, but try_bulk_anti_join_mask
decides via the CSR has_neighbors one-hop existence check — wrong for multi-hop
negations. Require min_hops==1 && max_hops==1 in the predicate; anything else
falls to the slow path, whose inner Expand runs the real bounded traversal.
Turns the multi-hop regression green; one-hop anti-joins unchanged.
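The tightened predicate can be pictured as a slice pattern over the inner pipeline. The `Op` enum below is a hypothetical stand-in for the engine's IR; only the `min_hops == 1 && max_hops == 1` condition comes from the message above.

```rust
/// Hypothetical, simplified inner-pipeline ops.
enum Op {
    Expand { min_hops: u32, max_hops: u32 },
    Filter,
}

/// The bulk fast path is valid only for a single, exactly-one-hop Expand,
/// because it answers with a one-hop existence check.
fn bulk_anti_join_applies(inner: &[Op]) -> bool {
    matches!(inner, [Op::Expand { min_hops: 1, max_hops: 1 }])
}
```

Anything that fails the pattern, including `knows{2,2}` or an Expand followed by a Filter, falls through to the slow path.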
* fix(engine): IndexCoverage reports Degraded for uncovered fragments
key_column_index_coverage checked BTREE-exists + physical_rows but not that the
index actually covers the current fragments. Since edge-index creation is skipped
once a BTREE exists, fragments appended later stay unindexed while coverage still
reported Indexed — so the cost chooser priced a partly-full scan as fully indexed.
Compare the BTREE's fragment_bitmap (public on lance_table IndexMetadata) against
the dataset's current fragment ids; report Degraded when any are uncovered. A None
bitmap means Lance can't report coverage — don't over-degrade. Results are
unaffected (the scan returns unindexed-fragment rows either way); this corrects
the cost signal.
Test: a freshly-loaded edge BTREE is Indexed; after appending an edge the new
fragment is uncovered → Degraded. Surface guard pins IndexMetadata.fragment_bitmap.
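The decision order described across the coverage commits, sketched over plain values. The function name and parameters are illustrative assumptions; the real check reads Lance metadata, which is modeled here as a `HashSet<u32>` of covered fragment ids.

```rust
use std::collections::HashSet;

#[derive(Debug, PartialEq)]
enum IndexCoverage {
    Indexed,
    Degraded,
}

/// Hypothetical metadata-only coverage decision.
fn coverage_from_metadata(
    has_btree: bool,
    all_fragments_have_physical_rows: bool,
    index_fragment_bitmap: Option<&HashSet<u32>>,
    current_fragment_ids: &[u32],
) -> IndexCoverage {
    // No BTREE, or any fragment missing physical_rows: the scan falls back.
    if !has_btree || !all_fragments_have_physical_rows {
        return IndexCoverage::Degraded;
    }
    match index_fragment_bitmap {
        // Coverage unknown: do not over-degrade.
        None => IndexCoverage::Indexed,
        // Degraded as soon as one current fragment is outside the index.
        Some(covered) => {
            if current_fragment_ids.iter().all(|id| covered.contains(id)) {
                IndexCoverage::Indexed
            } else {
                IndexCoverage::Degraded
            }
        }
    }
}
```

Appending an edge after the index is built adds a fragment id that the bitmap lacks, which is the case the new test pins.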
* docs: clarify the Expand frontier ceiling bounds the initial dispatch frontier
The cap is applied at dispatch on the initial frontier; per-hop fan-out
(union_dense) is not hard-capped. Correct the constants.md and query-language.md
claims: the ceilings bound the initial-dispatch frontier/hops, the cost model
estimates total indexed work as ~hops*frontier*fanout (pricing dense fan-out
toward CSR), and per-hop work is not a hard bound. Drops the overstated 'hard
caps bound indexed work' / 'cost ∝ frontier' wording.
932 lines
27 KiB
Rust
mod helpers;

use arrow_array::{Array, Int32Array, StringArray};

use omnigraph::db::Omnigraph;
use omnigraph::loader::{LoadMode, load_jsonl};
use omnigraph_compiler::ir::ParamMap;

use helpers::*;

// ─── Anti-join slow path (predicated negation) ──────────────────────────────

#[tokio::test]
async fn anti_join_predicated_negation() {
    let dir = tempfile::tempdir().unwrap();
    let mut db = init_and_load(&dir).await;

    // "People who do NOT work at Acme"
    // Inner pipeline: Expand(worksAt) + Filter(name="Acme") → 2 ops → slow path
    let queries = r#"
query not_at_acme() {
    match {
        $p: Person
        not {
            $p worksAt $c
            $c.name = "Acme"
        }
    }
    return { $p.name }
}
"#;
    // Test data: Alice→Acme, Bob→Globex. Charlie and Diana have no WorksAt.
    // Expected: everyone except Alice = {Bob, Charlie, Diana}
    let result = query_main(&mut db, queries, "not_at_acme", &ParamMap::new())
        .await
        .unwrap();

    let batch = result.concat_batches().unwrap();
    let names = batch
        .column(0)
        .as_any()
        .downcast_ref::<StringArray>()
        .unwrap();
    let mut names_vec: Vec<&str> = (0..names.len()).map(|i| names.value(i)).collect();
    names_vec.sort();
    assert_eq!(names_vec, vec!["Bob", "Charlie", "Diana"]);
}

// Nested anti-join (double negation): proves `not { … not { … } }` recurses
// through execute_pipeline. "People who do NOT work at any NON-Acme company":
// inner `not { $c.name = "Acme" }` keeps the non-Acme employers, the outer `not`
// removes anyone who has one. Alice (Acme only), Charlie & Diana (no employer)
// remain — distinct from plain unemployed {Charlie, Diana}.
#[tokio::test]
async fn nested_anti_join_double_negation() {
    let dir = tempfile::tempdir().unwrap();
    let mut db = init_and_load(&dir).await;

    let queries = r#"
query no_nonacme_employer() {
    match {
        $p: Person
        not {
            $p worksAt $c
            not {
                $c.name = "Acme"
            }
        }
    }
    return { $p.name }
}
"#;
    let result = query_main(&mut db, queries, "no_nonacme_employer", &ParamMap::new())
        .await
        .unwrap();

    let batch = result.concat_batches().unwrap();
    let names = batch
        .column(0)
        .as_any()
        .downcast_ref::<StringArray>()
        .unwrap();
    let mut names_vec: Vec<&str> = (0..names.len()).map(|i| names.value(i)).collect();
    names_vec.sort();
    assert_eq!(names_vec, vec!["Alice", "Charlie", "Diana"]);
}

// The anti-join has two execution forks: the CSR `has_neighbors` fast path
// (bare single-op Expand inner) and the set-oriented inner-pipeline replay (when
// dst_filters force a multi-op inner). They must agree. `not { $p worksAt $_ }`
// takes the fast path; the same negation with an always-true dst filter
// (`$c.name != ""`) is semantically identical but forces the slow path.
#[tokio::test]
async fn anti_join_fast_and_slow_paths_agree() {
    let dir = tempfile::tempdir().unwrap();
    let mut db = init_and_load(&dir).await;

    let queries = r#"
query fast() {
    match {
        $p: Person
        not { $p worksAt $_ }
    }
    return { $p.name }
}
query slow() {
    match {
        $p: Person
        not {
            $p worksAt $c
            $c.name != ""
        }
    }
    return { $p.name }
}
"#;
    let names = |result: omnigraph_compiler::result::QueryResult| {
        let batch = result.concat_batches().unwrap();
        let col = batch
            .column(0)
            .as_any()
            .downcast_ref::<StringArray>()
            .unwrap();
        let mut v: Vec<String> = (0..col.len()).map(|i| col.value(i).to_string()).collect();
        v.sort();
        v
    };

    let fast = names(query_main(&mut db, queries, "fast", &ParamMap::new()).await.unwrap());
    let slow = names(query_main(&mut db, queries, "slow", &ParamMap::new()).await.unwrap());

    assert_eq!(fast, slow, "anti-join fast and slow paths must agree");
    // Alice->Acme, Bob->Globex employed; Charlie & Diana have no employer.
    assert_eq!(fast, vec!["Charlie", "Diana"]);
}

// Regression: nested slow-path anti-joins must not collide on the synthetic
// correlation tag. The outer anti-join tags rows with a correlation column that
// rides through its inner pipeline; when the inner pipeline contains ANOTHER
// slow-path anti-join, a fixed tag name would duplicate, and reading it by name
// returns the OUTER tag — mis-correlating the inner negation. Fan-out (p1 works
// at two companies) makes the inner row indices diverge from the outer tags, so
// the bug produces a different person set than the correct one.
#[tokio::test]
async fn nested_anti_join_with_fanout_correlates_correctly() {
    let dir = tempfile::tempdir().unwrap();
    let uri = dir.path().to_str().unwrap();
    // p1 -> {Acme, Globex} (fan-out), p2 -> Globex, p3 -> Acme, p4 -> (none).
    let data = r#"{"type":"Person","data":{"name":"p1"}}
{"type":"Person","data":{"name":"p2"}}
{"type":"Person","data":{"name":"p3"}}
{"type":"Person","data":{"name":"p4"}}
{"type":"Company","data":{"name":"Acme"}}
{"type":"Company","data":{"name":"Globex"}}
{"edge":"WorksAt","from":"p1","to":"Acme"}
{"edge":"WorksAt","from":"p1","to":"Globex"}
{"edge":"WorksAt","from":"p2","to":"Globex"}
{"edge":"WorksAt","from":"p3","to":"Acme"}"#;
    let mut db = Omnigraph::init(uri, TEST_SCHEMA).await.unwrap();
    load_jsonl(&mut db, data, LoadMode::Overwrite).await.unwrap();

    let queries = r#"
query no_nonacme_employer() {
    match {
        $p: Person
        not {
            $p worksAt $c
            not {
                $c.name = "Acme"
            }
        }
    }
    return { $p.name }
}
"#;
    let result = query_main(&mut db, queries, "no_nonacme_employer", &ParamMap::new())
        .await
        .unwrap();
    let batch = result.concat_batches().unwrap();
    let names = batch
        .column(0)
        .as_any()
        .downcast_ref::<StringArray>()
        .unwrap();
    let mut names_vec: Vec<&str> = (0..names.len()).map(|i| names.value(i)).collect();
    names_vec.sort();
    // p1 & p2 have a non-Acme employer (Globex) -> excluded; p3 (Acme only) and
    // p4 (no employer) remain.
    assert_eq!(names_vec, vec!["p3", "p4"]);
}

// Regression: a multi-hop anti-join must not take the bulk fast path. The fast
// path answers via `has_neighbors` (ONE-hop existence), so `not { $p knows{2,2}
// $x }` would wrongly drop a node that has a 1-hop neighbor but no 2-hop path.
// Graph: a->b (b is a sink, so a has no 2-hop path), c->d->e (c has a 2-hop
// path). Only c has a 2-hop knows path, so only c is removed.
#[tokio::test]
async fn anti_join_respects_multi_hop_bounds() {
    let dir = tempfile::tempdir().unwrap();
    let uri = dir.path().to_str().unwrap();
    let data = r#"{"type":"Person","data":{"name":"a"}}
{"type":"Person","data":{"name":"b"}}
{"type":"Person","data":{"name":"c"}}
{"type":"Person","data":{"name":"d"}}
{"type":"Person","data":{"name":"e"}}
{"edge":"Knows","from":"a","to":"b"}
{"edge":"Knows","from":"c","to":"d"}
{"edge":"Knows","from":"d","to":"e"}"#;
    let mut db = Omnigraph::init(uri, TEST_SCHEMA).await.unwrap();
    load_jsonl(&mut db, data, LoadMode::Overwrite).await.unwrap();

    let queries = r#"
query no_two_hop() {
    match {
        $p: Person
        not { $p knows{2,2} $x }
    }
    return { $p.name }
}
"#;
    let result = query_main(&mut db, queries, "no_two_hop", &ParamMap::new())
        .await
        .unwrap();
    let batch = result.concat_batches().unwrap();
    let names = batch
        .column(0)
        .as_any()
        .downcast_ref::<StringArray>()
        .unwrap();
    let mut names_vec: Vec<&str> = (0..names.len()).map(|i| names.value(i)).collect();
    names_vec.sort();
    // Only c has a 2-hop knows path → removed; everyone else (incl. a, which has
    // a 1-hop neighbor but no 2-hop path) is kept.
    assert_eq!(names_vec, vec!["a", "b", "d", "e"]);
}

// ─── Variable-length hops ───────────────────────────────────────────────────

const CHAIN_SCHEMA: &str = r#"
node Person { name: String @key }
edge Knows: Person -> Person
"#;

const CHAIN_DATA: &str = r#"{"type": "Person", "data": {"name": "A"}}
{"type": "Person", "data": {"name": "B"}}
{"type": "Person", "data": {"name": "C"}}
{"type": "Person", "data": {"name": "D"}}
{"edge": "Knows", "from": "A", "to": "B"}
{"edge": "Knows", "from": "B", "to": "C"}
{"edge": "Knows", "from": "C", "to": "D"}
"#;

async fn init_chain(dir: &tempfile::TempDir) -> Omnigraph {
    let uri = dir.path().to_str().unwrap();
    let mut db = Omnigraph::init(uri, CHAIN_SCHEMA).await.unwrap();
    load_jsonl(&mut db, CHAIN_DATA, LoadMode::Overwrite)
        .await
        .unwrap();
    db
}

#[tokio::test]
async fn variable_hops_1_to_3() {
    let dir = tempfile::tempdir().unwrap();
    let mut db = init_chain(&dir).await;

    let queries = r#"
query reachable($name: String) {
    match {
        $p: Person { name: $name }
        $p knows{1,3} $f
    }
    return { $f.name }
}
"#;
    let result = query_main(&mut db, queries, "reachable", &params(&[("$name", "A")]))
        .await
        .unwrap();

    let batch = result.concat_batches().unwrap();
    let names = batch
        .column(0)
        .as_any()
        .downcast_ref::<StringArray>()
        .unwrap();
    let mut names_vec: Vec<&str> = (0..names.len()).map(|i| names.value(i)).collect();
    names_vec.sort();
    // A→B (1 hop), A→B→C (2 hops), A→B→C→D (3 hops)
    assert_eq!(names_vec, vec!["B", "C", "D"]);
}

#[tokio::test]
async fn variable_hops_2_to_3() {
    let dir = tempfile::tempdir().unwrap();
    let mut db = init_chain(&dir).await;

    let queries = r#"
query far_reachable($name: String) {
    match {
        $p: Person { name: $name }
        $p knows{2,3} $f
    }
    return { $f.name }
}
"#;
    let result = query_main(
        &mut db,
        queries,
        "far_reachable",
        &params(&[("$name", "A")]),
    )
    .await
    .unwrap();

    let batch = result.concat_batches().unwrap();
    let names = batch
        .column(0)
        .as_any()
        .downcast_ref::<StringArray>()
        .unwrap();
    let mut names_vec: Vec<&str> = (0..names.len()).map(|i| names.value(i)).collect();
    names_vec.sort();
    // Skip 1-hop (B), keep 2-hop (C) and 3-hop (D)
    assert_eq!(names_vec, vec!["C", "D"]);
}

#[tokio::test]
async fn variable_hops_exact_2() {
    let dir = tempfile::tempdir().unwrap();
    let mut db = init_chain(&dir).await;

    let queries = r#"
query exactly_2($name: String) {
    match {
        $p: Person { name: $name }
        $p knows{2,2} $f
    }
    return { $f.name }
}
"#;
    let result = query_main(&mut db, queries, "exactly_2", &params(&[("$name", "A")]))
        .await
        .unwrap();

    let batch = result.concat_batches().unwrap();
    let names = batch
        .column(0)
        .as_any()
        .downcast_ref::<StringArray>()
        .unwrap();
    let mut names_vec: Vec<&str> = (0..names.len()).map(|i| names.value(i)).collect();
    names_vec.sort();
    // Exactly 2 hops from A: only C (A→B→C)
    assert_eq!(names_vec, vec!["C"]);
}

// ─── Ordering ASC ───────────────────────────────────────────────────────────

#[tokio::test]
async fn ordering_ascending() {
    let dir = tempfile::tempdir().unwrap();
    let mut db = init_and_load(&dir).await;

    let queries = r#"
query by_age_asc() {
    match { $p: Person }
    return { $p.name, $p.age }
    order { $p.age asc }
}
"#;
    let result = query_main(&mut db, queries, "by_age_asc", &ParamMap::new())
        .await
        .unwrap();

    let batch = &result.batches()[0];
    let names = batch
        .column(0)
        .as_any()
        .downcast_ref::<StringArray>()
        .unwrap();
    let ages = batch
        .column(1)
        .as_any()
        .downcast_ref::<Int32Array>()
        .unwrap();

    // Bob(25), Diana(28), Alice(30), Charlie(35) — ascending by age
    assert_eq!(batch.num_rows(), 4);
    assert_eq!(ages.value(0), 25);
    assert_eq!(ages.value(1), 28);
    assert_eq!(ages.value(2), 30);
    assert_eq!(ages.value(3), 35);

    assert_eq!(names.value(0), "Bob");
    assert_eq!(names.value(3), "Charlie");
}

// ─── Empty graph traversal ──────────────────────────────────────────────────

#[tokio::test]
async fn traversal_no_edges_returns_empty() {
    let dir = tempfile::tempdir().unwrap();
    let uri = dir.path().to_str().unwrap();
    let mut db = Omnigraph::init(uri, TEST_SCHEMA).await.unwrap();

    // Load only nodes, no edges
    let data = r#"{"type": "Person", "data": {"name": "Alice", "age": 30}}
{"type": "Person", "data": {"name": "Bob", "age": 25}}
{"type": "Company", "data": {"name": "Acme"}}"#;
    load_jsonl(&mut db, data, LoadMode::Overwrite)
        .await
        .unwrap();

    // Traversal should return empty, not crash
    let result = query_main(
        &mut db,
        TEST_QUERIES,
        "friends_of",
        &params(&[("$name", "Alice")]),
    )
    .await
    .unwrap();
    assert_eq!(result.num_rows(), 0);

    // Anti-join: everyone is "unemployed" since no WorksAt edges exist
    let result = query_main(&mut db, TEST_QUERIES, "unemployed", &ParamMap::new())
        .await
        .unwrap();
    let batch = result.concat_batches().unwrap();
    let names = batch
        .column(0)
        .as_any()
        .downcast_ref::<StringArray>()
        .unwrap();
    assert_eq!(names.len(), 2); // Alice and Bob
}

// ─── Filter comparison operators ─────────────────────────────────────────────

#[tokio::test]
async fn filter_less_than() {
    let dir = tempfile::tempdir().unwrap();
    let mut db = init_and_load(&dir).await;

    let queries = r#"
query young($age: I32) {
    match {
        $p: Person
        $p.age < $age
    }
    return { $p.name, $p.age }
    order { $p.age asc }
}
"#;
    let result = query_main(&mut db, queries, "young", &int_params(&[("$age", 28)]))
        .await
        .unwrap();

    // Only Bob (25) is < 28
    assert_eq!(result.num_rows(), 1);
    let batch = &result.batches()[0];
    let names = batch
        .column(0)
        .as_any()
        .downcast_ref::<StringArray>()
        .unwrap();
    assert_eq!(names.value(0), "Bob");
}

#[tokio::test]
async fn filter_greater_equal() {
    let dir = tempfile::tempdir().unwrap();
    let mut db = init_and_load(&dir).await;

    let queries = r#"
query at_least_30() {
    match {
        $p: Person
        $p.age >= 30
    }
    return { $p.name }
    order { $p.age asc }
}
"#;
    let result = query_main(&mut db, queries, "at_least_30", &ParamMap::new())
        .await
        .unwrap();

    // Alice (30) and Charlie (35)
    assert_eq!(result.num_rows(), 2);
    let batch = &result.batches()[0];
    let names = batch
        .column(0)
        .as_any()
        .downcast_ref::<StringArray>()
        .unwrap();
    assert_eq!(names.value(0), "Alice");
    assert_eq!(names.value(1), "Charlie");
}

#[tokio::test]
async fn filter_less_equal() {
    let dir = tempfile::tempdir().unwrap();
    let mut db = init_and_load(&dir).await;

    let queries = r#"
query at_most_28() {
    match {
        $p: Person
        $p.age <= 28
    }
    return { $p.name }
    order { $p.age asc }
}
"#;
    let result = query_main(&mut db, queries, "at_most_28", &ParamMap::new())
        .await
        .unwrap();

    // Bob (25) and Diana (28)
    assert_eq!(result.num_rows(), 2);
    let batch = &result.batches()[0];
    let names = batch
        .column(0)
        .as_any()
        .downcast_ref::<StringArray>()
        .unwrap();
    assert_eq!(names.value(0), "Bob");
    assert_eq!(names.value(1), "Diana");
}

#[tokio::test]
async fn filter_not_equal() {
    let dir = tempfile::tempdir().unwrap();
    let mut db = init_and_load(&dir).await;

    let queries = r#"
query not_alice() {
    match {
        $p: Person
        $p.name != "Alice"
    }
    return { $p.name }
    order { $p.name asc }
}
"#;
    let result = query_main(&mut db, queries, "not_alice", &ParamMap::new())
        .await
        .unwrap();

    // Bob, Charlie, Diana
    assert_eq!(result.num_rows(), 3);
    let batch = &result.batches()[0];
    let names = batch
        .column(0)
        .as_any()
        .downcast_ref::<StringArray>()
        .unwrap();
    let mut name_vec: Vec<&str> = (0..names.len()).map(|i| names.value(i)).collect();
    name_vec.sort();
    assert_eq!(name_vec, vec!["Bob", "Charlie", "Diana"]);
}

// ─── Error paths ────────────────────────────────────────────────────────────

#[tokio::test]
async fn insert_missing_required_property_fails() {
    let dir = tempfile::tempdir().unwrap();
    let mut db = init_and_load(&dir).await;

    // Insert Person with no name — name is @key, so this should fail
    let queries = r#"
query insert_no_name($age: I32) {
    insert Person { age: $age }
}
"#;
    let result = mutate_main(
        &mut db,
        queries,
        "insert_no_name",
        &int_params(&[("$age", 25)]),
    )
    .await;

    assert!(result.is_err(), "insert without @key property should fail");
}

// ─── Join alignment: traversal + destination binding ───────────────────────

/// Traversal with destination binding filter constrains the source.
/// Regression: previously over-returned because the lowering created a
/// cross-join followed by cycle-closing instead of Expand + post-filter.
#[tokio::test]
async fn traversal_destination_binding_constrains_source() {
    let dir = tempfile::tempdir().unwrap();
    let mut db = init_and_load(&dir).await;

    // Only Alice works at Acme. The binding on $c must constrain $p.
    let queries = r#"
query at_acme() {
    match {
        $p: Person
        $p worksAt $c
        $c: Company { name: "Acme" }
    }
    return { $p.name }
}
"#;
    let result = query_main(&mut db, queries, "at_acme", &ParamMap::new())
        .await
        .unwrap();

    let batch = result.concat_batches().unwrap();
    let names = batch
        .column(0)
        .as_any()
        .downcast_ref::<StringArray>()
        .unwrap();
    assert_eq!(names.len(), 1);
    assert_eq!(names.value(0), "Alice");
}

/// Multi-variable projection: columns from source and destination must be
/// row-aligned. Previously this could fail with "all columns must have
/// the same length" when variables had different cardinalities.
#[tokio::test]
async fn traversal_multi_variable_projection_aligned() {
    let dir = tempfile::tempdir().unwrap();
    let mut db = init_and_load(&dir).await;

    let queries = r#"
query employee_companies() {
    match {
        $p: Person
        $p worksAt $c
        $c: Company
    }
    return { $p.name, $c.name }
}
"#;
    let result = query_main(&mut db, queries, "employee_companies", &ParamMap::new())
        .await
        .unwrap();

    let batch = result.concat_batches().unwrap();
    // Alice→Acme, Bob→Globex
    assert_eq!(batch.num_rows(), 2);
    let person_names = batch
        .column(0)
        .as_any()
        .downcast_ref::<StringArray>()
        .unwrap();
    let company_names = batch
        .column(1)
        .as_any()
        .downcast_ref::<StringArray>()
        .unwrap();

    let mut pairs: Vec<(&str, &str)> = (0..batch.num_rows())
        .map(|i| (person_names.value(i), company_names.value(i)))
        .collect();
    pairs.sort();
    assert_eq!(pairs, vec![("Alice", "Acme"), ("Bob", "Globex")]);
}

/// Multi-hop projection: all three variables must be row-aligned.
#[tokio::test]
async fn multi_hop_projection_aligned() {
    let dir = tempfile::tempdir().unwrap();
    let mut db = init_and_load(&dir).await;

    // Alice knows Bob, Bob knows Diana.
    // Alice→Bob→Diana is the only 2-hop path.
    let queries = r#"
query fof_chain($name: String) {
    match {
        $p: Person { name: $name }
        $p knows $mid
        $mid knows $fof
    }
    return { $p.name, $mid.name, $fof.name }
}
"#;
    let result = query_main(
        &mut db,
        queries,
        "fof_chain",
        &params(&[("$name", "Alice")]),
    )
    .await
    .unwrap();

    let batch = result.concat_batches().unwrap();
    assert_eq!(batch.num_rows(), 1);
    let col0 = batch
        .column(0)
        .as_any()
        .downcast_ref::<StringArray>()
        .unwrap();
    let col1 = batch
        .column(1)
        .as_any()
        .downcast_ref::<StringArray>()
        .unwrap();
    let col2 = batch
        .column(2)
        .as_any()
        .downcast_ref::<StringArray>()
        .unwrap();
    assert_eq!(col0.value(0), "Alice");
    assert_eq!(col1.value(0), "Bob");
    assert_eq!(col2.value(0), "Diana");
}

/// Multi-hop with destination binding filters at each hop.
#[tokio::test]
async fn multi_hop_with_intermediate_binding_filters() {
    let dir = tempfile::tempdir().unwrap();
    let mut db = init_and_load(&dir).await;

    // Alice knows Bob and Charlie.
    // Bob knows Diana. Charlie knows nobody.
    // Filter $mid to only "Bob" → only Alice→Bob→Diana survives.
    let queries = r#"
query fof_via($name: String, $mid_name: String) {
    match {
        $p: Person { name: $name }
        $p knows $mid
        $mid: Person { name: $mid_name }
        $mid knows $fof
    }
    return { $fof.name }
}
"#;
    let result = query_main(
        &mut db,
        queries,
        "fof_via",
        &params(&[("$name", "Alice"), ("$mid_name", "Bob")]),
    )
    .await
    .unwrap();

    let batch = result.concat_batches().unwrap();
    let names = batch
        .column(0)
        .as_any()
        .downcast_ref::<StringArray>()
        .unwrap();
    assert_eq!(names.len(), 1);
    assert_eq!(names.value(0), "Diana");
}

/// Destination binding with filter + multi-variable return: the classic
/// "join across a traversal" scenario that triggers the bug.
#[tokio::test]
async fn traversal_destination_filter_with_multi_return() {
    let dir = tempfile::tempdir().unwrap();
    let mut db = init_and_load(&dir).await;

    let queries = r#"
query at_acme_named() {
    match {
        $p: Person
        $p worksAt $c
        $c: Company { name: "Acme" }
    }
    return { $p.name, $c.name }
}
"#;
    let result = query_main(&mut db, queries, "at_acme_named", &ParamMap::new())
        .await
        .unwrap();

    let batch = result.concat_batches().unwrap();
    assert_eq!(batch.num_rows(), 1);
    let person = batch
        .column(0)
        .as_any()
        .downcast_ref::<StringArray>()
        .unwrap();
    let company = batch
        .column(1)
        .as_any()
        .downcast_ref::<StringArray>()
        .unwrap();
    assert_eq!(person.value(0), "Alice");
    assert_eq!(company.value(0), "Acme");
}

/// Parameterized destination filter exercises param resolution through the
/// Lance filter pushdown path (params are resolved to literals before pushdown).
#[tokio::test]
async fn traversal_destination_filter_pushdown_with_param() {
    let dir = tempfile::tempdir().unwrap();
    let mut db = init_and_load(&dir).await;

    let queries = r#"
query at_company($company: String) {
    match {
        $p: Person
        $p worksAt $c
        $c: Company { name: $company }
    }
    return { $p.name, $c.name }
}
"#;
    let result = query_main(
        &mut db,
        queries,
        "at_company",
        &params(&[("$company", "Globex")]),
    )
    .await
    .unwrap();

    let batch = result.concat_batches().unwrap();
    assert_eq!(batch.num_rows(), 1);
    let person = batch
        .column(0)
        .as_any()
        .downcast_ref::<StringArray>()
        .unwrap();
    let company = batch
        .column(1)
        .as_any()
        .downcast_ref::<StringArray>()
        .unwrap();
    assert_eq!(person.value(0), "Bob");
    assert_eq!(company.value(0), "Globex");
}

/// Fan-out: one source expanded to two different destination types.
/// Each (friend, company) pair should be a cross-product per source row.
#[tokio::test]
async fn fan_out_two_destinations() {
    let dir = tempfile::tempdir().unwrap();
    let mut db = init_and_load(&dir).await;

    let queries = r#"
query fan_out($name: String) {
    match {
        $p: Person { name: $name }
        $p knows $f
        $p worksAt $c
    }
    return { $f.name, $c.name }
}
"#;
    // Alice knows Bob and Charlie, works at Acme.
    // Each friend paired with her company → 2 rows.
    let result = query_main(&mut db, queries, "fan_out", &params(&[("$name", "Alice")]))
        .await
        .unwrap();

    let batch = result.concat_batches().unwrap();
    assert_eq!(batch.num_rows(), 2);
    let friends = batch
        .column(0)
        .as_any()
        .downcast_ref::<StringArray>()
        .unwrap();
    let companies = batch
        .column(1)
        .as_any()
        .downcast_ref::<StringArray>()
        .unwrap();

    let mut pairs: Vec<(&str, &str)> = (0..batch.num_rows())
        .map(|i| (friends.value(i), companies.value(i)))
        .collect();
    pairs.sort();
    assert_eq!(pairs, vec![("Bob", "Acme"), ("Charlie", "Acme")]);
}

/// Deferred destination filter that matches nothing → empty result.
#[tokio::test]
async fn traversal_destination_filter_no_match() {
    let dir = tempfile::tempdir().unwrap();
    let mut db = init_and_load(&dir).await;

    let queries = r#"
query at_phantom() {
    match {
        $p: Person
        $p worksAt $c
        $c: Company { name: "NonExistent" }
    }
    return { $p.name }
}
"#;
    let result = query_main(&mut db, queries, "at_phantom", &ParamMap::new())
        .await
        .unwrap();

    assert_eq!(result.num_rows(), 0);
}

/// Negation with inner destination binding filter.
/// "People who do NOT work at Acme" — uses binding syntax inside negation.
#[tokio::test]
async fn negation_with_inner_destination_binding() {
    let dir = tempfile::tempdir().unwrap();
    let mut db = init_and_load(&dir).await;

    let queries = r#"
query not_at_acme_binding() {
    match {
        $p: Person
        not {
            $p worksAt $c
            $c: Company { name: "Acme" }
        }
    }
    return { $p.name }
}
"#;
    // Alice→Acme. Everyone else should be returned.
    let result = query_main(&mut db, queries, "not_at_acme_binding", &ParamMap::new())
        .await
        .unwrap();

    let batch = result.concat_batches().unwrap();
    let names = batch
        .column(0)
        .as_any()
        .downcast_ref::<StringArray>()
        .unwrap();
    let mut names_vec: Vec<&str> = (0..names.len()).map(|i| names.value(i)).collect();
    names_vec.sort();
    assert_eq!(names_vec, vec!["Bob", "Charlie", "Diana"]);
}