Two factual mismatches caught during code-grounded re-review: - docs/architecture.md: "13 cmd families" was stale — the CLI has 17 Command variants (Version, Embed, Init, Load, Ingest, Branch, Schema, Query, Snapshot, Export, Run, Commit, Read, Change, Policy, Optimize, Cleanup). Replaced the count with "command families" so the diagram doesn't drift again. - docs/execution.md: the mutation prose said "every mutation runs on a fresh __run__<id> branch", which over-claims. mutation.rs:555 short- circuits when the target is already a __run__ branch — the assumption there is the caller is managing the surrounding run lifecycle. Added a one-paragraph caveat noting the exception with the file:line citation. Both diagrams unchanged; only annotations / counts adjusted.
8.1 KiB
Query Execution, Mutations, and Loading
Query execution (exec/query.rs)
Pipeline:
- Parse + typecheck via
omnigraph-compiler. - Lower to IR.
- If
ExpandorAntiJoinis present, build (or fetch fromRuntimeCache) aGraphIndex. - Run
execute_queryagainst the snapshot.
Read flow — sequence
sequenceDiagram
autonumber
participant client as Client
participant og as Omnigraph::query<br/>(query.rs:7)
participant cmp as omnigraph-compiler
participant exec as execute_query<br/>(query.rs:347)
participant gi as GraphIndex<br/>(RuntimeCache)
participant ts as table_store
participant lance as Lance scanner
client->>og: query(target, source, name, params)
og->>og: ensure_schema_state_valid()<br/>resolve target → snapshot
og->>cmp: parse + typecheck_query (typecheck.rs:83)
cmp-->>og: CheckedQuery
og->>cmp: lower_query (lower.rs:11)
cmp-->>og: QueryIR (pipeline of IROp)
og->>exec: extract_search_mode + dispatch (query.rs:110)
exec->>gi: build / fetch GraphIndex<br/>(if Expand or AntiJoin)
gi-->>exec: CSR / CSC topology
loop for each IROp in pipeline
exec->>ts: scan with predicate / SIP
ts->>lance: filter · nearest · full_text_search
lance-->>ts: Stream of RecordBatch
ts-->>exec: RecordBatch stream
exec->>exec: factorize · expand · fuse · project
end
exec-->>og: QueryResult (RecordBatches)
og-->>client: serialized result
Code paths:
- Entry:
Omnigraph::queryatcrates/omnigraph/src/exec/query.rs:7 - Search-mode extraction:
extract_search_modeatcrates/omnigraph/src/exec/query.rs:110 - Pipeline runner:
execute_queryatcrates/omnigraph/src/exec/query.rs:347 - RRF fan-out:
execute_rrf_queryatcrates/omnigraph/src/exec/query.rs:393 - Per-source-row BFS:
execute_expandatcrates/omnigraph/src/exec/query.rs:675 - Lance scan + pushdown:
execute_node_scanatcrates/omnigraph/src/exec/query.rs:1027 - Filter → SQL pushdown:
build_lance_filteratcrates/omnigraph/src/exec/query.rs:1158
Multi-modal search modes (SearchMode)
The executor recognizes three modes that may be combined in a single query:
nearest— vector ANN (uses Lance vector index;LIMITrequired).bm25— BM25 over an inverted index.rrf— Reciprocal Rank Fusion of two rankings, with k (default 60).
Hybrid example: order { rrf(nearest($d.embedding, $q), bm25($d.body, $q_text)) desc } limit 20.
Joins / set operations
- Joins are implicit: MATCH bindings + traversals are implemented as scans + CSR/CSC lookups.
not { … }lowers to anAntiJoinover the inner pipeline.
Scoped reads
query(target, source, name, params)— at any branch or snapshot.run_query_at(version, …)— direct historical query at a manifest version.
Concurrency
- Snapshot isolation per query: all reads inside a query use the same
Snapshot. - Readers and writers on different branches don't block each other.
Mutation execution (exec/mutation.rs)
Resolves expression values to literals, converts to typed Arrow arrays (literal_to_typed_array(lit, DataType, num_rows)), then writes:
insert→ LanceWriteMode::Appendupdate→ Lancemerge_insert(WhenMatched::Update)delete→ Lancemerge_insert(WhenMatched::Delete)(logical) or filtered overwrite.
Multi-statement mutations are atomic at the manifest commit boundary.
Mutation flow — sequence
sequenceDiagram
autonumber
participant client as Client
participant og as Omnigraph::mutate<br/>(mutation.rs:511)
participant cmp as omnigraph-compiler
participant runs as RunRegistry
participant ts as table_store
participant lance as Lance dataset
participant mr as ManifestRepo<br/>(manifest.rs:280)
client->>og: mutate(target, source, name, params)
og->>cmp: parse + typecheck_query
cmp-->>og: CheckedQuery (Mutation IR)
og->>runs: begin_run(target, op_hash)<br/>fork __run__<id> from target head
runs-->>og: RunRecord
loop for each mutation statement (on __run__<id>)
og->>og: resolve expression literals<br/>literal_to_typed_array(lit, type, n)
alt insert
og->>ts: append RecordBatches
ts->>lance: WriteMode::Append → new fragment(s)
else update
og->>ts: merge_insert keyed by id
ts->>lance: merge_insert(WhenMatched::Update)
else delete
og->>ts: merge_insert with delete predicate
ts->>lance: merge_insert(WhenMatched::Delete)
end
lance-->>ts: new dataset version
og->>mr: commit_updates(SubTableUpdate)<br/>per-statement commit on __run__<id>
mr-->>og: ack
end
og->>og: OCC: target head unchanged since begin_run?
og->>og: publish_run(run_id)
alt fast path (target hasn't moved)
og->>mr: commit_updates_on_branch(target, updates)<br/>promote run snapshot
else merge path (target advanced)
og->>og: branch_merge_internal(__run__<id>, target)<br/>three-way merge
end
mr-->>og: new target snapshot
og->>runs: terminate_run(Published)
og-->>client: MutationResult
Code paths:
- Entry:
Omnigraph::mutateatcrates/omnigraph/src/exec/mutation.rs:511 - Per-mutation orchestration:
mutate_with_current_actoratcrates/omnigraph/src/exec/mutation.rs:539 - Per-statement commit on the run-branch:
commit_updates(called fromexecute_insert/execute_update/execute_deleteincrates/omnigraph/src/exec/mutation.rs) - Run publish:
Omnigraph::publish_runatcrates/omnigraph/src/db/omnigraph.rs:858 - Manifest commit primitive:
ManifestRepo::commitatcrates/omnigraph/src/db/manifest.rs:280(called from both per-statementcommit_updatesand the publish path)
Multi-statement mutations don't get atomicity from a single final commit — they get it from the run-branch + publish_run pattern. By default a mutation forks a fresh __run__<id> branch (begin_run); each statement individually commits its sub-table updates to that run-branch. After all statements complete, an OCC pre-check verifies the target hasn't moved since the run started, then publish_run atomically promotes the run-branch into the target — either via the fast path (direct promotion if the target hasn't moved) or a three-way merge. That final publish is what gives multi-statement mutations their atomicity guarantee (per docs/invariants.md §VI.26). If anything fails mid-run, the run is failed and the run-branch is dropped without affecting the target.
One exception: if the caller already targets a __run__<id> branch (mutation.rs:555), the mutation runs directly on that branch with no nested run wrapping — the assumption is the caller is managing the surrounding run lifecycle themselves. See runs.md for the full run lifecycle.
Bulk loader (loader/mod.rs)
- JSONL only in v1, with two record shapes:
- Node:
{"type":"NodeType", "data":{…}} - Edge:
{"edge":"EdgeType", "from":"src_id", "to":"dst_id", "data":{…}}
- Node:
- Lines starting with
//are treated as comments. - Schema validation on every row (typecheck, required props, blob base64 decoding).
- Edge endpoint resolution by node
@key.
Load modes (LoadMode)
| Mode | Semantics |
|---|---|
Overwrite |
Replace all data in the target tables on the branch |
Append |
Strict insert; duplicates error |
Merge |
Upsert by id (merge_insert) |
load vs ingest
load(branch, data, mode)— direct load to a branch.ingest(branch, from, data, mode)— branch-creating, transactional load:- If target advanced since the run started, fork a fresh run branch from
from. - Load into the run branch (Append).
- If target hasn't moved, fast-publish; otherwise abort.
- If target advanced since the run started, fork a fresh run branch from
- Returns
IngestResult { branch, base_branch, branch_created, mode, tables[] }. ingest_as(actor_id)records the actor on the resulting commit.
Embeddings during load
If a node type has @embed properties, the loader calls the engine embedding client (Gemini, RETRIEVAL_DOCUMENT) per row to populate the vector column. See embeddings.md.