feat(engine): indexed graph traversal (#149)

* perf(engine): route Expand node hydration through the id BTREE via structured filter

hydrate_nodes built an `id IN (...)` SQL string applied via Scanner::filter,
which DataFusion evaluates with InListEval (O(N×M)) rather than using the id
BTREE scalar index — measured at 72× the indexed cost on a 100k-node hop
(MR-376). Build the id IN-list as a structured DataFusion Expr, AND it with
the pushable destination filters, and apply via Scanner::filter_expr (the same
path execute_node_scan already uses); Lance then compiles it to
scalar-index-search -> take.

Destination-filter pushability is now decided by ir_filter_to_expr (structured)
instead of ir_filter_to_sql, so list-contains (array_has) pushes down too.
Removes the now-dead string-filter helpers build_lance_filter, ir_filter_to_sql,
and ir_expr_to_sql; literal_to_sql stays (still used by the mutation delete path).

* feat(engine): add TableStore::scan_edges_by_endpoint for indexed neighbor lookup

Static helper returning edge rows that match a set of endpoint keys on src/dst,
projected to [key_col, opposite_col], via a structured `key_col IN (keys)`
filter_expr. Lance routes it through the persisted BTREE on the endpoint column
(index-search -> take), so cost scales with the frontier size rather than |E|.

Unused until execute_expand's indexed mode lands; isolated in its own commit so
the storage-layer primitive is reviewable on its own.

* feat(engine): add BTREE-indexed Expand traversal path

Split execute_expand into a dispatcher over execute_expand_csr (the existing
in-memory CSR BFS, unchanged) and a new execute_expand_indexed that serves each
hop by batching the frontier into one scan_edges_by_endpoint call against the
persisted src/dst BTREE (index-search -> take), then fans out per source row.
Both share expand_hydrate_and_align — the destination hydration + alignment +
hconcat + in-memory non-pushable filters — which now aligns by string id (a
HashMap) instead of a dense row-id vec, so one tail serves both modes.

Mode selection is OMNIGRAPH_TRAVERSAL_MODE for now (default csr); the
frontier-size auto policy and lazy CSR build follow. AntiJoin stays on CSR.

tests/traversal_indexed.rs (its own #[serial] binary, so env writes never race a
reader) asserts the indexed path matches CSR for one-hop, multi-hop, cross-type,
and no-match cases, and that a freshly-appended unindexed edge is still found
(partial index coverage — fast_search=false unindexed-fragment scan).

* feat(engine): frontier-size Expand dispatcher + lazy CSR build

Replace the env-only mode switch with an auto policy: Expand uses the
BTREE-indexed path when the source frontier is small and the hop count bounded
(OMNIGRAPH_EXPAND_INDEXED_MAX_FRONTIER=1024, OMNIGRAPH_EXPAND_INDEXED_MAX_HOPS=6),
else the in-memory CSR. OMNIGRAPH_TRAVERSAL_MODE=indexed|csr still forces a mode.

Make the CSR index lazy: thread a GraphIndexHandle (memoizing OnceCell over a
Cached/Direct/None builder) through execute_query/execute_pipeline/
execute_rrf_query/execute_anti_join instead of a pre-built Option<&GraphIndex>.
A query served entirely by the indexed path with no AntiJoin never pays the
O(|E|) CSR build — the perf win of Tier 3. AntiJoin still realizes the index
(its negation uses CSR has_neighbors).

Net effect: selective traversals (the common case) skip the whole-graph CSR
build and resolve neighbors from the persisted, incrementally-maintained
src/dst BTREE. Existing traversal/aggregation/end_to_end/search suites now run
the indexed path by default and stay green.

Docs: constants.md (new env knobs), query-language.md (Expand dual path),
indexes.md (graph index is lazy + the indexed alternative).

* test(engine): bench indexed vs CSR selective traversal

Add a selective single-source knows{1,2} comparison to bench_expand: per growing
|E|, time the cold query in csr vs indexed mode (fresh db each, so CSR pays its
O(|E|) build) and assert both modes return identical rows — a guard against the
scalar-index physical_rows silent fallback dropping unindexed-fragment rows. The
existing dense hop1/2/3 latency bench is unchanged.

* feat(engine): surface silent scalar-index fallback in indexed traversal (C6)

Add TableStore::key_column_index_coverage — a metadata-only check (no IO) of
whether a `key_col IN (...)` scan will be served by the persisted BTREE or
silently fall back to a full filtered scan, mirroring Lance's own decision:
no BTREE on the column, or any fragment missing physical_rows (which disables
scalar indices for the whole scan, lance dataset/scanner.rs create_filter_plan).
execute_expand_indexed calls it once per traversal and tracing::warn!s on
Degraded, so the perf cliff is observable instead of hidden behind a bench oracle.

Detection-only: results are correct either way (the scan returns all rows). Closes
the "no silent failures" gap the traversal best-practice audit flagged as the top
deviation, and adds an IndexCoverage value a future cost-based planner can consume.

* perf(engine): dense-id BFS on the indexed traversal path (C3)

execute_expand_indexed ran its per-source BFS in string space
(Vec<HashSet<String>>, HashMap<String,Vec<String>>, ~4 String clones per neighbor
occurrence). Intern node ids to u32 once via a per-traversal TypeIndex (no
GraphIndex/CSR build — laziness preserved) and run visited/seen/frontier/
neighbor-map in dense u32 space, mirroring the CSR path; de-intern only for the
per-hop IN-list and the emitted dst ids handed to the hydrate+align tail.

Behavior-preserving — the traversal_indexed CSR-vs-indexed equivalence tests are
the guard (results are identical, the key type just changes String -> u32).

* refactor(engine): thread the opened edge dataset into indexed Expand

Hoist the edge-dataset open and the C6 index-coverage warning out of
execute_expand_indexed into execute_expand, threading the opened dataset in
as a parameter so it is opened exactly once. Extract the endpoint-column
mapping (endpoint_columns) and the coverage warning (warn_on_degraded_coverage)
as helpers.

Behavior-preserving: same dataset, same warning, same dispatch decision. This
only relocates the open so the upcoming cost-based chooser can consult index
coverage before dispatch without opening the dataset twice.

* feat(engine): cost-based Expand dispatch chooser (C5)

Replace the fixed frontier<=1024 && hops<=6 dispatch threshold with a pure,
IO-free cost model. choose_expand_mode compares the indexed path's
frontier-relative work (hops * frontier * fanout, or hops * |E| when BTREE
coverage is degraded) against the cost of building the whole-graph CSR
(BUILD_FACTOR * |E|), from cheap manifest row counts. Under good coverage this
reduces to a selectivity ratio independent of |E|, preserving the flat-in-|E|
indexed win for selective traversals while routing dense / deep / high-fanout
or degraded-and-expensive traversals to CSR.

execute_expand decides cardinality-first and only opens the edge dataset to
confirm coverage when it leans indexed (no open on a clearly-CSR traversal).
The two env knobs become hard ceilings layered on the model; the
OMNIGRAPH_TRAVERSAL_MODE override still forces a path; the chosen mode is
traced. Results are unchanged across modes — only the path differs.

Adds inline crossover unit tests and extends the traversal_indexed both_modes
harness with an auto pass asserting the chooser is result-preserving across
every traversal shape. Documents the new flag semantics in
docs/user/{constants,query-language}.md.

* test(engine): pin Lance scalar-index coverage + system-column/deletion-metadata surface

Add three Lance surface guards de-risking a future persisted-adjacency cache:
- a compile-only guard pinning the fragment physical_rows + index-detail
  surface that key_column_index_coverage mirrors (the C6 fallback);
- a runtime probe confirming a scalar BTREE on the system column
  _row_last_updated_at_version is not buildable via the normal create-index
  path (the column is not in the user schema), so a version-column range delta
  is not viable as drafted;
- a runtime probe confirming per-fragment deletion metadata
  (deletion_file.num_deleted_rows) is available as cheap O(fragments) metadata,
  the primitive a fragment-coverage delete model would rely on.

The probes turn the two largest substrate assumptions into green/red CI facts
before any cache work begins.

* test(engine): regression for cross-type id-collision in indexed traversal

A node id is unique only within a type, so a Person and a Company can share an
id string. A variable-length traversal over a cross-type edge (WorksAt) must
structurally stop after one hop. This test builds a graph where 'shared' is both
a Person and a Company id and asserts worksAt{1,2} returns only the one-hop
company. It fails today: the indexed path's single string interner de-interns
the hop-1 Company id back to the colliding Person id and runs a hop-2 scan that
matches that Person's edges, emitting a spurious second-hop company (indexed
["other","shared"] vs csr ["shared"]).

* fix(engine): structurally cap cross-type Expand at one hop

A cross-type edge cannot chain (e.g. a Company is not a WorksAt source), so a
variable-length traversal over one is structurally single-hop. Both traversal
paths now enforce this by capping max hops at 1 when from_type != to_type,
instead of relying on the hop-2 scan returning empty.

That reliance was a correctness hole on the indexed path: it interns every
endpoint string into one dense id space, so a cross-type id-string collision (a
Person and a Company sharing an id) let hop 2 de-intern a destination id back to
the colliding source-type id and match its edges, emitting rows the CSR path
never produces. With the cap the cross-type second-hop scan never runs, so the
shared interner can no longer alias across types. Turns the regression test
green (indexed == csr == ["shared"]).

* perf(engine): set-oriented filtered anti-join, remove per-row dispatch

execute_anti_join's filtered slow path sliced the outer batch to one row at a
time and re-ran the inner pipeline per row, so each 1-row inner Expand dispatched
to the indexed path — one Lance scan per outer row, while the CSR realized up
front sat unused.

Replace it with a set-oriented anti-semi-join: tag each outer row with a
synthetic index column, run the inner pipeline once over the whole frontier (the
tag survives Expand's hconcat and Filter's row-drop), then exclude outer rows
whose tag survived. The inner Expand now runs as a single set-at-a-time traversal
over the full frontier; config is read once per operator, not per row (the env
nit is mooted). A produced-but-untagged inner batch fails loudly rather than
silently keeping every row. Results are unchanged (the predicated-negation tests
exercise the path over a multi-row outer with dst-filters).

* test(engine): drop flaky wall-clock budget from the merge truth table

The 30s wall-clock assertion in merge_pair_truth_table flakes under parallel
test load: it tripped at ~31s in the full --test-threads=4 gate while passing at
~20s in isolation. A fixed time budget in a correctness test depends on machine
and parallelism, not correctness; elapsed is still logged for visibility, and a
real merge-perf regression belongs in a bench. The cell-count correctness
assertions (81 / 36 / 45) are unchanged.

* fix(engine): total deterministic ORDER via entity-key tie-break + NULL contract

apply_ordering used an unstable lexsort with no tie-break, so rows with equal
user-sort keys came out in a run-dependent order (the input order depends on
scan parallelism / upstream hashing) — making ORDER ... LIMIT non-deterministic,
a latent deny-list violation (no nondeterministic result ordering).

Append the bound entities' key columns (<var>.id, unique per row) in canonical
name-sorted order as ascending tie-breaks, giving a total, reproducible order
(and a deterministic top-N when ties straddle the LIMIT cutoff). NULL placement
(nulls_first = !descending) is unchanged and now documented as the contract.

New tests/ordering.rs locks descending, multi-key precedence, the deterministic
key tie-break (data loaded in a different order than the expected output, so it
proves the tie sorts by key not by load order), and NULL placement under ASC/DESC.
docs/user/query-language.md documents the total-order + NULL contract.

* test(engine): property-based query-correctness invariants over generated graphs

Adds a proptest harness (new dev-dep) that generates small graphs whose Person
and Company keys are drawn from a shared 5-key alphabet, so cross-type id
collisions, cycles, and self-loops arise by search rather than from one
hand-built fixture. Three invariants:

- prop_expand_indexed_eq_csr: csr == indexed == auto over knows{1,3} (same-type,
  cycles) and worksAt{1,2} (cross-type, collision-prone) from every start.
- prop_results_subset_of_existing_nodes: no phantom rows (catches over-emission
  even if both modes are wrong identically).
- prop_antijoin_partitions_persons: not{worksAt} and its complement are disjoint
  and cover all persons.

Verified the guard bites: neutering the cross-type hop cap makes
prop_expand_indexed_eq_csr fail and proptest shrinks it to persons["c","e"] /
companies["b","c"] — the cross-type collision class the hand-built fixture
only sampled once. Tests are sync + #[serial] (per-case runtime; the mode test
writes OMNIGRAPH_TRAVERSAL_MODE).

* test(engine): cover cycle/self-loop termination + nested anti-join (C5 edge cases)

- variable_hops_terminate_and_dedup_on_cycle: a 3-cycle a->b->c->a traversed with
  knows{1,5} (ceiling above the cycle length) terminates and emits each node once
  (the c->a back-edge hits the seeded source); both_modes confirms indexed == csr.
  Uses a bounded range deliberately — unbounded {1,} is a typecheck error, not a
  runtime path.
- variable_hops_handle_self_loop: a->a self-loop does not loop forever and does
  not re-emit the seeded source.
- nested_anti_join_double_negation: not { worksAt; not { name = Acme } } recurses
  through execute_pipeline, yielding [Alice,Charlie,Diana] (people with no non-Acme
  employer) — distinct from plain unemployed [Charlie,Diana].

* test(engine): execution goldens for typed-literal filters (C4 gap #4)

New literal_filters.rs covers filtering by F64/F32/Bool/Date/DateTime LITERALS
across both arms: standalone comparisons ($m.score > 1.5, $m.ratio <= 0.25,
$m.active = true, $m.born >= date(...), $m.seen < datetime(...)) exercise the
in-memory comparison path, and inline bindings (Metric { active: true },
Metric { score: 3.0 }) exercise Lance filter_expr pushdown. Seeds partition each
predicate so a dropped/miscast filter returns all rows. (Param-bound scalars and
list-column contains are covered elsewhere.)

* test(engine): full rank-order goldens for nearest + bm25 (gap #2)

Existing search tests stopped at top-1 (nearest) or non-empty (bm25), so a
regression corrupting ranks 2..k or reversing the sort direction passed CI
silently. Pin the FULL ordered slug list: nearest([0.1,0.2,0.3,0.4]) ->
[ml-intro, nlp-guide, rl-intro] (ml-intro exact at dist 0, rest by ascending
L2); bm25(Learning) -> [rl-intro, ml-intro, dl-basics] (descending score).
nearest/bm25 skip apply_ordering (is_search_ordered) and return Lance native
order, so result_slugs row order == rank order; values resolved by running and
confirmed stable across runs.

* test(engine): search fuzzy/match_text characterization + RRF non-default pairings

- match_text_matches_exact_set_excludes_unrelated: match_text(body,'neural') ==
  [dl-basics] exactly (not just contains).
- fuzzy_does_not_match_under_default_tokenizer: characterizes that fuzzy() is
  inert with the default tokenizer here (search/match_text work, fuzzy returns
  nothing); turns red — to be promoted to a real golden — if fuzzy starts matching.
- rrf_fuses_two_fts_fields / rrf_fuses_two_vector_queries: RRF fuses arms other
  than the default nearest+bm25 (bm25 title+body; two vector queries), proving
  primary_var resolves and fusion runs. New fixtures/search.gq queries +
  two_vector_params helper. Orders resolved by running, confirmed stable.

* test(engine): anti-join fast-vs-slow path equivalence harness

anti_join_fast_and_slow_paths_agree: the CSR has_neighbors fast path
(not { $p worksAt $_ }) and the set-oriented inner-pipeline replay (same
negation forced slow by an always-true $c.name != "" dst filter) must produce
the same result ([Charlie, Diana]). Closes the second real engine fork explicitly.

* test(engine): regression for nested slow-path anti-join tag collision

A nested not { ... not { ... } } where both levels hit the set-oriented slow
path collides on the fixed __antijoin_outer_row correlation column: the inner
call appends a duplicate, and column_by_name reads the OUTER tag. Fan-out (p1
works at two companies) makes inner row indices diverge from outer tags, so the
bug returns the wrong person set. Fails on current code (left ["p2","p4"] vs
right ["p3","p4"]).

* fix(engine): collision-free anti-join correlation tag for nested negation

The set-oriented anti-join tagged the outer batch with a fixed column name and
read it back by name. Under a nested slow-path anti-join the enclosing tag rides
through the inner pipeline, so the inner call produced a duplicate field; Arrow
permits duplicate names and column_by_name returns the first, so the inner
negation mis-correlated against the outer row indices.

Choose a tag name not already present in the batch (suffix-incremented), so each
nesting level reads its own correlation column. Turns the fan-out regression
green; the existing nested/fast-vs-slow/proptest anti-join invariants still pass.

* fix(engine): cap cross-type hops in the Expand cost model

gather_cost_inputs fed the requested max_hops into choose_expand_mode even though
execute_expand_indexed runs at most one hop for a cross-type edge. So a cross-type
variable-length expand (e.g. worksAt{1,5}) had its indexed cost scaled by 5 while
only one hop runs, skewing the chooser toward CSR (an unnecessary whole-graph
build) near the crossover. Results were unaffected (modes are equivalent); this
is a plan-accuracy fix.

Add cost_effective_hops(requested, same_type) — caps to 1 for cross-type — and
apply it in gather_cost_inputs so the estimate matches what executes. Unit test
covers the cap and the crossover consequence (capped 1 hop stays indexed where
the requested 5 would have flipped to CSR).

* perf(engine): realize anti-join CSR lazily + reuse a warm CSR in the chooser

Two CSR build/reuse fixes flagged on the set-oriented anti-join work (results
unchanged — plan/perf accuracy):

- execute_anti_join called graph_index.get() (the O(|E|) whole-graph CSR build)
  unconditionally, but only the bulk fast path consumes it; a filtered/nested
  slow-path anti-join's inner Expand picks its own access path. Gate the build on
  a pure shape predicate (bulk_anti_join_applies) so a selective anti-join over a
  large graph no longer pays a build it won't use.
- gather_cost_inputs hardcoded csr_cached=false, so once an earlier op realized
  the CSR, later Expands still cost it as a cold build and could pick per-hop
  indexed scans over reusing the warm in-memory CSR. Add GraphIndexHandle::
  is_built() and thread it through so the chooser reuses a materialized CSR.

Anti-join, cross-type, proptest-equivalence, and chooser unit tests stay green.

* test(engine): RAII traversal-mode guard in proptest equivalence

prop_expand_indexed_eq_csr set/cleared OMNIGRAPH_TRAVERSAL_MODE manually; a panic
between set and clear (e.g. a query unwrap on a generated case) would leak the
forced mode into proptest's shrink/subsequent cases and mask the divergence under
test. Replace with a ModeGuard that clears on drop (including on unwind), scoping
the forced mode to a single query.

* test(engine): regression for multi-hop anti-join hop bounds

The bulk anti-join fast path answers via has_neighbors (one-hop existence), so
not { $p knows{2,2} $x } wrongly drops a node with a 1-hop neighbor but no
2-hop path. On a->b (sink) and c->d->e, only c has a 2-hop path; the query should
keep [a,b,d,e]. Fails on current code (left ["b","e"] — only the sinks).

* fix(engine): restrict anti-join bulk fast path to one-hop expands

bulk_anti_join_applies accepted any single Expand, but try_bulk_anti_join_mask
decides via the CSR has_neighbors one-hop existence check — wrong for multi-hop
negations. Require min_hops==1 && max_hops==1 in the predicate; anything else
falls to the slow path, whose inner Expand runs the real bounded traversal.
Turns the multi-hop regression green; one-hop anti-joins unchanged.

* fix(engine): IndexCoverage reports Degraded for uncovered fragments

key_column_index_coverage checked BTREE-exists + physical_rows but not that the
index actually covers the current fragments. Since edge-index creation is skipped
once a BTREE exists, fragments appended later stay unindexed while coverage still
reported Indexed — so the cost chooser priced a partly-full scan as fully indexed.

Compare the BTREE's fragment_bitmap (public on lance_table IndexMetadata) against
the dataset's current fragment ids; report Degraded when any are uncovered. A None
bitmap means Lance can't report coverage — don't over-degrade. Results are
unaffected (the scan returns unindexed-fragment rows either way); this corrects
the cost signal.

Test: a freshly-loaded edge BTREE is Indexed; after appending an edge the new
fragment is uncovered → Degraded. Surface guard pins IndexMetadata.fragment_bitmap.

* docs: clarify the Expand frontier ceiling bounds the initial dispatch frontier

The cap is applied at dispatch on the initial frontier; per-hop fan-out
(union_dense) is not hard-capped. Correct the constants.md and query-language.md
claims: the ceilings bound the initial-dispatch frontier/hops, the cost model
estimates total indexed work as ~hops*frontier*fanout (pricing dense fan-out
toward CSR), and per-hop work is not a hard bound. Drops the overstated 'hard
caps bound indexed work' / 'cost ∝ frontier' wording.
This commit is contained in:
Ragnor Comerford 2026-06-09 18:09:13 +02:00 committed by GitHub
parent 59b64ea097
commit dbfdddc952
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
19 changed files with 2570 additions and 196 deletions

53
Cargo.lock generated
View file

@ -4627,6 +4627,7 @@ dependencies = [
"object_store 0.12.5",
"omnigraph-compiler",
"omnigraph-policy",
"proptest",
"regex",
"reqwest",
"serde",
@ -5141,6 +5142,25 @@ dependencies = [
"unicode-ident",
]
[[package]]
name = "proptest"
version = "1.11.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4b45fcc2344c680f5025fe57779faef368840d0bd1f42f216291f0dc4ace4744"
dependencies = [
"bit-set",
"bit-vec",
"bitflags",
"num-traits",
"rand 0.9.2",
"rand_chacha 0.9.0",
"rand_xorshift",
"regex-syntax",
"rusty-fork",
"tempfile",
"unarray",
]
[[package]]
name = "prost"
version = "0.14.3"
@ -5202,6 +5222,12 @@ dependencies = [
"cc",
]
[[package]]
name = "quick-error"
version = "1.2.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a1d01941d82fa2ab50be1e79e6714289dd7cde78eba4c074bc5a4374f650dfe0"
[[package]]
name = "quick-xml"
version = "0.37.5"
@ -5373,6 +5399,15 @@ dependencies = [
"rand 0.9.2",
]
[[package]]
name = "rand_xorshift"
version = "0.4.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "513962919efc330f829edb2535844d1b912b0fbe2ca165d613e4e8788bb05a5a"
dependencies = [
"rand_core 0.9.5",
]
[[package]]
name = "rand_xoshiro"
version = "0.7.0"
@ -5772,6 +5807,18 @@ version = "1.0.22"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b39cdef0fa800fc44525c84ccb54a029961a8215f9619753635a9c0d2538d46d"
[[package]]
name = "rusty-fork"
version = "0.3.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "cc6bf79ff24e648f6da1f8d1f011e9cac26491b619e6b9280f2b47f1774e6ee2"
dependencies = [
"fnv",
"quick-error",
"tempfile",
"wait-timeout",
]
[[package]]
name = "ryu"
version = "1.0.23"
@ -6759,6 +6806,12 @@ dependencies = [
"web-time",
]
[[package]]
name = "unarray"
version = "0.1.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "eaea85b334db583fe3274d12b4cd1880032beab409c0d774be044d4480ab9a94"
[[package]]
name = "unicase"
version = "2.9.0"

View file

@ -55,3 +55,4 @@ omnigraph-compiler = { path = "../omnigraph-compiler", version = "0.6.2" }
tokio = { workspace = true }
lance-namespace-impls = { workspace = true }
serial_test = "3"
proptest = "1"

View file

@ -221,6 +221,65 @@ fn microbench_dedup() {
);
}
/// Selective single-source traversal, timed cold in CSR vs indexed mode across
/// growing |E|. The win of the indexed path: a small fixed frontier should be
/// ~flat in |E| (one BTREE scan per hop), whereas CSR pays an O(|E|) adjacency
/// build on the first (cold) query. Also asserts both modes return the same
/// rows — a guard against the scalar-index `physical_rows` silent fallback
/// dropping unindexed-fragment rows.
async fn bench_selective_modes() {
println!("\n── Selective traversal: indexed vs CSR (cold, single-source knows{{1,2}}) ──");
let sel = r#"
query sel($name: String) {
match {
$a: Person { name: $name }
$a knows{1,2} $b
}
return { $b.name }
}
"#;
for &(n, avg_deg) in &[(1_000usize, 8usize), (10_000, 8), (30_000, 8)] {
let jsonl = generate_jsonl(n, avg_deg, 42);
let mut params = ParamMap::new();
params.insert(
"name".to_string(),
omnigraph_compiler::query::ast::Literal::String("p0".to_string()),
);
let mut rows_by_mode: Vec<(&str, usize)> = Vec::new();
for mode in ["csr", "indexed"] {
// Fresh db per measurement so the query is cold (CSR pays its build).
let dir = tempfile::tempdir().unwrap();
let uri = dir.path().to_str().unwrap();
let mut db = Omnigraph::init(uri, SCHEMA).await.unwrap();
load_jsonl(&mut db, &jsonl, LoadMode::Overwrite).await.unwrap();
// SAFE: example main drives queries sequentially; no concurrent env reader.
unsafe { std::env::set_var("OMNIGRAPH_TRAVERSAL_MODE", mode) };
let t = Instant::now();
let r = db
.query(ReadTarget::branch("main"), sel, "sel", &params)
.await
.expect("sel query");
let elapsed = t.elapsed();
let rows = r.num_rows();
rows_by_mode.push((mode, rows));
println!(
" |E|≈{:>7} {:<8} cold={:>9.2?} rows={}",
n * avg_deg,
mode,
elapsed,
rows
);
}
unsafe { std::env::remove_var("OMNIGRAPH_TRAVERSAL_MODE") };
assert_eq!(
rows_by_mode[0].1, rows_by_mode[1].1,
"indexed and CSR must return identical rows (no silent drop under partial index coverage)"
);
}
}
#[tokio::main(flavor = "multi_thread")]
async fn main() {
println!("── End-to-end query latency ──");
@ -262,5 +321,7 @@ async fn main() {
}
}
bench_selective_modes().await;
microbench_dedup();
}

View file

@ -422,6 +422,35 @@ pub(super) fn apply_ordering(
});
}
// Deterministic tie-break for a TOTAL order. `lexsort_to_indices` is unstable
// and the input row order is not guaranteed (scan parallelism, upstream
// hashing), so equal user-sort keys would otherwise come out run-dependent —
// making `ORDER ... LIMIT` non-deterministic. Append the bound entities' key
// columns (`<var>.id`, unique per row) in canonical (name-sorted) order as
// ascending tie-breaks. The combination of all bound keys uniquely identifies
// a result row, so the order is total and reproducible. (Aggregate results
// have no `.id` columns; their group rows are already distinct on the
// projected group keys.)
let mut tiebreak_cols: Vec<String> = source
.schema()
.fields()
.iter()
.map(|f| f.name().to_string())
.filter(|name| name.ends_with(".id"))
.collect();
tiebreak_cols.sort();
for name in &tiebreak_cols {
if let Some(col) = source.column_by_name(name) {
sort_columns.push(SortColumn {
values: col.clone(),
options: Some(arrow_schema::SortOptions {
descending: false,
nulls_first: true,
}),
});
}
}
let indices =
lexsort_to_indices(&sort_columns, None).map_err(|e| OmniError::Lance(e.to_string()))?;

File diff suppressed because it is too large Load diff

View file

@ -43,6 +43,19 @@ pub struct DeleteState {
pub(crate) version_metadata: TableVersionMetadata,
}
/// Whether a `key_col IN (...)` scan on a dataset will be served by the
/// persisted scalar (BTREE) index, or silently fall back to a full filtered
/// scan. Detection-only (metadata, no IO); the scan returns the correct rows
/// either way. Surfaced by the indexed traversal path so the silent perf
/// fallback is observable, and available to a future cost-based planner.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum IndexCoverage {
/// The column has a usable BTREE and every fragment records `physical_rows`.
Indexed,
/// Lance will not use the scalar index for this scan (correct, full scan).
Degraded { reason: String },
}
/// A Lance write that has produced fragment files on object storage but is
/// not yet committed to the dataset's manifest. The staged-write primitives
/// are consumed by `MutationStaging` (`exec/staging.rs`,
@ -582,6 +595,117 @@ impl TableStore {
.map_err(|e| OmniError::Lance(e.to_string()))
}
/// Indexed neighbor lookup for graph traversal. Given an edge dataset and a
/// set of endpoint keys on `key_col` (`"src"` for out-traversal, `"dst"` for
/// in-traversal), return the matching edge rows projected to
/// `[key_col, opposite_col]`.
///
/// The `key_col IN (keys)` predicate is built as a structured DataFusion
/// `Expr` and applied via `Scanner::filter_expr`, so Lance routes it through
/// the persisted BTREE on `key_col` (index-search → take). Cost scales with
/// the frontier size, not |E| — the basis for serving selective traversals
/// without building the whole in-memory CSR. Empty `keys` returns empty
/// without scanning.
///
/// Note: like any indexed scan, this observes only fragments the BTREE
/// covers plus an unindexed-fragment scan fallback; it reads the committed
/// snapshot `ds` was opened at.
pub async fn scan_edges_by_endpoint(
ds: &Dataset,
key_col: &str,
opposite_col: &str,
keys: &[String],
) -> Result<Vec<RecordBatch>> {
use datafusion::prelude::{col, lit};
if keys.is_empty() {
return Ok(Vec::new());
}
let key_list: Vec<datafusion::prelude::Expr> =
keys.iter().map(|k| lit(k.clone())).collect();
let filter_expr = col(key_col).in_list(key_list, false);
Self::scan_stream_with(
ds,
Some(&[key_col, opposite_col]),
None,
None,
false,
|scanner| {
scanner.filter_expr(filter_expr);
Ok(())
},
)
.await?
.try_collect()
.await
.map_err(|e| OmniError::Lance(e.to_string()))
}
/// Metadata-only check (no IO) of whether `scan_edges_by_endpoint` — a
/// `key_col IN (...)` filter — on `ds` will be served by the persisted BTREE
/// on `column`, or silently fall back to a full filtered scan. Mirrors
/// Lance's own decision: scalar indices are disabled for the whole scan if
/// ANY fragment lacks `physical_rows` (lance `dataset/scanner.rs`
/// `create_filter_plan`), and are obviously unused if no BTREE on the
/// column exists. The scan is correct (returns all rows) either way — this
/// only surfaces the perf cliff so the indexed traversal can warn on it.
pub async fn key_column_index_coverage(ds: &Dataset, column: &str) -> Result<IndexCoverage> {
let Some(field_id) = ds.schema().field(column).map(|field| field.id) else {
return Ok(IndexCoverage::Degraded {
reason: format!("column '{}' not in schema", column),
});
};
let indices = ds
.load_indices()
.await
.map_err(|e| OmniError::Lance(e.to_string()))?;
let btree = indices
.iter()
.filter(|index| !is_system_index(index))
.filter(|index| index.fields.len() == 1 && index.fields[0] == field_id)
.find(|index| {
index
.index_details
.as_ref()
.map(|details| details.type_url.ends_with("BTreeIndexDetails"))
.unwrap_or(false)
});
let Some(btree) = btree else {
return Ok(IndexCoverage::Degraded {
reason: format!("no BTREE index on '{}'", column),
});
};
// Same check Lance runs: a fragment missing physical_rows disables
// scalar indices for the entire scan (all-or-nothing).
if ds.fragments().iter().any(|f| f.physical_rows.is_none()) {
return Ok(IndexCoverage::Degraded {
reason: "a fragment is missing physical_rows".to_string(),
});
}
// An index only covers the fragments it was built over; fragments
// appended afterward (edge-index creation is skipped once a BTREE exists)
// are scanned unindexed. If any CURRENT fragment is absent from the
// index's `fragment_bitmap`, the scan is partly a full scan — so the
// chooser must not price it as fully indexed. A `None` bitmap means Lance
// can't report coverage; don't over-degrade in that case.
if let Some(bitmap) = btree.fragment_bitmap.as_ref() {
let uncovered = ds
.fragments()
.iter()
.filter(|f| !bitmap.contains(f.id as u32))
.count();
if uncovered > 0 {
return Ok(IndexCoverage::Degraded {
reason: format!(
"{} fragment(s) not covered by the index on '{}'",
uncovered, column
),
});
}
}
Ok(IndexCoverage::Indexed)
}
pub async fn count_rows(&self, ds: &Dataset, filter: Option<String>) -> Result<usize> {
ds.count_rows(filter)
.await

View file

@ -42,3 +42,17 @@ query hybrid_search($vq: Vector(4), $tq: String) {
order { rrf(nearest($d.embedding, $vq), bm25($d.title, $tq)) }
limit 3
}
query rrf_two_fts($q: String) {
match { $d: Doc }
return { $d.slug, $d.title }
order { rrf(bm25($d.title, $q), bm25($d.body, $q)) }
limit 3
}
query rrf_two_vectors($q1: Vector(4), $q2: Vector(4)) {
match { $d: Doc }
return { $d.slug, $d.title }
order { rrf(nearest($d.embedding, $q1), nearest($d.embedding, $q2)) }
limit 3
}

View file

@ -236,6 +236,15 @@ pub fn vector_param(name: &str, values: &[f32]) -> ParamMap {
map
}
/// Build a ParamMap with two vector params.
pub fn two_vector_params(name1: &str, vals1: &[f32], name2: &str, vals2: &[f32]) -> ParamMap {
let mut map = vector_param(name1, vals1);
let key = name2.strip_prefix('$').unwrap_or(name2).to_string();
let lit = Literal::List(vals2.iter().map(|v| Literal::Float(*v as f64)).collect());
map.insert(key, lit);
map
}
/// Build a ParamMap with a vector param and a string param.
pub fn vector_and_string_params(
vec_name: &str,

View file

@ -33,7 +33,10 @@ use lance::dataset::optimize::{CompactionOptions, compact_files};
use lance::dataset::transaction::Operation;
use lance::dataset::write::delete::DeleteResult;
use lance::dataset::{MergeInsertBuilder, WhenMatched, WhenNotMatched, WriteMode, WriteParams};
use lance::index::DatasetIndexExt;
use lance_file::version::LanceFileVersion;
use lance_index::IndexType;
use lance_index::scalar::ScalarIndexParams;
use lance_namespace::LanceNamespace;
use lance_table::io::commit::ManifestNamingScheme;
@ -406,3 +409,135 @@ async fn compact_files_still_fails_on_blob_columns() {
shifted): {err}"
);
}
// --- Guard 11: scalar-index coverage surface (physical_rows + index details) ---
//
// `table_store.rs::key_column_index_coverage` mirrors Lance's `create_filter_plan`
// C6 fallback: it reads `fragment.physical_rows` (the field whose absence on ANY
// fragment disables the scalar index for the whole scan) and sniffs the BTREE via
// `load_indices()` → `index.fields` / `index.index_details.type_url`. This is the
// one real Lance-internal coupling on the indexed-traversal read path. If any of
// these surfaces renames or changes type, the coverage check (and the cost-based
// traversal chooser that consumes it) silently misclassifies. Compile-only.
#[allow(
dead_code,
unreachable_code,
unused_variables,
unused_mut,
clippy::diverging_sub_expression
)]
async fn _compile_scalar_index_coverage_surface() -> lance::Result<()> {
let ds: Dataset = unimplemented!();
// The create_filter_plan coupling: a fragment lacking `physical_rows`
// disables the scalar index for the entire scan.
for frag in ds.fragments().iter() {
let _physical_rows: Option<usize> = frag.physical_rows;
// `key_column_index_coverage` checks each current fragment id against the
// index `fragment_bitmap`.
let _id: u64 = frag.id;
}
// The index sniff: BTREE presence is detected by single-field index whose
// details type_url ends with "BTreeIndexDetails". The fragment coverage check
// reads `fragment_bitmap` (Option<RoaringBitmap>) and calls `.contains(u32)`.
let indices = ds.load_indices().await?;
for index in indices.iter() {
let _fields: &Vec<i32> = &index.fields;
if let Some(details) = index.index_details.as_ref() {
let _type_url: &str = details.type_url.as_str();
}
let _covered: Option<bool> = index.fragment_bitmap.as_ref().map(|b| b.contains(0u32));
}
Ok(())
}
// --- Guard 12: can a scalar BTREE be built on a system version column? --------
//
// The deferred persisted-adjacency artifact plan assumed a cheap delta read of
// `_row_last_updated_at_version > V` could be a BTREE range lookup. Lance resolves
// index columns from the dataset schema, and the version columns are system
// metadata — so this probe documents whether the assumption holds. The outcome is
// the load-bearing fact, not a pass/fail of intent: if this starts SUCCEEDING when
// it currently errors (or vice versa), the artifact's delta-cost story changes.
#[tokio::test]
async fn scalar_index_on_system_version_column_probe() {
let dir = tempfile::tempdir().unwrap();
let uri = dir.path().join("guard12.lance");
let mut ds = fresh_dataset(uri.to_str().unwrap()).await;
// Sanity: the system version column is present (stable row ids + V2_2).
assert!(
ds.schema().field("_row_last_updated_at_version").is_none(),
"PROBE NOTE: `_row_last_updated_at_version` is NOT in the user schema \
(it is system metadata); indexing it resolves through a different path."
);
let result = ds
.create_index_builder(
&["_row_last_updated_at_version"],
IndexType::BTree,
&ScalarIndexParams::default(),
)
.replace(true)
.await;
// Pin the observed behavior: a scalar index on the system version column is
// NOT buildable via the normal create-index path in this Lance. If this turns
// green (Ok), the artifact delta CAN use a version-column BTREE — revisit the
// deferred plan's Phase-2 delta-cost note in docs/dev/traversal handoff.
assert!(
result.is_err(),
"create_index on `_row_last_updated_at_version` unexpectedly SUCCEEDED — \
a system-column scalar index is now buildable; the persisted-artifact \
delta read could use it. Update the deferred-design notes."
);
}
// --- Guard 13: per-fragment deletion metadata is exposed without a scan -------
//
// The deferred artifact's delete-correctness coverage model needs to detect,
// cheaply (O(fragments), no row scan), that a covered fragment acquired new
// deletions. That hinges on Lance tracking deletions at fragment-metadata level.
// This pins that a delete populates `fragment.deletion_file`, and probes whether
// the deleted-row COUNT is available as metadata (`num_deleted_rows`) — the
// difference between an O(fragments) coverage check and an O(|E|) scan.
#[tokio::test]
async fn fragment_deletion_metadata_is_available() {
let dir = tempfile::tempdir().unwrap();
let uri = dir.path().join("guard13.lance");
let ds = fresh_dataset(uri.to_str().unwrap()).await; // 2 rows: alice, bob
let deleted: DeleteResult = {
let mut ds = ds;
ds.delete("id = 'alice'").await.unwrap()
};
assert_eq!(deleted.num_deleted_rows, 1, "one row deleted");
let ds = deleted.new_dataset;
// A delete must be tracked at fragment-metadata level (not only in data).
let with_deletion = ds
.fragments()
.iter()
.find(|f| f.deletion_file.is_some())
.expect(
"after a delete, some fragment must carry a deletion_file — if not, \
Lance changed deletion tracking; the artifact coverage model's \
cheap delete-detection assumption is invalid.",
);
// Probe: is the deleted-row count available as metadata (cheap), or must the
// deletion vector be read? Pin whichever holds so the artifact plan knows.
let count: Option<usize> = with_deletion
.deletion_file
.as_ref()
.and_then(|df| df.num_deleted_rows);
assert_eq!(
count,
Some(1),
"PROBE: deletion_file.num_deleted_rows is not a populated metadata count \
(got {count:?}); the artifact coverage model cannot cheaply detect \
per-fragment deletions and would need to read the deletion vector.",
);
}

View file

@ -0,0 +1,96 @@
//! Execution goldens for filtering by non-string/non-integer scalar LITERALS
//! (F64, F32, Bool, Date, DateTime), across both the in-memory comparison arm
//! (standalone `$m.prop op lit`) and the Lance-pushdown arm (inline binding
//! `Metric { prop: lit }`). Param-bound scalar filters and list-column
//! `contains` are already covered elsewhere; this closes the literal-RHS gap.
mod helpers;
use arrow_array::{Array, StringArray};
use omnigraph::db::Omnigraph;
use omnigraph::loader::{LoadMode, load_jsonl};
use omnigraph_compiler::ir::ParamMap;
use helpers::*;
const SCHEMA: &str = r#"
node Metric {
name: String @key
score: F64?
ratio: F32?
active: Bool?
born: Date?
seen: DateTime?
}
"#;
// Seeds partition every predicate, so a dropped filter returns all 4 rows.
const DATA: &str = r#"{"type":"Metric","data":{"name":"m1","score":2.5,"ratio":0.5,"active":true,"born":"2024-06-01","seen":"2024-06-01T12:00:00Z"}}
{"type":"Metric","data":{"name":"m2","score":1.0,"ratio":0.25,"active":false,"born":"2023-01-01","seen":"2023-01-01T00:00:00Z"}}
{"type":"Metric","data":{"name":"m3","score":3.0,"ratio":0.75,"active":true,"born":"2025-01-01","seen":"2025-01-01T00:00:00Z"}}
{"type":"Metric","data":{"name":"m4","score":0.5,"ratio":0.1,"active":false,"born":"2022-12-31","seen":"2022-01-01T00:00:00Z"}}"#;
async fn metric_db(dir: &tempfile::TempDir) -> Omnigraph {
let uri = dir.path().to_str().unwrap();
let mut db = Omnigraph::init(uri, SCHEMA).await.unwrap();
load_jsonl(&mut db, DATA, LoadMode::Overwrite).await.unwrap();
db
}
async fn sorted_metric_names(db: &mut Omnigraph, queries: &str, name: &str) -> Vec<String> {
let r = query_main(db, queries, name, &ParamMap::new()).await.unwrap();
if r.num_rows() == 0 {
return Vec::new();
}
let b = r.concat_batches().unwrap();
let col = b.column(0).as_any().downcast_ref::<StringArray>().unwrap();
let mut v: Vec<String> = (0..col.len()).map(|i| col.value(i).to_string()).collect();
v.sort();
v
}
#[tokio::test]
async fn float_literal_filters_execute() {
let dir = tempfile::tempdir().unwrap();
let mut db = metric_db(&dir).await;
let q = r#"
query gt() { match { $m: Metric $m.score > 1.5 } return { $m.name } }
query le() { match { $m: Metric $m.ratio <= 0.25 } return { $m.name } }
query inline() { match { $m: Metric { score: 3.0 } } return { $m.name } }
"#;
// F64 standalone: scores 2.5, 3.0 > 1.5
assert_eq!(sorted_metric_names(&mut db, q, "gt").await, vec!["m1", "m3"]);
// F32 standalone: ratios 0.25, 0.1 <= 0.25
assert_eq!(sorted_metric_names(&mut db, q, "le").await, vec!["m2", "m4"]);
// F64 inline-binding pushdown: score == 3.0
assert_eq!(sorted_metric_names(&mut db, q, "inline").await, vec!["m3"]);
}
#[tokio::test]
async fn bool_literal_filters_execute() {
let dir = tempfile::tempdir().unwrap();
let mut db = metric_db(&dir).await;
let q = r#"
query standalone() { match { $m: Metric $m.active = true } return { $m.name } }
query inline() { match { $m: Metric { active: true } } return { $m.name } }
query negated() { match { $m: Metric $m.active != true } return { $m.name } }
"#;
assert_eq!(sorted_metric_names(&mut db, q, "standalone").await, vec!["m1", "m3"]);
assert_eq!(sorted_metric_names(&mut db, q, "inline").await, vec!["m1", "m3"]);
assert_eq!(sorted_metric_names(&mut db, q, "negated").await, vec!["m2", "m4"]);
}
#[tokio::test]
async fn date_and_datetime_literal_filters_execute() {
let dir = tempfile::tempdir().unwrap();
let mut db = metric_db(&dir).await;
let q = r#"
query born_ge() { match { $m: Metric $m.born >= date("2024-01-01") } return { $m.name } }
query seen_lt() { match { $m: Metric $m.seen < datetime("2024-01-01T00:00:00Z") } return { $m.name } }
"#;
// born: m1 2024-06, m3 2025 >= 2024-01-01
assert_eq!(sorted_metric_names(&mut db, q, "born_ge").await, vec!["m1", "m3"]);
// seen: m2 2023, m4 2022 < 2024-01-01
assert_eq!(sorted_metric_names(&mut db, q, "seen_lt").await, vec!["m2", "m4"]);
}

View file

@ -941,8 +941,8 @@ async fn merge_pair_truth_table() {
unsupported_cells, 45,
"expected 45 cells involving dropProperty/addLabel/removeLabel"
);
assert!(
elapsed.as_secs() < 30,
"merge truth table exceeded 30s budget: {elapsed:?}"
);
// No wall-clock assertion here: `elapsed` is logged above for visibility, but
// a fixed time budget in a correctness test flakes under parallel test load
// (it tripped at ~31s in the full `--test-threads=4` gate while passing at
// ~20s in isolation). Merge-perf regressions belong in a bench, not here.
}

View file

@ -0,0 +1,134 @@
//! ORDER BY golden coverage: descending, multi-key precedence, deterministic
//! tie-break (total order), and NULL placement.
//!
//! These pin the observable output-ordering contract (deny-list: "output
//! ordering … become dependencies once shipped"). `apply_ordering` appends the
//! bound entities' key columns as an ascending tie-break, so equal user-sort
//! keys yield a TOTAL, deterministic order (and `ORDER … LIMIT` is
//! deterministic). NULL placement is `nulls_first = !descending` (NULLs first
//! under ASC, last under DESC). Both are documented in
//! `docs/user/query-language.md`.
mod helpers;
use arrow_array::{Array, StringArray};
use omnigraph::db::Omnigraph;
use omnigraph::loader::{LoadMode, load_jsonl};
use omnigraph_compiler::ir::ParamMap;
use omnigraph_compiler::result::QueryResult;
use helpers::*;
/// Names in result ROW order (not sorted) — these tests assert positional order.
fn names_in_order(result: &QueryResult) -> Vec<String> {
let batch = result.concat_batches().unwrap();
if batch.num_rows() == 0 {
return Vec::new();
}
let col = batch
.column(0)
.as_any()
.downcast_ref::<StringArray>()
.unwrap();
(0..col.len()).map(|i| col.value(i).to_string()).collect()
}
/// Init the standard schema and load a custom Person-only dataset.
async fn init_people(dir: &tempfile::TempDir, jsonl: &str) -> Omnigraph {
let uri = dir.path().to_str().unwrap();
let mut db = Omnigraph::init(uri, TEST_SCHEMA).await.unwrap();
load_jsonl(&mut db, jsonl, LoadMode::Overwrite).await.unwrap();
db
}
#[tokio::test]
async fn ordering_descending() {
let dir = tempfile::tempdir().unwrap();
let mut db = init_and_load(&dir).await;
let q = r#"
query q() {
match { $p: Person }
return { $p.name }
order { $p.age desc }
}
"#;
let got = names_in_order(&query_main(&mut db, q, "q", &ParamMap::new()).await.unwrap());
// Charlie(35), Alice(30), Diana(28), Bob(25)
assert_eq!(got, vec!["Charlie", "Alice", "Diana", "Bob"]);
}
#[tokio::test]
async fn ordering_multi_key_age_desc_name_asc() {
let dir = tempfile::tempdir().unwrap();
// Alice & Bob tie at age 30; loaded Bob-first so the expected output order
// cannot be the load order.
let data = r#"{"type":"Person","data":{"name":"Bob","age":30}}
{"type":"Person","data":{"name":"Alice","age":30}}
{"type":"Person","data":{"name":"Charlie","age":25}}"#;
let mut db = init_people(&dir, data).await;
let q = r#"
query q() {
match { $p: Person }
return { $p.name }
order { $p.age desc, $p.name asc }
}
"#;
let got = names_in_order(&query_main(&mut db, q, "q", &ParamMap::new()).await.unwrap());
// age desc -> [30,30,25]; the 30-tie broken by name asc -> Alice before Bob.
assert_eq!(got, vec!["Alice", "Bob", "Charlie"]);
}
#[tokio::test]
async fn ordering_tiebreak_by_key_is_deterministic() {
let dir = tempfile::tempdir().unwrap();
// Same tie at age 30, NO secondary sort key. Loaded Bob-first; the tie must
// break by the entity key (name) ascending -> Alice before Bob, regardless
// of load order. This locks the total-order tie-break in apply_ordering.
let data = r#"{"type":"Person","data":{"name":"Bob","age":30}}
{"type":"Person","data":{"name":"Alice","age":30}}
{"type":"Person","data":{"name":"Charlie","age":25}}"#;
let mut db = init_people(&dir, data).await;
let q = r#"
query q() {
match { $p: Person }
return { $p.name }
order { $p.age asc }
}
"#;
let got = names_in_order(&query_main(&mut db, q, "q", &ParamMap::new()).await.unwrap());
// age asc -> Charlie(25), then the 30-tie broken by key asc -> Alice, Bob.
assert_eq!(got, vec!["Charlie", "Alice", "Bob"]);
}
#[tokio::test]
async fn ordering_nulls_placement_asc_and_desc() {
let dir = tempfile::tempdir().unwrap();
// Bob has a NULL age.
let data = r#"{"type":"Person","data":{"name":"Alice","age":30}}
{"type":"Person","data":{"name":"Bob","age":null}}
{"type":"Person","data":{"name":"Charlie","age":25}}"#;
let mut db = init_people(&dir, data).await;
let asc = r#"
query q() {
match { $p: Person }
return { $p.name }
order { $p.age asc }
}
"#;
let got_asc = names_in_order(&query_main(&mut db, asc, "q", &ParamMap::new()).await.unwrap());
// ASC: nulls_first -> Bob(null), then 25, 30.
assert_eq!(got_asc, vec!["Bob", "Charlie", "Alice"]);
let desc = r#"
query q() {
match { $p: Person }
return { $p.name }
order { $p.age desc }
}
"#;
let got_desc = names_in_order(&query_main(&mut db, desc, "q", &ParamMap::new()).await.unwrap());
// DESC: nulls last -> 30, 25, then Bob(null).
assert_eq!(got_desc, vec!["Alice", "Charlie", "Bob"]);
}

View file

@ -0,0 +1,311 @@
//! Property-based query-correctness invariants over generated graphs.
//!
//! The cross-type id-collision bug (fixed in f6a0e53) was a silent wrong-result
//! divergence between the two Expand modes, caught only because someone
//! hand-built the one colliding fixture. This turns that single example into a
//! search over the whole class: node keys for BOTH types are drawn from a small
//! SHARED alphabet, so cross-type collisions — plus cycles and self-loops —
//! arise frequently. The invariants make any future fork divergence (the planned
//! third ExpandMode, the anti-join fast/slow fork) fail loudly instead of
//! silently.
//!
//! Each test is a sync `#[test]` + `#[serial]`: it builds its own runtime and
//! `block_on`s per generated case (proptest closures are sync), and the
//! mode-equivalence test writes `OMNIGRAPH_TRAVERSAL_MODE`, so serial execution
//! keeps env writes from racing other tests in this binary.
mod helpers;
use std::collections::HashSet;
use arrow_array::{Array, StringArray};
use proptest::prelude::*;
use proptest::test_runner::{Config, TestRunner};
use serial_test::serial;
use omnigraph::db::{Omnigraph, ReadTarget};
use omnigraph::loader::{LoadMode, load_jsonl};
use omnigraph_compiler::ir::ParamMap;
use omnigraph_compiler::query::ast::Literal;
use helpers::*;
/// Small SHARED key alphabet — Person and Company keys are both drawn from this,
/// so cross-type id collisions are common.
const KEYS: &[&str] = &["a", "b", "c", "d", "e"];
const QUERIES: &str = r#"
query friends($name: String) {
match {
$p: Person { name: $name }
$p knows{1,3} $f
}
return { $f.name }
}
query employers($name: String) {
match {
$p: Person { name: $name }
$p worksAt{1,2} $c
}
return { $c.name }
}
query all_persons() {
match { $p: Person }
return { $p.name }
}
query employed() {
match {
$p: Person
$p worksAt $c
}
return { $p.name }
}
query unemployed() {
match {
$p: Person
not { $p worksAt $_ }
}
return { $p.name }
}
"#;
#[derive(Debug, Clone)]
struct GenGraph {
persons: Vec<String>,
companies: Vec<String>,
knows: Vec<(usize, usize)>, // indices into persons (self-loops & cycles allowed)
works_at: Vec<(usize, usize)>, // (person idx, company idx)
}
impl GenGraph {
fn to_jsonl(&self) -> String {
let mut s = String::new();
for p in &self.persons {
s.push_str(&format!("{{\"type\":\"Person\",\"data\":{{\"name\":\"{p}\"}}}}\n"));
}
for c in &self.companies {
s.push_str(&format!("{{\"type\":\"Company\",\"data\":{{\"name\":\"{c}\"}}}}\n"));
}
// Dedup exact-duplicate edge rows (the loader rejects intra-batch
// duplicate keys); collisions/cycles/self-loops are unaffected.
let mut seen = HashSet::new();
for &(a, b) in &self.knows {
if seen.insert(("k", a, b)) {
s.push_str(&format!(
"{{\"edge\":\"Knows\",\"from\":\"{}\",\"to\":\"{}\"}}\n",
self.persons[a], self.persons[b]
));
}
}
for &(a, b) in &self.works_at {
if seen.insert(("w", a, b)) {
s.push_str(&format!(
"{{\"edge\":\"WorksAt\",\"from\":\"{}\",\"to\":\"{}\"}}\n",
self.persons[a], self.companies[b]
));
}
}
s
}
}
fn arb_keys() -> impl Strategy<Value = Vec<String>> {
proptest::sample::subsequence(KEYS.to_vec(), 1..=KEYS.len())
.prop_map(|v| v.into_iter().map(String::from).collect())
}
fn arb_graph() -> impl Strategy<Value = GenGraph> {
(arb_keys(), arb_keys()).prop_flat_map(|(persons, companies)| {
let np = persons.len();
let nc = companies.len();
let knows = prop::collection::vec((0..np, 0..np), 0..=10);
let works = prop::collection::vec((0..np, 0..nc), 0..=10);
(Just(persons), Just(companies), knows, works).prop_map(
|(persons, companies, knows, works_at)| GenGraph {
persons,
companies,
knows,
works_at,
},
)
})
}
fn config() -> Config {
Config {
cases: 48,
..Config::default()
}
}
fn clear_mode() {
unsafe { std::env::remove_var("OMNIGRAPH_TRAVERSAL_MODE") };
}
/// RAII guard that sets `OMNIGRAPH_TRAVERSAL_MODE` and clears it on drop — so a
/// panic mid-case (e.g. a query `unwrap`) cannot leak the forced mode into
/// proptest's subsequent shrink/cases and mask the divergence under test. SAFE:
/// every test in this binary is `#[serial]`, so no thread reads the env during
/// the write.
struct ModeGuard;
impl ModeGuard {
fn set(mode: &str) -> Self {
unsafe { std::env::set_var("OMNIGRAPH_TRAVERSAL_MODE", mode) };
ModeGuard
}
}
impl Drop for ModeGuard {
fn drop(&mut self) {
unsafe { std::env::remove_var("OMNIGRAPH_TRAVERSAL_MODE") };
}
}
async fn load_graph(graph: &GenGraph) -> (tempfile::TempDir, Omnigraph) {
let dir = tempfile::tempdir().unwrap();
let uri = dir.path().to_str().unwrap();
let mut db = Omnigraph::init(uri, TEST_SCHEMA).await.unwrap();
load_jsonl(&mut db, &graph.to_jsonl(), LoadMode::Overwrite)
.await
.unwrap();
(dir, db)
}
fn one_param(val: &str) -> ParamMap {
let mut m = ParamMap::new();
m.insert("name".to_string(), Literal::String(val.to_string()));
m
}
/// First-column strings, sorted (MULTISET — preserves duplicate-row count so
/// mode comparisons catch dedup divergence, not just set divergence).
async fn col0_sorted(db: &mut Omnigraph, name: &str, params: &ParamMap) -> Vec<String> {
let r = db
.query(ReadTarget::branch("main"), QUERIES, name, params)
.await
.unwrap();
if r.num_rows() == 0 {
return Vec::new();
}
let b = r.concat_batches().unwrap();
let col = b.column(0).as_any().downcast_ref::<StringArray>().unwrap();
let mut v: Vec<String> = (0..col.len()).map(|i| col.value(i).to_string()).collect();
v.sort();
v
}
async fn col0_set(db: &mut Omnigraph, name: &str, params: &ParamMap) -> HashSet<String> {
col0_sorted(db, name, params).await.into_iter().collect()
}
// INVARIANT 1: mode equivalence. For any generated graph and start key, the
// CSR, indexed, and auto paths return identical result multisets — over both a
// same-type traversal (knows{1,3}, exercises cycles/self-loops) and a cross-type
// one (worksAt{1,2}, collision-prone). This is the search-over-the-class version
// of the hand-built cross-type-collision fixture.
#[test]
#[serial]
fn prop_expand_indexed_eq_csr() {
let rt = tokio::runtime::Runtime::new().unwrap();
let mut runner = TestRunner::new(config());
runner
.run(&arb_graph(), |graph| {
let mismatch = rt.block_on(async {
let (_dir, mut db) = load_graph(&graph).await;
for start in graph.persons.clone() {
let p = one_param(&start);
for q in ["friends", "employers"] {
// Each guard clears the mode on drop (end of the block,
// or on panic), so a forced mode never leaks across runs.
let csr = {
let _g = ModeGuard::set("csr");
col0_sorted(&mut db, q, &p).await
};
let indexed = {
let _g = ModeGuard::set("indexed");
col0_sorted(&mut db, q, &p).await
};
// No guard → env unset → auto (cost-based) path.
let auto = col0_sorted(&mut db, q, &p).await;
if csr != indexed || csr != auto {
return Some((start, q, csr, indexed, auto));
}
}
}
None
});
prop_assert!(
mismatch.is_none(),
"Expand mode divergence: {:?}",
mismatch
);
Ok(())
})
.unwrap();
}
// INVARIANT 2: no phantom rows. Every key a traversal returns must belong to the
// destination type's loaded key set — independent of the two-mode comparison, so
// it catches over-emission even if both modes are wrong identically.
#[test]
#[serial]
fn prop_results_subset_of_existing_nodes() {
clear_mode();
let rt = tokio::runtime::Runtime::new().unwrap();
let mut runner = TestRunner::new(config());
runner
.run(&arb_graph(), |graph| {
let bad = rt.block_on(async {
let (_dir, mut db) = load_graph(&graph).await;
let persons: HashSet<String> = graph.persons.iter().cloned().collect();
let companies: HashSet<String> = graph.companies.iter().cloned().collect();
for start in graph.persons.clone() {
let p = one_param(&start);
for f in col0_set(&mut db, "friends", &p).await {
if !persons.contains(&f) {
return Some(("friends", start, f));
}
}
for c in col0_set(&mut db, "employers", &p).await {
if !companies.contains(&c) {
return Some(("employers", start, c));
}
}
}
None
});
prop_assert!(bad.is_none(), "phantom row: {:?}", bad);
Ok(())
})
.unwrap();
}
// INVARIANT 3: anti-join complement. `not { $p worksAt $_ }` and its complement
// (persons WITH a worksAt) must be disjoint and together cover all persons.
#[test]
#[serial]
fn prop_antijoin_partitions_persons() {
clear_mode();
let rt = tokio::runtime::Runtime::new().unwrap();
let mut runner = TestRunner::new(config());
runner
.run(&arb_graph(), |graph| {
let err = rt.block_on(async {
let (_dir, mut db) = load_graph(&graph).await;
let all = col0_set(&mut db, "all_persons", &ParamMap::new()).await;
let unemployed = col0_set(&mut db, "unemployed", &ParamMap::new()).await;
let employed = col0_set(&mut db, "employed", &ParamMap::new()).await;
let overlap: Vec<_> = unemployed.intersection(&employed).cloned().collect();
let union: HashSet<_> = unemployed.union(&employed).cloned().collect();
if !overlap.is_empty() {
return Some(format!("overlap {overlap:?}"));
}
if union != all {
return Some(format!("union {union:?} != all {all:?}"));
}
None
});
prop_assert!(err.is_none(), "anti-join partition broken: {:?}", err);
Ok(())
})
.unwrap();
}

View file

@ -556,6 +556,111 @@ async fn bm25_returns_ranked_results() {
assert!(result.num_rows() <= 3, "bm25 should respect limit 3");
}
// Full rank-ORDER golden (not just top-1 / non-empty): pins ranks 2..k so a
// regression corrupting the tail or reversing the sort direction fails loudly.
// nearest skips apply_ordering (is_search_ordered) and returns Lance native
// order, so result_slugs row order == rank order.
#[tokio::test]
#[serial]
async fn nearest_full_rank_order() {
let dir = tempfile::tempdir().unwrap();
let mut db = init_search_db(&dir).await;
let result = query_main(
&mut db,
SEARCH_QUERIES,
"vector_search",
&vector_param("$q", &[0.1, 0.2, 0.3, 0.4]),
)
.await
.unwrap();
// [0.1,0.2,0.3,0.4] == ml-intro's embedding (dist 0); the rest by ascending L2.
assert_eq!(result_slugs(&result), vec!["ml-intro", "nlp-guide", "rl-intro"]);
}
#[tokio::test]
#[serial]
async fn bm25_full_rank_order() {
let dir = tempfile::tempdir().unwrap();
let mut db = init_search_db(&dir).await;
let result = query_main(
&mut db,
SEARCH_QUERIES,
"bm25_search",
&params(&[("$q", "Learning")]),
)
.await
.unwrap();
// Descending BM25 score order.
assert_eq!(result_slugs(&result), vec!["rl-intro", "ml-intro", "dl-basics"]);
}
// Characterization: fuzzy() does NOT match under the default tokenizer/index in
// this setup — a one-edit typo ("Introductio" for "Introduction") returns no
// rows. (`search`/`match_text` DO work, so FTS itself is fine; fuzzy term
// queries specifically are inert here.) This pins that documented limitation
// instead of leaving fuzzy silently unasserted: if a Lance/tokenizer change
// makes fuzzy match, this turns red and should be promoted to a real
// matched-set + exclusion golden.
#[tokio::test]
#[serial]
async fn fuzzy_does_not_match_under_default_tokenizer() {
let dir = tempfile::tempdir().unwrap();
let mut db = init_search_db(&dir).await;
let r = query_main(&mut db, SEARCH_QUERIES, "fuzzy_search", &params(&[("$q", "Introductio")]))
.await
.unwrap();
assert!(
result_slugs(&r).is_empty(),
"fuzzy now matches — promote this to a real matched-set/exclusion golden"
);
}
// match_text is a FILTER on the body: assert the exact matched set, not contains.
#[tokio::test]
#[serial]
async fn match_text_matches_exact_set_excludes_unrelated() {
let dir = tempfile::tempdir().unwrap();
let mut db = init_search_db(&dir).await;
// "neural" appears only in dl-basics's body ("neural networks").
let r = query_main(&mut db, SEARCH_QUERIES, "phrase_search", &params(&[("$q", "neural")]))
.await
.unwrap();
let mut got = result_slugs(&r);
got.sort();
assert_eq!(got, vec!["dl-basics"]);
}
// RRF fuses arms OTHER than the default nearest+bm25: two FTS arms (title+body).
// Proves primary_var resolves when neither arm is `nearest`, and fusion runs.
#[tokio::test]
#[serial]
async fn rrf_fuses_two_fts_fields() {
let dir = tempfile::tempdir().unwrap();
let mut db = init_search_db(&dir).await;
let r = query_main(&mut db, SEARCH_QUERIES, "rrf_two_fts", &params(&[("$q", "learning")]))
.await
.unwrap();
assert_eq!(result_slugs(&r), vec!["dl-basics", "ml-intro", "rl-intro"]);
}
// RRF fuses two vector arms (no embedding creds — explicit vectors). A doc near
// BOTH query vectors out-ranks one near only one.
#[tokio::test]
#[serial]
async fn rrf_fuses_two_vector_queries() {
let dir = tempfile::tempdir().unwrap();
let mut db = init_search_db(&dir).await;
let r = query_main(
&mut db,
SEARCH_QUERIES,
"rrf_two_vectors",
&two_vector_params("$q1", &[0.1, 0.2, 0.3, 0.4], "$q2", &[0.5, 0.6, 0.7, 0.8]),
)
.await
.unwrap();
assert_eq!(result_slugs(&r), vec!["rl-intro", "ml-intro", "dl-basics"]);
}
#[tokio::test]
#[serial]
async fn mutation_commit_refreshes_search_indices_without_manual_ensure() {

View file

@ -46,6 +46,194 @@ query not_at_acme() {
assert_eq!(names_vec, vec!["Bob", "Charlie", "Diana"]);
}
// Nested anti-join (double negation): proves `not { … not { … } }` recurses
// through execute_pipeline. "People who do NOT work at any NON-Acme company":
// inner `not { $c.name = "Acme" }` keeps the non-Acme employers, the outer `not`
// removes anyone who has one. Alice (Acme only), Charlie & Diana (no employer)
// remain — distinct from plain unemployed {Charlie, Diana}.
#[tokio::test]
async fn nested_anti_join_double_negation() {
let dir = tempfile::tempdir().unwrap();
let mut db = init_and_load(&dir).await;
let queries = r#"
query no_nonacme_employer() {
match {
$p: Person
not {
$p worksAt $c
not {
$c.name = "Acme"
}
}
}
return { $p.name }
}
"#;
let result = query_main(&mut db, queries, "no_nonacme_employer", &ParamMap::new())
.await
.unwrap();
let batch = result.concat_batches().unwrap();
let names = batch
.column(0)
.as_any()
.downcast_ref::<StringArray>()
.unwrap();
let mut names_vec: Vec<&str> = (0..names.len()).map(|i| names.value(i)).collect();
names_vec.sort();
assert_eq!(names_vec, vec!["Alice", "Charlie", "Diana"]);
}
// The anti-join has two execution forks: the CSR `has_neighbors` fast path
// (bare single-op Expand inner) and the set-oriented inner-pipeline replay (when
// dst_filters force a multi-op inner). They must agree. `not { $p worksAt $_ }`
// takes the fast path; the same negation with an always-true dst filter
// (`$c.name != ""`) is semantically identical but forces the slow path.
#[tokio::test]
async fn anti_join_fast_and_slow_paths_agree() {
let dir = tempfile::tempdir().unwrap();
let mut db = init_and_load(&dir).await;
let queries = r#"
query fast() {
match {
$p: Person
not { $p worksAt $_ }
}
return { $p.name }
}
query slow() {
match {
$p: Person
not {
$p worksAt $c
$c.name != ""
}
}
return { $p.name }
}
"#;
let names = |result: omnigraph_compiler::result::QueryResult| {
let batch = result.concat_batches().unwrap();
let col = batch
.column(0)
.as_any()
.downcast_ref::<StringArray>()
.unwrap();
let mut v: Vec<String> = (0..col.len()).map(|i| col.value(i).to_string()).collect();
v.sort();
v
};
let fast = names(query_main(&mut db, queries, "fast", &ParamMap::new()).await.unwrap());
let slow = names(query_main(&mut db, queries, "slow", &ParamMap::new()).await.unwrap());
assert_eq!(fast, slow, "anti-join fast and slow paths must agree");
// Alice->Acme, Bob->Globex employed; Charlie & Diana have no employer.
assert_eq!(fast, vec!["Charlie", "Diana"]);
}
// Regression: nested slow-path anti-joins must not collide on the synthetic
// correlation tag. The outer anti-join tags rows with a correlation column that
// rides through its inner pipeline; when the inner pipeline contains ANOTHER
// slow-path anti-join, a fixed tag name would duplicate, and reading it by name
// returns the OUTER tag — mis-correlating the inner negation. Fan-out (p1 works
// at two companies) makes the inner row indices diverge from the outer tags, so
// the bug produces a different person set than the correct one.
#[tokio::test]
async fn nested_anti_join_with_fanout_correlates_correctly() {
let dir = tempfile::tempdir().unwrap();
let uri = dir.path().to_str().unwrap();
// p1 -> {Acme, Globex} (fan-out), p2 -> Globex, p3 -> Acme, p4 -> (none).
let data = r#"{"type":"Person","data":{"name":"p1"}}
{"type":"Person","data":{"name":"p2"}}
{"type":"Person","data":{"name":"p3"}}
{"type":"Person","data":{"name":"p4"}}
{"type":"Company","data":{"name":"Acme"}}
{"type":"Company","data":{"name":"Globex"}}
{"edge":"WorksAt","from":"p1","to":"Acme"}
{"edge":"WorksAt","from":"p1","to":"Globex"}
{"edge":"WorksAt","from":"p2","to":"Globex"}
{"edge":"WorksAt","from":"p3","to":"Acme"}"#;
let mut db = Omnigraph::init(uri, TEST_SCHEMA).await.unwrap();
load_jsonl(&mut db, data, LoadMode::Overwrite).await.unwrap();
let queries = r#"
query no_nonacme_employer() {
match {
$p: Person
not {
$p worksAt $c
not {
$c.name = "Acme"
}
}
}
return { $p.name }
}
"#;
let result = query_main(&mut db, queries, "no_nonacme_employer", &ParamMap::new())
.await
.unwrap();
let batch = result.concat_batches().unwrap();
let names = batch
.column(0)
.as_any()
.downcast_ref::<StringArray>()
.unwrap();
let mut names_vec: Vec<&str> = (0..names.len()).map(|i| names.value(i)).collect();
names_vec.sort();
// p1 & p2 have a non-Acme employer (Globex) -> excluded; p3 (Acme only) and
// p4 (no employer) remain.
assert_eq!(names_vec, vec!["p3", "p4"]);
}
// Regression: a multi-hop anti-join must not take the bulk fast path. The fast
// path answers via `has_neighbors` (ONE-hop existence), so `not { $p knows{2,2}
// $x }` would wrongly drop a node that has a 1-hop neighbor but no 2-hop path.
// Graph: a->b (b is a sink, so a has no 2-hop path), c->d->e (c has a 2-hop
// path). Only c has a 2-hop knows path, so only c is removed.
#[tokio::test]
async fn anti_join_respects_multi_hop_bounds() {
let dir = tempfile::tempdir().unwrap();
let uri = dir.path().to_str().unwrap();
let data = r#"{"type":"Person","data":{"name":"a"}}
{"type":"Person","data":{"name":"b"}}
{"type":"Person","data":{"name":"c"}}
{"type":"Person","data":{"name":"d"}}
{"type":"Person","data":{"name":"e"}}
{"edge":"Knows","from":"a","to":"b"}
{"edge":"Knows","from":"c","to":"d"}
{"edge":"Knows","from":"d","to":"e"}"#;
let mut db = Omnigraph::init(uri, TEST_SCHEMA).await.unwrap();
load_jsonl(&mut db, data, LoadMode::Overwrite).await.unwrap();
let queries = r#"
query no_two_hop() {
match {
$p: Person
not { $p knows{2,2} $x }
}
return { $p.name }
}
"#;
let result = query_main(&mut db, queries, "no_two_hop", &ParamMap::new())
.await
.unwrap();
let batch = result.concat_batches().unwrap();
let names = batch
.column(0)
.as_any()
.downcast_ref::<StringArray>()
.unwrap();
let mut names_vec: Vec<&str> = (0..names.len()).map(|i| names.value(i)).collect();
names_vec.sort();
// Only c has a 2-hop knows path → removed; everyone else (incl. a, which has
// a 1-hop neighbor but no 2-hop path) is kept.
assert_eq!(names_vec, vec!["a", "b", "d", "e"]);
}
// ─── Variable-length hops ───────────────────────────────────────────────────
const CHAIN_SCHEMA: &str = r#"

View file

@ -0,0 +1,327 @@
//! BTREE-indexed Expand path (`execute_expand_indexed`) coverage.
//!
//! These tests force the Expand execution mode via `OMNIGRAPH_TRAVERSAL_MODE`
//! and assert the indexed path matches the CSR path (both are semantically
//! identical — the indexed path just serves neighbor lookups from the persisted
//! src/dst BTREE instead of an in-memory CSR). They live in their own test
//! binary and are all `#[serial]`, so the env writes never race a concurrent
//! reader: within this process serial execution serializes every env read, and
//! other test binaries (e.g. `traversal.rs`) are separate processes whose env
//! stays unset (→ CSR), validating the shared hydrate/align tail on the CSR path.
mod helpers;
use arrow_array::{Array, StringArray};
use omnigraph::db::Omnigraph;
use omnigraph::loader::{LoadMode, load_jsonl};
use omnigraph::table_store::{IndexCoverage, TableStore};
use omnigraph_compiler::ir::ParamMap;
use serial_test::serial;
use helpers::*;
fn set_mode(mode: &str) {
// SAFE: every test here is #[serial] and this binary has no non-serial
// env reader, so no thread reads the environment during this write.
unsafe { std::env::set_var("OMNIGRAPH_TRAVERSAL_MODE", mode) };
}
fn clear_mode() {
unsafe { std::env::remove_var("OMNIGRAPH_TRAVERSAL_MODE") };
}
/// Run a name-returning query and return its first column, sorted.
async fn sorted_names(db: &mut Omnigraph, queries: &str, name: &str, params: &ParamMap) -> Vec<String> {
let result = query_main(db, queries, name, params).await.unwrap();
if result.num_rows() == 0 {
return Vec::new();
}
let batch = result.concat_batches().unwrap();
let col = batch
.column(0)
.as_any()
.downcast_ref::<StringArray>()
.unwrap();
let mut v: Vec<String> = (0..col.len()).map(|i| col.value(i).to_string()).collect();
v.sort();
v
}
/// Run the same query under CSR, indexed, and auto (cost-chooser) modes; assert
/// all three produce identical results and return them. The auto pass exercises
/// `choose_expand_mode` end to end: whichever path it selects, the rows must
/// match the forced paths (the chooser changes which path runs, never the result).
async fn both_modes(db: &mut Omnigraph, queries: &str, name: &str, params: &ParamMap) -> Vec<String> {
set_mode("csr");
let csr = sorted_names(db, queries, name, params).await;
set_mode("indexed");
let indexed = sorted_names(db, queries, name, params).await;
clear_mode();
let auto = sorted_names(db, queries, name, params).await;
assert_eq!(
indexed, csr,
"indexed Expand must produce identical results to CSR for query '{name}'"
);
assert_eq!(
auto, csr,
"auto (cost-chooser) Expand must produce identical results to the forced paths for query '{name}'"
);
indexed
}
// The C6 index-coverage guard: `key_column_index_coverage` must report whether
// a `key_col IN (...)` scan will use the persisted BTREE or silently full-scan.
// Not #[serial] — it calls the helper directly and reads no env.
#[tokio::test]
async fn key_column_index_coverage_detects_btree_presence() {
let dir = tempfile::tempdir().unwrap();
let db = init_and_load(&dir).await;
let snap = snapshot_main(&db).await.unwrap();
// Edge `src` gets a BTREE from ensure_indices on load → Indexed.
let edge_ds = snap.open("edge:Knows").await.unwrap();
let src_cov = TableStore::key_column_index_coverage(&edge_ds, "src")
.await
.unwrap();
assert_eq!(src_cov, IndexCoverage::Indexed, "edge src is BTREE-indexed");
// A node property column with no scalar index → Degraded (the warn path).
let node_ds = snap.open("node:Person").await.unwrap();
let age_cov = TableStore::key_column_index_coverage(&node_ds, "age")
.await
.unwrap();
assert!(
matches!(age_cov, IndexCoverage::Degraded { .. }),
"non-indexed column should be Degraded, got {age_cov:?}"
);
}
// An edge appended after the BTREE was built lands in a new fragment that the
// index does not cover (edge-index creation is skipped once a BTREE exists). The
// scan is then partly a full scan, so coverage must report `Degraded` — otherwise
// the cost chooser would price an unindexed-in-part scan as fully indexed.
// (Results stay correct regardless — `indexed_finds_unindexed_appended_edge`.)
#[tokio::test]
async fn coverage_degrades_for_appended_unindexed_fragment() {
let dir = tempfile::tempdir().unwrap();
let mut db = init_and_load(&dir).await;
// Fresh load: the Knows BTREE covers every fragment → Indexed.
let snap = snapshot_main(&db).await.unwrap();
let edge_ds = snap.open("edge:Knows").await.unwrap();
assert_eq!(
TableStore::key_column_index_coverage(&edge_ds, "src").await.unwrap(),
IndexCoverage::Indexed,
"freshly-loaded edge BTREE covers all fragments"
);
// Append an edge → a new, unindexed fragment outside the index fragment_bitmap.
mutate_main(
&mut db,
MUTATION_QUERIES,
"add_friend",
&params(&[("$from", "Alice"), ("$to", "Diana")]),
)
.await
.unwrap();
let snap2 = snapshot_main(&db).await.unwrap();
let edge_ds2 = snap2.open("edge:Knows").await.unwrap();
let cov = TableStore::key_column_index_coverage(&edge_ds2, "src").await.unwrap();
assert!(
matches!(cov, IndexCoverage::Degraded { .. }),
"appended unindexed fragment must degrade coverage, got {cov:?}"
);
}
#[tokio::test]
#[serial]
async fn indexed_matches_csr_one_hop_same_type() {
let dir = tempfile::tempdir().unwrap();
let mut db = init_and_load(&dir).await;
// friends_of: `$p knows $f` (Person -> Person, single hop).
let got = both_modes(&mut db, TEST_QUERIES, "friends_of", &params(&[("$name", "Alice")])).await;
assert_eq!(got, vec!["Bob", "Charlie"], "Alice knows Bob and Charlie");
}
#[tokio::test]
#[serial]
async fn indexed_matches_csr_multi_hop_same_type() {
let dir = tempfile::tempdir().unwrap();
let mut db = init_and_load(&dir).await;
let queries = r#"
query reach($name: String) {
match {
$p: Person { name: $name }
$p knows{1,2} $f
}
return { $f.name }
}
"#;
// Alice -> Bob, Charlie (1 hop); Bob -> Diana (2 hops).
let got = both_modes(&mut db, queries, "reach", &params(&[("$name", "Alice")])).await;
assert_eq!(got, vec!["Bob", "Charlie", "Diana"]);
}
#[tokio::test]
#[serial]
async fn indexed_matches_csr_cross_type() {
let dir = tempfile::tempdir().unwrap();
let mut db = init_and_load(&dir).await;
let queries = r#"
query employer($name: String) {
match {
$p: Person { name: $name }
$p worksAt $c
}
return { $c.name }
}
"#;
let got = both_modes(&mut db, queries, "employer", &params(&[("$name", "Alice")])).await;
assert_eq!(got, vec!["Acme"], "Alice works at Acme");
}
#[tokio::test]
#[serial]
async fn indexed_matches_csr_no_match() {
let dir = tempfile::tempdir().unwrap();
let mut db = init_and_load(&dir).await;
// Diana has no outgoing Knows edges → empty in both modes.
let got = both_modes(&mut db, TEST_QUERIES, "friends_of", &params(&[("$name", "Diana")])).await;
assert!(got.is_empty(), "Diana knows no one");
}
#[tokio::test]
#[serial]
async fn indexed_finds_unindexed_appended_edge() {
let dir = tempfile::tempdir().unwrap();
let mut db = init_and_load(&dir).await;
// Append Alice -> Diana AFTER the initial load. `ensure_indices`' existence
// guard means the src/dst BTREE built on the first load does NOT cover this
// new fragment. The indexed path must still find it via Lance's
// unindexed-fragment scan (fast_search=false default), so partial index
// coverage never silently drops rows.
mutate_main(
&mut db,
MUTATION_QUERIES,
"add_friend",
&params(&[("$from", "Alice"), ("$to", "Diana")]),
)
.await
.unwrap();
set_mode("indexed");
let got = sorted_names(&mut db, TEST_QUERIES, "friends_of", &params(&[("$name", "Alice")])).await;
clear_mode();
assert_eq!(
got,
vec!["Bob", "Charlie", "Diana"],
"indexed traversal must see the freshly-appended, unindexed edge"
);
}
// Regression: a node `id` is unique only WITHIN a type, so a `Person` and a
// `Company` can share an id string. A variable-length traversal over a
// cross-type edge (`worksAt`, Person -> Company) must structurally stop after
// one hop — a Company is not a `worksAt` source — so `worksAt{1,2}` returns
// exactly the one-hop companies. Before the structural hop-cap, the indexed
// path's single string interner de-interned the hop-1 Company id back to the
// colliding Person id and ran a hop-2 `worksAt src IN (...)` scan that matched
// that same-string Person's edges, emitting a spurious second-hop company the
// CSR path never produces. `both_modes` (csr == indexed == auto) plus the
// golden assert catch both the divergence and an over-emitting shared bug.
#[tokio::test]
#[serial]
async fn cross_type_id_collision_does_not_bleed_into_second_hop() {
const SCHEMA: &str = r#"
node Person { name: String @key }
node Company { name: String @key }
edge WorksAt: Person -> Company
"#;
// `shared` is BOTH a Person id and a Company id. alice worksAt the Company
// `shared`; the Person `shared` worksAt the Company `other`.
const DATA: &str = r#"{"type":"Person","data":{"name":"alice"}}
{"type":"Person","data":{"name":"shared"}}
{"type":"Company","data":{"name":"shared"}}
{"type":"Company","data":{"name":"other"}}
{"edge":"WorksAt","from":"alice","to":"shared"}
{"edge":"WorksAt","from":"shared","to":"other"}"#;
const QUERY: &str = r#"
query reach($name: String) {
match {
$p: Person { name: $name }
$p worksAt{1,2} $c
}
return { $c.name }
}
"#;
let dir = tempfile::tempdir().unwrap();
let uri = dir.path().to_str().unwrap();
let mut db = Omnigraph::init(uri, SCHEMA).await.unwrap();
load_jsonl(&mut db, DATA, LoadMode::Overwrite).await.unwrap();
let got = both_modes(&mut db, QUERY, "reach", &params(&[("$name", "alice")])).await;
assert_eq!(
got,
vec!["shared"],
"cross-type worksAt{{1,2}} must return only the one-hop company; a hop-2 \
result means the id-string collision bled across types"
);
}
const REACH_5: &str = r#"
query reach($name: String) {
match {
$p: Person { name: $name }
$p knows{1,5} $f
}
return { $f.name }
}
"#;
// A directed 3-cycle a->b->c->a, traversed with a hop ceiling (5) ABOVE the cycle
// length. Variable-length traversal must terminate and dedup (the source is
// seeded into `visited`, so the c->a back-edge does not re-emit a). Uses a
// bounded range deliberately: an unbounded `{1,}` is a typecheck error, not a
// runtime path. `both_modes` also confirms indexed == csr on the cycle.
#[tokio::test]
#[serial]
async fn variable_hops_terminate_and_dedup_on_cycle() {
let dir = tempfile::tempdir().unwrap();
let uri = dir.path().to_str().unwrap();
let data = r#"{"type":"Person","data":{"name":"a"}}
{"type":"Person","data":{"name":"b"}}
{"type":"Person","data":{"name":"c"}}
{"edge":"Knows","from":"a","to":"b"}
{"edge":"Knows","from":"b","to":"c"}
{"edge":"Knows","from":"c","to":"a"}"#;
let mut db = Omnigraph::init(uri, TEST_SCHEMA).await.unwrap();
load_jsonl(&mut db, data, LoadMode::Overwrite).await.unwrap();
let got = both_modes(&mut db, REACH_5, "reach", &params(&[("$name", "a")])).await;
// From a: b (1 hop), c (2 hops); the c->a back-edge hits the seeded source
// and is not re-emitted. No infinite loop, each node at most once.
assert_eq!(got, vec!["b", "c"]);
}
// A self-loop a->a plus a->b. Variable-length traversal must not loop forever and
// must not re-emit the seeded source.
#[tokio::test]
#[serial]
async fn variable_hops_handle_self_loop() {
let dir = tempfile::tempdir().unwrap();
let uri = dir.path().to_str().unwrap();
let data = r#"{"type":"Person","data":{"name":"a"}}
{"type":"Person","data":{"name":"b"}}
{"edge":"Knows","from":"a","to":"a"}
{"edge":"Knows","from":"a","to":"b"}"#;
let mut db = Omnigraph::init(uri, TEST_SCHEMA).await.unwrap();
load_jsonl(&mut db, data, LoadMode::Overwrite).await.unwrap();
let got = both_modes(&mut db, REACH_5, "reach", &params(&[("$name", "a")])).await;
// a->a hits the seeded source (pruned); only b is reached.
assert_eq!(got, vec!["b"]);
}

View file

@ -13,6 +13,10 @@
| Maintenance concurrency | `OMNIGRAPH_MAINTENANCE_CONCURRENCY=8` | `db/omnigraph/optimize.rs` |
| Lance blob compaction support | `LANCE_SUPPORTS_BLOB_COMPACTION = false` | `db/omnigraph/optimize.rs` |
| Graph index cache size | `8` (LRU) | `runtime_cache.rs` |
| Expand indexed-path frontier ceiling | `OMNIGRAPH_EXPAND_INDEXED_MAX_FRONTIER=1024` | `exec/query.rs` |
| Expand indexed-path hop ceiling | `OMNIGRAPH_EXPAND_INDEXED_MAX_HOPS=6` | `exec/query.rs` |
| Expand CSR-build cost factor | `CSR_BUILD_FACTOR = 1.5` | `exec/query.rs` |
| Expand mode override | `OMNIGRAPH_TRAVERSAL_MODE` (`indexed`\|`csr`; unset = cost-based auto) | `exec/query.rs` |
| Default body limit | `1 MB` | `omnigraph-server/lib.rs` |
| Ingest body limit | `32 MB` | `omnigraph-server/lib.rs` |
| Engine embed model | `gemini-embedding-2-preview` | `omnigraph/embedding.rs` |
@ -21,3 +25,16 @@
| Embed retries | `4` | both clients |
| Embed retry backoff | `200 ms` | both clients |
| LANCE memory pool default | `1 GB` (raised in v0.3.0) | runtime |
**Expand traversal dispatch.** With `OMNIGRAPH_TRAVERSAL_MODE` unset, the engine
chooses the indexed (per-hop BTREE) vs CSR (whole-graph in-memory) path with a
cost model over cheap manifest counts (frontier size, |E|, source-vertex count,
hops) plus the index-coverage signal: the indexed path is preferred when its
frontier-relative work beats building the CSR (≈ when `hops × frontier` is a
small fraction of the source-vertex set), and CSR is preferred for dense/deep
traversals or when the BTREE coverage is degraded and a full scan would be paid
per hop. The two ceilings bound the **initial dispatch** frontier/hops (beyond
them CSR is always used); they are not a hard per-hop bound — the cost model
*estimates* total indexed work as ~`hops × frontier × fanout`, so dense fan-out is
priced toward CSR rather than capped mid-traversal. The override flag forces a path (the `auto` result is identical either way;
only the path differs).

View file

@ -21,6 +21,6 @@ This is OmniGraph-specific (not Lance):
- `TypeIndex`: dense `u32 ↔ String id` mapping per node type.
- `CsrIndex`: Compressed Sparse Row representation of edges per edge type — `offsets[i]..offsets[i+1]` slices into `targets`.
- `GraphIndex { type_indices, csr (out), csc (in) }` — built on demand from a snapshot's edge tables.
- `GraphIndex { type_indices, csr (out), csc (in) }` — built on demand from a snapshot's edge tables, **lazily**: only when an `Expand` the planner routes to the CSR path (dense / large frontier) or an `AntiJoin` actually needs it.
- Cached in `RuntimeCache::graph_indices` (LRU, max 8 entries, keyed by snapshot id + edge table versions).
- Built only when an `Expand` or `AntiJoin` IR op is present in the lowered query, so pure scans skip it.
- Selective `Expand`s resolve neighbors from the persisted `src`/`dst` BTREE instead (one indexed scan per hop) and never trigger the CSR build; see [query-language](query-language.md) → Expand. Pure scans, and queries served entirely by the indexed traversal path, skip it.

View file

@ -55,6 +55,8 @@ Used inside MATCH or as expressions inside RETURN/ORDER:
- `order { <expr> [asc|desc], … }` — supports plain expressions and `nearest(...)`.
- `limit <integer>` — required when there is a `nearest(...)` ordering.
- **Total, deterministic order.** Rows with equal user-sort keys are broken by the bound entities' key columns (`<var>.id`, ascending) appended as a final tie-break, so the result is a *total* order — reproducible across runs, and `order … limit N` returns a deterministic top-N even when ties straddle the cutoff. (Aggregate results have no entity-key columns; their group rows are already distinct on the projected group keys.)
- **NULL placement** is *nulls-first ascending, nulls-last descending* (i.e. `nulls_first = !descending`): a NULL sorts as if smaller than any value.
## Mutation statements
@ -79,7 +81,7 @@ Reason: under the staged-write rewire (MR-794), inserts and updates accumulate i
Pipeline operations:
- `NodeScan { variable, type_name, filters }`
- `Expand { src_var, dst_var, edge_type, direction (Out|In), dst_type, min_hops, max_hops, dst_filters }` — destination filters are pushed *into* the expand so Lance scalar pushdown can prune.
- `Expand { src_var, dst_var, edge_type, direction (Out|In), dst_type, min_hops, max_hops, dst_filters }` — destination filters are pushed *into* the expand so Lance scalar pushdown can prune. Executed one of two ways, chosen per-expand by a cost model over cheap manifest counts (frontier size, |E|, source-vertex count, hops) plus index coverage: selective traversals (small frontier relative to the source set) resolve neighbors from the persisted `src`/`dst` BTREE (one indexed scan per hop); dense / deep / large-frontier traversals — or those whose BTREE coverage is degraded so a full scan would be paid per hop — use the in-memory CSR adjacency index. Both produce identical results. The `OMNIGRAPH_EXPAND_INDEXED_MAX_FRONTIER` / `OMNIGRAPH_EXPAND_INDEXED_MAX_HOPS` ceilings bound the *initial dispatch* frontier/hops (beyond them CSR is always used); the cost model estimates total indexed work as ~`hops × frontier × fanout` and prices dense fan-out toward CSR — they are not a hard per-hop bound. `OMNIGRAPH_TRAVERSAL_MODE=indexed|csr` forces a mode (see [constants](constants.md)).
- `Filter { left, op, right }`
- `AntiJoin { outer_var, inner: Vec<IROp> }` — for `not { … }`