From 5243c048aa2b464818a22a2b4f51f23c2bda819a Mon Sep 17 00:00:00 2001 From: Ragnor Comerford Date: Wed, 17 Jun 2026 13:25:20 +0200 Subject: [PATCH] perf(engine): remove the per-query metadata re-derivation tax on warm reads (#268) * test(engine): add read-path IO instrumentation seam for warm-read cost tests Prerequisite seam for the query-latency fixes. Adds crates/omnigraph/src/instrumentation.rs: - CountingStorageAdapter: a StorageAdapter decorator counting per-method reads (read_text/exists/read_text_versioned/list_dir), for the schema-contract reads on the query path. - A per-query task-local (QueryIoProbes) carrying Lance WrappingObjectStore wrappers per open category plus a probe counter, delivered via with_query_io_probes. open_dataset_tracked attaches the wrapper so the open itself is counted (ObjectStoreParams.object_store_wrapper). Wires the wrappers into the manifest open (open_manifest_dataset) and the commit-graph opens (CommitGraph::open/open_at_branch). Production leaves the task-local unset, so nothing attaches. Makes Omnigraph::open_with_storage public so tests can inject the counting adapter. lance-io is a dev-dependency (IOTracker named only in tests). No runtime behavior change. * test(engine): warm same-branch read should reuse the coordinator (red) Cost-budget test using Lance IOTracker at the object-store boundary (the LanceDB IO-counted-test pattern). On a 20-commit-deep graph, a warm same-branch query re-opens a fresh coordinator, which opens both the commit graph and __manifest. Asserts the read opens the commit graph zero times and performs exactly one cheap version probe; today it does neither (it scans the commit graph on re-open and never probes). The freshness guard already passes. Adds the commit_many helper for history-depth fixtures. Red half of the Fix 1 red->green pair; turns green with the next commit. * perf(engine): same-branch reads reuse the warm coordinator (Fix 1) query()/resolved_target re-opened a fresh GraphCoordinator from storage on every read (full __manifest scan + two commit-graph scans), so a warm read's cost grew with commit history (invariant 15) though the data was unchanged. resolved_target now serves same-branch reads from the warm in-memory coordinator, gated by a cheap version probe (latest_version_id, one object-store op) instead of a full re-open: - fresh (probe == cached version): return the in-memory snapshot under the read lock, with a synthetic (branch, version) id and no commit-graph access (reads pin the snapshot by manifest version, not the commit DAG; invariant 2). - stale: take the write lock, re-probe (double-checked; tokio RwLock has no read->write upgrade), then refresh_manifest_only (no commit-graph scan), preserving strong consistency for external writers (invariant 6). Cross-branch and snapshot targets keep the existing cold-resolve path. Adds ManifestCoordinator/GraphCoordinator::probe_latest_version and GraphCoordinator::refresh_manifest_only. Nothing on the read path needs a real commit ULID (only RuntimeCache keys on the id, where synthetic is consistent), per a caller audit. A warm same-branch read on a 20-commit graph now does zero commit-graph opens and exactly one probe (down from a deep commit-graph scan) and still observes external commits. The residual per-table __manifest scans are removed later by Fix 2. * test(engine): warm query should validate the schema contract once (red) ensure_schema_state_valid runs twice per query (query()/run_query_at AND resolved_target/snapshot_at_version), each reading 3 contract files + 2 existence probes. A warm query thus does 6 read_text + 4 exists where one validation (3 + 2) suffices, measured via CountingStorageAdapter. Adds a drift guard (schema_source_drift_is_caught_on_read) that already passes. Red half of the finding-A red->green pair. * perf(engine): validate the schema contract once per query (finding A) ensure_schema_state_valid ran on every query AND again inside resolved_target / snapshot_at_version, so each query validated the schema contract twice (~10 storage ops). Removes the redundant query()/ run_query_at() calls; the validation inside resolved_target / snapshot_at_version still runs, so drift is detected exactly as before. A source-only fast path was rejected: a long-lived handle must detect external drift of the schema source, IR, OR state on its next operation (lifecycle::long_lived_handle_rejects_schema_*), which a source-only compare would miss. So the only safe latency win is not validating twice. A warm query now does one validation (3 read_text + 2 exists) instead of two (6 + 4). * test(engine): warm + multi-table reads should do zero manifest scans (red) After Fix 1 a warm same-branch read still scans __manifest ~44 times at 20-commit depth: not from resolution (Fix 1 removed that) but from the per-table open path, which routes through the Lance namespace and full-scans __manifest twice per touched table (describe_table + describe_table_version). Tightens the warm test to assert manifest read_iops == 0 and adds a multi-table (traversal) test asserting the same, pinning the "2 tables = 2x" tax. Red half of the Fix 2 red->green pair. * perf(engine): open touched tables by location+version, not via the namespace (Fix 2) SubTableEntry::open routed every read-path table open through DatasetBuilder::from_namespace(BranchManifestNamespace), whose describe_table full-scans __manifest and, with managed_versioning, makes Lance scan again (describe_table_version) -- two full __manifest scans per touched table. That was the residual that made warm-read manifest IO grow with history and the '2 tables = 2x' multi-table tax. The resolved Snapshot already holds each table's path/version/branch, so open directly: from_uri(table_uri_for_path(root, path, branch)).with_version(v). The branch-qualified location is the dataset that physically holds the version (main: {path}; branch: {path}/tree/{branch}, Lance native-branch storage), and with_version resolves it within THAT dataset's _versions. 0 namespace calls + 1 HEAD via the native ConditionalPutCommitHandler. The read namespace (BranchManifestNamespace) is now unused in production (writes use StagedTableNamespace), so it, its constructor, and the helpers only it used (to_namespace_version, publish_requests, their imports) are gated #[cfg(test)] -- retained to validate the namespace contract in unit tests. Removes the dead open_table_at_version_from_manifest. Warm same-branch + multi-table reads now scan __manifest zero times; branch + time-travel reads stay correct (branching.rs, point_in_time.rs, 2 lib regression tests); production-lib warnings unchanged (baseline). * test(engine): cost-budget coverage for branch-warm and stale-refresh reads (matrix) Extends the read-path cost-budget tests across more of the morphological matrix: - warm_branch_read_does_no_manifest_scans: a warm read on a non-main branch (handle synced to it) scans __manifest zero times, exercising Fix 2's branch-owned-table open (tree/{branch} + with_version) on Fix 1's warm path -- the cell that regressed when the open used with_branch against the base. - stale_read_refreshes_manifest_only: an external commit makes the next read take the stale path, which re-reads the manifest (read_iops > 0) but never scans the commit graph (refresh_manifest_only), pinning Fix 1's manifest-only refresh. Cold paths (cross-branch, time-travel) stay behavior-covered (branching.rs, point_in_time.rs) and are cold by design (Fix 1 warm-paths only same-branch), so there is no manifest==0 contract to assert there. * test(engine): same-branch write after external commit must not fork the commit DAG (red) * fix(engine): refresh commit-graph head before append to prevent same-branch DAG fork A same-branch write that follows an external commit committed a fresh manifest version (commit_all rebases the pin from a fresh coordinator) but appended off the coordinator's stale in-memory commit-graph head, forking the commit DAG (the new commit and the external commit shared a parent). Pre-existing for non-strict inserts; widened to strict ops by Fix 1's refresh_manifest_only freshening the read-time pin. record_graph_commit now refreshes the commit-graph head from storage before append_commit, so the parent is the true current head. record_merge_commit is unaffected (it passes explicit parents). * perf(engine): hold open Dataset handles + share one Session per graph (Fix 3) A warm same-branch read still re-opened every touched table per query (the "never warms up" residual after Fix 1+2). A per-graph held-handle cache keyed by (table_path, branch, version) now serves repeat reads with zero table opens, and one shared lance::Session per graph warms metadata/index caches across opens. Validated against LanceDB upstream (rust/lancedb/src/table/dataset.rs DatasetConsistencyWrapper): hold an Arc and reuse it for 0-IO warm reads; one Session per connection threaded into opens; writers never serve from the read cache; time-travel bypasses. One adaptation: omnigraph keys by version (snapshot-pins-version model) where LanceDB keys per-table+HEAD, reusing the in-repo GraphIndexCache LRU template. - ReadCaches (session + TableHandleCache) injected onto live-Branch-read snapshots in resolved_target; Snapshot::open serves from the cache or opens once with the session on a miss (via the instrumented open_table_dataset). - Writes (resolved_branch_target -> open HEAD) and time-travel / Snapshot-id reads bypass the cache. Version-in-key makes a write a new key (old handle ages out via LRU); invalidate_all at branch-switch/refresh is hygiene only. - Cost tests: a 2nd identical warm read does 0 table opens; a write re-opens only the changed table at its new version. Full engine suite green. * test(engine): forbid raw data opens in the read/exec layer (P2 guard) Extend the forbidden-API guard with Dataset::open / DatasetBuilder::from_uri / from_namespace so the read/exec layer (exec/, loader/, changes/, db/omnigraph/) cannot bypass Snapshot::open and the held-handle cache (Fix 3). The instrumented opener (instrumentation.rs) is allow-listed; two legitimate non-read opens (a test editing __manifest, Hard-drop version GC) carry sentinels. The storage/manifest layers stay allow-listed. Lean P2 scope, per LanceDB-upstream + minimize-liability: the data-read boundary already exists (SubTableEntry::open); this guard pins it so a future read cannot open around the cache. Centralizing all internal opens behind one opener is deferred. * docs(dev): invariant 15 (one source of truth, cheaply derived) + cost-budget testing Records the principle behind the query-latency work: Lance and the manifest are the source of truth, everything else a derived view held warm and refreshed by a cheap probe; the two failure modes (a drifting parallel copy, and cold re-derivation whose cost grows with history) are deny-listed. Adds the cost-budget testing discipline (assert a warm read's open/IO count is flat at commit-history depth, the LanceDB IO-counted pattern) and the warm_read_cost.rs row. Updates the read-path-re-derivation known gap to reflect what Fix 1/2/3 + finding A close, and adds the commit-graph-parent-under-concurrency gap. * fix(engine): branch-incarnation identity + unified invalidation + shared LruMap (PR #268 review) Phase 6 A-D, correct-by-design responses to the Codex/Greptile P2 review comments. A: warm-read freshness and the table-handle cache key use the manifest incarnation (e_tag, manifest-timestamp fallback, then version), so a deleted+recreated non-main branch reusing a version number cannot be served stale; main stays version-cheap, non-main loads latest_manifest; a detected stale refresh also invalidates read caches; two regression tests force the version collision. B: unify the two cache invalidations into Omnigraph::invalidate_read_caches() at the four sites. C: assert the stale path's probe count. D: shared LruMap behind both caches with unconditional eviction, plus a unit test. Full engine suite green; multi-process lineage fork and O(history) write refresh remain known gaps for Phase 6E/7. --- AGENTS.md | 4 + Cargo.lock | 1 + crates/omnigraph/Cargo.toml | 1 + crates/omnigraph/src/db/commit_graph.rs | 38 +- crates/omnigraph/src/db/graph_coordinator.rs | 61 +- crates/omnigraph/src/db/manifest.rs | 138 ++- crates/omnigraph/src/db/manifest/layout.rs | 9 +- crates/omnigraph/src/db/manifest/metadata.rs | 2 +- crates/omnigraph/src/db/manifest/namespace.rs | 38 +- crates/omnigraph/src/db/manifest/publisher.rs | 6 +- crates/omnigraph/src/db/manifest/recovery.rs | 7 +- crates/omnigraph/src/db/manifest/tests.rs | 11 +- crates/omnigraph/src/db/omnigraph.rs | 146 ++- crates/omnigraph/src/db/omnigraph/optimize.rs | 1 + .../src/db/omnigraph/schema_apply.rs | 7 +- .../omnigraph/src/db/omnigraph/table_ops.rs | 5 +- crates/omnigraph/src/exec/query.rs | 4 +- crates/omnigraph/src/instrumentation.rs | 231 +++++ crates/omnigraph/src/lib.rs | 1 + crates/omnigraph/src/runtime_cache.rs | 220 ++++- crates/omnigraph/tests/branching.rs | 168 ++++ crates/omnigraph/tests/forbidden_apis.rs | 9 + crates/omnigraph/tests/helpers/mod.rs | 15 + crates/omnigraph/tests/warm_read_cost.rs | 833 ++++++++++++++++++ docs/dev/invariants.md | 57 +- docs/dev/testing.md | 12 +- 26 files changed, 1918 insertions(+), 107 deletions(-) create mode 100644 crates/omnigraph/src/instrumentation.rs create mode 100644 crates/omnigraph/tests/warm_read_cost.rs diff --git a/AGENTS.md b/AGENTS.md index 378de88..e8cd035 100644 --- a/AGENTS.md +++ b/AGENTS.md @@ -125,6 +125,8 @@ This is a decision lens, not a code-size rule. It cuts both ways. Sometimes the When evaluating a design, ask: *"what does this look like after 5 more changes like it?"* If the answer is "this converges to one shape", cost is bounded. If it's "this forks every time", the option is mortgaging the future for present convenience — pick differently. +The same lens has a structural corollary: **one source of truth, cheaply derived.** Lance and the manifest are the source of truth; everything else is a derived view. Maintaining a parallel copy invites drift that compounds over time, and re-deriving a view from the full source on every call makes its cost grow with history. Both are liabilities integrated over time, so both are ruled out the same way: hold a warm derived view and refresh it with a cheap probe, never shadow the source or rebuild from it cold. Invariant 15 in [docs/dev/invariants.md](docs/dev/invariants.md) states this; invariants 1 (respect the substrate) and 7 (indexes are derived state) are instances. + ### Tiebreakers when liability alone is silent - **Correctness > simplicity > performance.** Lexicographic — give up performance for simpler code; give up simplicity for correct code; never give up correctness. The deny-list ("no silent failures," "no acks before durable persistence," "no reads of partial commits") is this rule's hard floor. @@ -145,6 +147,7 @@ These are architectural rules that need to be in scope on every change. They're 5. **Reads always see the current index state for the branch they're reading.** Indexes track the branch head, not historical snapshots. If you change index lifecycle, preserve this guarantee. 6. **Stable type IDs survive renames.** Schema migration relies on identity that's stable across rename — don't mint new IDs on rename. 7. **Logical contract over physical state.** Physical state (index coverage, fragment layout, compaction versions, staged writes) is derived and rebuildable; it must never fail a logical operation. Check preconditions against logical state and let reconciliation converge the physical state idempotently — genuine logical conflicts still fail loudly. This is the rule rules 1–6 instantiate; full statement and applications in [docs/dev/invariants.md](docs/dev/invariants.md). +8. **One source of truth, cheaply derived.** Lance and the manifest are the source of truth; runtime state is a derived view of them. Don't maintain a parallel copy that can drift, and don't re-derive a view from cold storage on every call (that makes cost grow with history). Hold it warm, refresh with a cheap probe. ### Deny-list (fast-pass review filter — full reasoning in [docs/dev/invariants.md](docs/dev/invariants.md)) @@ -166,6 +169,7 @@ If a proposal fits one of these, the burden is on the proposer to justify why th - Cloud-only correctness fixes — correctness is always OSS. - Forking the codebase for Cloud — trait-extension only. - Hand-rolling something Lance already does — check the spec first. +- Shadowing the source of truth with a maintained parallel copy, or re-deriving a derived view from cold storage per call (cost then scales with history). Hold it warm and refresh cheaply. - Mutating in place state that should be immutable (Lance fragments, index segments) — new segments instead. - Silent failures — OOM, timeout, partial result must all be surfaced and bounded. - Shipping observable behavior as if it weren't part of the contract — output ordering, error-message text, timestamp precision, default-flag values, latency profile. Per Hyrum's Law, every observable behavior gets depended on once shipped; don't expose what you don't want to commit to. diff --git a/Cargo.lock b/Cargo.lock index 2419e9f..3963da1 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4941,6 +4941,7 @@ dependencies = [ "lance-datafusion", "lance-file", "lance-index", + "lance-io", "lance-linalg", "lance-namespace", "lance-namespace-impls", diff --git a/crates/omnigraph/Cargo.toml b/crates/omnigraph/Cargo.toml index 7ee9bda..55d3008 100644 --- a/crates/omnigraph/Cargo.toml +++ b/crates/omnigraph/Cargo.toml @@ -55,5 +55,6 @@ arc-swap = { workspace = true } omnigraph-compiler = { path = "../omnigraph-compiler", version = "0.7.0" } tokio = { workspace = true } lance-namespace-impls = { workspace = true } +lance-io = "7.0.0" serial_test = "3" proptest = "1" diff --git a/crates/omnigraph/src/db/commit_graph.rs b/crates/omnigraph/src/db/commit_graph.rs index 3d90e54..572bdf5 100644 --- a/crates/omnigraph/src/db/commit_graph.rs +++ b/crates/omnigraph/src/db/commit_graph.rs @@ -79,10 +79,14 @@ impl CommitGraph { pub async fn open(root_uri: &str) -> Result { let root = root_uri.trim_end_matches('/'); - let dataset = Dataset::open(&graph_commits_uri(root)) - .await - .map_err(|e| OmniError::Lance(e.to_string()))?; - let actor_dataset = Dataset::open(&graph_commit_actors_uri(root)).await.ok(); + let wrapper = crate::instrumentation::commit_graph_wrapper(); + let dataset = + crate::instrumentation::open_dataset_tracked(&graph_commits_uri(root), wrapper.clone()) + .await?; + let actor_dataset = + crate::instrumentation::open_dataset_tracked(&graph_commit_actors_uri(root), wrapper) + .await + .ok(); let actor_by_commit_id = match &actor_dataset { Some(dataset) => load_commit_actor_cache(dataset).await?, None => HashMap::new(), @@ -101,14 +105,18 @@ impl CommitGraph { pub async fn open_at_branch(root_uri: &str, branch: &str) -> Result { let root = root_uri.trim_end_matches('/'); - let dataset = Dataset::open(&graph_commits_uri(root)) - .await - .map_err(|e| OmniError::Lance(e.to_string()))?; + let wrapper = crate::instrumentation::commit_graph_wrapper(); + let dataset = + crate::instrumentation::open_dataset_tracked(&graph_commits_uri(root), wrapper.clone()) + .await?; let dataset = dataset .checkout_branch(branch) .await .map_err(|e| OmniError::Lance(e.to_string()))?; - let actor_dataset = Dataset::open(&graph_commit_actors_uri(root)).await.ok(); + let actor_dataset = + crate::instrumentation::open_dataset_tracked(&graph_commit_actors_uri(root), wrapper) + .await + .ok(); let actor_by_commit_id = match &actor_dataset { Some(dataset) => load_commit_actor_cache(dataset).await?, None => HashMap::new(), @@ -127,9 +135,12 @@ impl CommitGraph { pub async fn refresh(&mut self) -> Result<()> { let root = self.root_uri.clone(); - self.dataset = Dataset::open(&graph_commits_uri(&root)) - .await - .map_err(|e| OmniError::Lance(e.to_string()))?; + let wrapper = crate::instrumentation::commit_graph_wrapper(); + self.dataset = crate::instrumentation::open_dataset_tracked( + &graph_commits_uri(&root), + wrapper.clone(), + ) + .await?; if let Some(branch) = &self.active_branch { self.dataset = self .dataset @@ -137,7 +148,10 @@ impl CommitGraph { .await .map_err(|e| OmniError::Lance(e.to_string()))?; } - self.actor_dataset = Dataset::open(&graph_commit_actors_uri(&root)).await.ok(); + self.actor_dataset = + crate::instrumentation::open_dataset_tracked(&graph_commit_actors_uri(&root), wrapper) + .await + .ok(); self.actor_by_commit_id = match &self.actor_dataset { Some(dataset) => load_commit_actor_cache(dataset).await?, None => HashMap::new(), diff --git a/crates/omnigraph/src/db/graph_coordinator.rs b/crates/omnigraph/src/db/graph_coordinator.rs index dfe2767..b9bcb11 100644 --- a/crates/omnigraph/src/db/graph_coordinator.rs +++ b/crates/omnigraph/src/db/graph_coordinator.rs @@ -10,7 +10,9 @@ use crate::storage::{StorageAdapter, join_uri, normalize_root_uri}; use super::commit_graph::{CommitGraph, GraphCommit}; use super::is_internal_system_branch; -use super::manifest::{ManifestChange, ManifestCoordinator, Snapshot, SubTableUpdate}; +use super::manifest::{ + ManifestChange, ManifestCoordinator, ManifestIncarnation, Snapshot, SubTableUpdate, +}; const GRAPH_COMMITS_DIR: &str = "_graph_commits.lance"; @@ -26,10 +28,11 @@ impl SnapshotId { &self.0 } - pub(crate) fn synthetic(branch: Option<&str>, version: u64) -> Self { - match branch { - Some(branch) => Self(format!("manifest:{}:v{}", branch, version)), - None => Self(format!("manifest:main:v{}", version)), + pub(crate) fn synthetic(branch: Option<&str>, version: u64, e_tag: Option<&str>) -> Self { + let branch = branch.unwrap_or("main"); + match e_tag { + Some(e_tag) => Self(format!("manifest:{}:v{}:etag:{}", branch, version, e_tag)), + None => Self(format!("manifest:{}:v{}", branch, version)), } } } @@ -166,6 +169,10 @@ impl GraphCoordinator { self.manifest.version() } + pub(crate) fn manifest_incarnation(&self) -> ManifestIncarnation { + self.manifest.incarnation() + } + pub fn snapshot(&self) -> Snapshot { self.manifest.snapshot() } @@ -182,6 +189,19 @@ impl GraphCoordinator { Ok(()) } + pub(crate) async fn probe_latest_incarnation(&self) -> Result { + crate::instrumentation::record_probe(); + self.manifest.probe_latest_incarnation().await + } + + /// Refresh only the manifest (not the commit graph). The read path uses this + /// on a stale same-branch probe: a read pins its snapshot by manifest version + /// and never needs the commit graph, so a full `refresh` (which also scans + /// the commit graph) would be wasted IO. + pub async fn refresh_manifest_only(&mut self) -> Result<()> { + self.manifest.refresh().await + } + pub async fn branch_list(&self) -> Result> { self.manifest.list_branches().await.map(|branches| { branches @@ -315,10 +335,13 @@ impl GraphCoordinator { None => GraphCoordinator::open(self.root_uri(), Arc::clone(&self.storage)).await?, }; - Ok(other - .head_commit_id() - .await? - .unwrap_or_else(|| SnapshotId::synthetic(other.current_branch(), other.version()))) + Ok(other.head_commit_id().await?.unwrap_or_else(|| { + SnapshotId::synthetic( + other.current_branch(), + other.version(), + other.manifest_incarnation().e_tag.as_deref(), + ) + })) } pub async fn resolve_target(&self, target: &ReadTarget) -> Result { @@ -339,7 +362,11 @@ impl GraphCoordinator { } }; let snapshot_id = other.head_commit_id().await?.unwrap_or_else(|| { - SnapshotId::synthetic(other.current_branch(), other.version()) + SnapshotId::synthetic( + other.current_branch(), + other.version(), + other.manifest_incarnation().e_tag.as_deref(), + ) }); Ok(ResolvedTarget { requested: target.clone(), @@ -509,9 +536,23 @@ impl GraphCoordinator { return Ok(SnapshotId::synthetic( current_branch.as_deref(), manifest_version, + self.manifest_incarnation().e_tag.as_deref(), )); }; failpoints::maybe_fail("graph_publish.before_commit_append")?; + // Refresh the commit-graph head from storage before selecting the + // parent. `append_commit` parents the new commit on the IN-MEMORY head + // (`head_commit_id`, zero storage read), but the manifest was just + // committed against a freshly rebased pin (`commit_all` opens a fresh + // coordinator) while THIS coordinator's cached head may be stale because + // an external writer advanced the branch. Without this refresh a + // same-branch write after an external commit appends off the stale head + // and FORKS the commit DAG (the new commit and the external commit + // sharing a parent). Refreshing makes the parent the true current head; + // the just-committed manifest version has no commit-graph row yet, so the + // fresh head is exactly the prior commit. (record_merge_commit is + // unaffected — it passes explicit parents, never the cached head.) + commit_graph.refresh().await?; let graph_commit_id = commit_graph .append_commit(current_branch.as_deref(), manifest_version, actor_id) .await?; diff --git a/crates/omnigraph/src/db/manifest.rs b/crates/omnigraph/src/db/manifest.rs index f130523..ce91513 100644 --- a/crates/omnigraph/src/db/manifest.rs +++ b/crates/omnigraph/src/db/manifest.rs @@ -24,11 +24,10 @@ mod recovery; mod state; use graph::{init_manifest_graph, open_manifest_graph, snapshot_state_at}; -use layout::{manifest_uri, open_manifest_dataset, type_name_hash}; +use layout::{manifest_uri, open_manifest_dataset, table_uri_for_path, type_name_hash}; pub(crate) use metadata::TableVersionMetadata; #[cfg(test)] use metadata::{OMNIGRAPH_ROW_COUNT_KEY, table_version_metadata_for_state}; -use namespace::open_table_at_version_from_manifest; pub(crate) use namespace::open_table_head_for_write; #[cfg(test)] use namespace::{branch_manifest_namespace, staged_table_namespace}; @@ -74,16 +73,51 @@ pub struct Snapshot { root_uri: String, version: u64, entries: HashMap, + /// Per-graph read caches (shared `Session` + held-handle cache), injected by + /// `Omnigraph::resolved_target` for live Branch reads so table opens reuse + /// handles (0 IO on a warm repeat) and one `Session`. `None` for write-prelude + /// snapshots, time-travel / Snapshot-id reads, and directly-built test + /// snapshots, which fall back to a plain open. + read_caches: Option>, } impl Snapshot { - /// Open a sub-table dataset at its pinned version. + /// Open a sub-table dataset at its pinned version. With read caches present + /// (live Branch reads), reuse a held handle through the cache (0 open IO on a + /// warm repeat) and the shared `Session`; otherwise plain-open (Fix 2). pub async fn open(&self, table_key: &str) -> Result { let entry = self .entries .get(table_key) .ok_or_else(|| OmniError::manifest(format!("no manifest entry for {}", table_key)))?; - entry.open(&self.root_uri).await + match &self.read_caches { + Some(caches) => { + let location = table_uri_for_path( + &self.root_uri, + &entry.table_path, + entry.table_branch.as_deref(), + ); + caches + .handles + .get_or_open( + &entry.table_path, + entry.table_branch.as_deref(), + entry.table_version, + entry.version_metadata.e_tag(), + &location, + Some(&caches.session), + ) + .await + } + None => entry.open(&self.root_uri).await, + } + } + + /// Attach per-graph read caches (shared `Session` + handle cache) so this + /// snapshot's table opens reuse handles and the session. Set by + /// `Omnigraph::resolved_target` for live Branch reads only. + pub(crate) fn set_read_caches(&mut self, caches: Arc) { + self.read_caches = Some(caches); } /// Manifest version this snapshot was taken from. @@ -101,6 +135,31 @@ impl Snapshot { } } +#[derive(Debug, Clone, PartialEq, Eq)] +pub(crate) struct ManifestIncarnation { + pub(crate) version: u64, + pub(crate) e_tag: Option, + timestamp_nanos: Option, +} + +impl ManifestIncarnation { + pub(crate) fn matches(&self, held: &Self) -> bool { + if self.version != held.version { + return false; + } + match (&self.e_tag, &held.e_tag) { + (Some(latest), Some(current)) => latest == current, + _ => match (self.timestamp_nanos, held.timestamp_nanos) { + (Some(latest), Some(current)) => latest == current, + // Some object stores can omit both e_tag and manifest timestamp + // from the reachable API. In that narrow case the version-number + // probe is the strongest available identity. + _ => true, + }, + } + } +} + impl SubTableUpdate { pub(crate) fn to_create_table_version_request(&self) -> CreateTableVersionRequest { self.version_metadata.to_create_table_version_request( @@ -132,14 +191,28 @@ pub(crate) enum ManifestChange { } impl SubTableEntry { + /// Open this sub-table at its pinned version directly by location (Fix 2), + /// without the Lance namespace — which would full-scan `__manifest` twice per + /// open (`describe_table` + `describe_table_version`). The resolved Snapshot + /// already holds the path, version, and branch. Branches are Lance native + /// branches, so `with_branch` resolves `{base}/tree/{branch}` from the base + /// URI; main uses `with_version`. pub(crate) async fn open(&self, root_uri: &str) -> Result { - open_table_at_version_from_manifest( - root_uri, - &self.table_key, - self.table_branch.as_deref(), - self.table_version, - ) - .await + // The branch-qualified location is the dataset that physically holds this + // version: main at `{table_path}`, a branch at + // `{table_path}/tree/{branch}` (Lance native-branch storage). `with_version` + // then resolves the version within THAT dataset's `_versions` — a branch + // version lives under `tree/{branch}/_versions`, not the base. This + // matches the physical layout the namespace path resolved, without the + // per-open `__manifest` scan. + let location = table_uri_for_path(root_uri, &self.table_path, self.table_branch.as_deref()); + // Route through the instrumented data-table opener (Fix 3). With no + // session this is exactly the Fix-2 `from_uri(location).with_version`. + // This is the uncached fallback (a snapshot with no read caches); the + // cached path (`Snapshot::open` → handle cache) calls the same opener on + // a miss with the shared session, so both paths count on the per-query + // `table_wrapper`. + crate::instrumentation::open_table_dataset(&location, self.table_version, None).await } } @@ -223,6 +296,7 @@ impl ManifestCoordinator { .into_iter() .map(|entry| (entry.table_key.clone(), entry)) .collect(), + read_caches: None, } } @@ -359,6 +433,48 @@ impl ManifestCoordinator { self.dataset.version().version } + /// Latest committed manifest version on disk (one object-store op, no row + /// scan). The freshness probe for warm reuse: compare against `version()` + /// (the held handle's pinned version) to decide whether to refresh. + pub async fn probe_latest_version(&self) -> Result { + self.dataset + .latest_version_id() + .await + .map_err(|e| OmniError::Lance(e.to_string())) + } + + pub(crate) fn incarnation(&self) -> ManifestIncarnation { + ManifestIncarnation { + version: self.version(), + e_tag: self.dataset.manifest_location().e_tag.clone(), + timestamp_nanos: Some(self.dataset.manifest().timestamp_nanos), + } + } + + /// Latest committed manifest identity. Main cannot be deleted/recreated, so + /// the cheap version-number probe is sufficient there. Non-main Lance + /// branches can be deleted and recreated with the same version number, so + /// load the latest manifest location and compare its e_tag / timestamp too. + pub(crate) async fn probe_latest_incarnation(&self) -> Result { + if self.active_branch.is_none() { + return Ok(ManifestIncarnation { + version: self.probe_latest_version().await?, + e_tag: self.dataset.manifest_location().e_tag.clone(), + timestamp_nanos: Some(self.dataset.manifest().timestamp_nanos), + }); + } + let (manifest, location) = self + .dataset + .latest_manifest() + .await + .map_err(|e| OmniError::Lance(e.to_string()))?; + Ok(ManifestIncarnation { + version: manifest.version, + e_tag: location.e_tag, + timestamp_nanos: Some(manifest.timestamp_nanos), + }) + } + pub fn active_branch(&self) -> Option<&str> { self.active_branch.as_deref() } diff --git a/crates/omnigraph/src/db/manifest/layout.rs b/crates/omnigraph/src/db/manifest/layout.rs index 9cfde9a..08fe043 100644 --- a/crates/omnigraph/src/db/manifest/layout.rs +++ b/crates/omnigraph/src/db/manifest/layout.rs @@ -20,9 +20,12 @@ pub(super) fn manifest_uri(root: &str) -> String { } pub(super) async fn open_manifest_dataset(root_uri: &str, branch: Option<&str>) -> Result { - let dataset = Dataset::open(&manifest_uri(root_uri.trim_end_matches('/'))) - .await - .map_err(|e| OmniError::Lance(e.to_string()))?; + let uri = manifest_uri(root_uri.trim_end_matches('/')); + let dataset = crate::instrumentation::open_dataset_tracked( + &uri, + crate::instrumentation::manifest_wrapper(), + ) + .await?; match branch { Some(branch) if branch != "main" => dataset .checkout_branch(branch) diff --git a/crates/omnigraph/src/db/manifest/metadata.rs b/crates/omnigraph/src/db/manifest/metadata.rs index 0bf14b6..7cd6436 100644 --- a/crates/omnigraph/src/db/manifest/metadata.rs +++ b/crates/omnigraph/src/db/manifest/metadata.rs @@ -111,7 +111,6 @@ impl TableVersionMetadata { self.manifest_size } - #[cfg(test)] pub(crate) fn e_tag(&self) -> Option<&str> { self.e_tag.as_deref() } @@ -138,6 +137,7 @@ impl TableVersionMetadata { request } + #[cfg(test)] pub(super) fn to_namespace_version(&self, version: u64) -> TableVersion { self.to_namespace_version_with_details(version, None, None) } diff --git a/crates/omnigraph/src/db/manifest/namespace.rs b/crates/omnigraph/src/db/manifest/namespace.rs index 5e907ba..0d567e0 100644 --- a/crates/omnigraph/src/db/manifest/namespace.rs +++ b/crates/omnigraph/src/db/manifest/namespace.rs @@ -16,21 +16,30 @@ use object_store::{ use crate::error::{OmniError, Result}; -use super::layout::{ - namespace_internal_error, open_manifest_dataset, table_id_to_key, table_uri_for_path, -}; -use super::metadata::{ - TableVersionMetadata, namespace_version_metadata, parse_namespace_version_request, -}; +use super::layout::{namespace_internal_error, table_uri_for_path}; +#[cfg(test)] +use super::layout::{open_manifest_dataset, table_id_to_key}; +use super::metadata::TableVersionMetadata; +#[cfg(test)] +use super::metadata::{namespace_version_metadata, parse_namespace_version_request}; +#[cfg(test)] use super::publisher::GraphNamespacePublisher; +// The read namespace (BranchManifestNamespace) is test-only since Fix 2: reads +// open sub-tables directly by location+version (SubTableEntry::open), so nothing +// in production routes a read through the Lance namespace. The writes path uses +// StagedTableNamespace. These items are retained to validate the namespace +// contract in unit tests. +#[cfg(test)] use super::state::{ManifestState, SubTableEntry, read_manifest_entries, read_manifest_state}; +#[cfg(test)] #[derive(Debug, Clone)] struct BranchManifestNamespace { root_uri: String, branch: Option, } +#[cfg(test)] impl BranchManifestNamespace { fn new(root_uri: &str, branch: Option<&str>) -> Self { Self { @@ -137,6 +146,7 @@ impl StagedTableNamespace { } } +#[cfg(test)] pub(crate) fn branch_manifest_namespace( root_uri: &str, branch: Option<&str>, @@ -175,21 +185,7 @@ async fn load_table_from_namespace( .map_err(|e| OmniError::Lance(e.to_string())) } -pub(crate) async fn open_table_at_version_from_manifest( - root_uri: &str, - table_key: &str, - branch: Option<&str>, - version: u64, -) -> Result { - load_table_from_namespace( - branch_manifest_namespace(root_uri, branch), - table_key, - branch, - Some(version), - ) - .await -} - +#[cfg(test)] #[async_trait] impl LanceNamespace for BranchManifestNamespace { fn namespace_id(&self) -> String { diff --git a/crates/omnigraph/src/db/manifest/publisher.rs b/crates/omnigraph/src/db/manifest/publisher.rs index 288f4be..ba1166d 100644 --- a/crates/omnigraph/src/db/manifest/publisher.rs +++ b/crates/omnigraph/src/db/manifest/publisher.rs @@ -24,10 +24,13 @@ use lance::Dataset; use lance::Error as LanceError; use lance::dataset::{MergeInsertBuilder, WhenMatched, WhenNotMatched}; use lance_namespace::NamespaceError; +#[cfg(test)] use lance_namespace::models::CreateTableVersionRequest; use crate::error::{OmniError, Result}; +#[cfg(test)] +use super::SubTableUpdate; use super::layout::{open_manifest_dataset, tombstone_object_id, version_object_id}; use super::metadata::parse_namespace_version_request; use super::migrations::migrate_internal_schema; @@ -37,7 +40,7 @@ use super::state::{ }; use super::{ ManifestChange, OBJECT_TYPE_TABLE, OBJECT_TYPE_TABLE_TOMBSTONE, OBJECT_TYPE_TABLE_VERSION, - SubTableEntry, SubTableUpdate, TableRegistration, TableTombstone, + SubTableEntry, TableRegistration, TableTombstone, }; /// Bound on the publisher-level retry loop that wraps Lance's row-level CAS @@ -396,6 +399,7 @@ impl GraphNamespacePublisher { Ok(Arc::try_unwrap(new_dataset).unwrap_or_else(|arc| (*arc).clone())) } + #[cfg(test)] pub(super) async fn publish_requests( &self, requests: &[CreateTableVersionRequest], diff --git a/crates/omnigraph/src/db/manifest/recovery.rs b/crates/omnigraph/src/db/manifest/recovery.rs index 968d3f4..4b0f870 100644 --- a/crates/omnigraph/src/db/manifest/recovery.rs +++ b/crates/omnigraph/src/db/manifest/recovery.rs @@ -830,7 +830,12 @@ pub(crate) async fn recover_manifest_drift( // write-entry heal: a deferred sidecar whose branch was // deleted would otherwise fail every ReadWrite open. coordinator.refresh().await?; - if !coordinator.all_branches().await?.iter().any(|name| name == b) { + if !coordinator + .all_branches() + .await? + .iter() + .any(|name| name == b) + { discard_orphaned_branch_sidecar( root_uri, storage.as_ref(), diff --git a/crates/omnigraph/src/db/manifest/tests.rs b/crates/omnigraph/src/db/manifest/tests.rs index 0e00505..3888bd4 100644 --- a/crates/omnigraph/src/db/manifest/tests.rs +++ b/crates/omnigraph/src/db/manifest/tests.rs @@ -1531,7 +1531,11 @@ async fn test_v2_to_v3_sweeps_legacy_run_branches_on_write_open() { .await .unwrap(); let post = open_manifest_dataset(uri, None).await.unwrap(); - assert_eq!(super::migrations::read_stamp(&post), 2, "stamp rewound to v2"); + assert_eq!( + super::migrations::read_stamp(&post), + 2, + "stamp rewound to v2" + ); } // A no-op publish forces the open-for-write path, which runs the migration. @@ -1556,7 +1560,10 @@ async fn test_v2_to_v3_sweeps_legacy_run_branches_on_write_open() { !after.iter().any(|b| b.starts_with("__run__")), "legacy run branch must be swept; got {after:?}", ); - assert!(after.iter().any(|b| b == "feature"), "user branch must survive"); + assert!( + after.iter().any(|b| b == "feature"), + "user branch must survive" + ); assert!(after.iter().any(|b| b == "main"), "main must survive"); // Idempotent: a second write-open finds the stamp at current and does not diff --git a/crates/omnigraph/src/db/omnigraph.rs b/crates/omnigraph/src/db/omnigraph.rs index 48be274..e1d7acf 100644 --- a/crates/omnigraph/src/db/omnigraph.rs +++ b/crates/omnigraph/src/db/omnigraph.rs @@ -106,6 +106,12 @@ pub struct Omnigraph { coordinator: Arc>, table_store: TableStore, runtime_cache: RuntimeCache, + /// Per-graph read caches: one shared Lance `Session` plus the held-`Dataset` + /// handle cache, handed to live-Branch-read snapshots (via + /// `resolved_target`) so table opens reuse handles (0 IO on a warm repeat) + /// and one session. Invalidated alongside `runtime_cache` on branch switch / + /// refresh — hygiene only; version-in-key carries correctness. + read_caches: Arc, /// Read-heavy on every query, written only by `apply_schema`. ArcSwap /// gives atomic pointer swap with zero-cost reads (`load()` returns a /// `Guard>`), so concurrent queries on different actors @@ -327,6 +333,14 @@ impl Omnigraph { coordinator: Arc::new(tokio::sync::RwLock::new(coordinator)), table_store: TableStore::new(&root), runtime_cache: RuntimeCache::default(), + // One shared Session per graph (LanceDB's one-session-per-connection + // model) plus the held-handle cache, created once and reused across + // reads. Session::default() caps are lazy (6 GiB index / 1 GiB + // metadata); multi-graph cap/sharing is a deferred follow-up. + read_caches: Arc::new(crate::runtime_cache::ReadCaches { + session: Arc::new(lance::session::Session::default()), + handles: Arc::new(crate::runtime_cache::TableHandleCache::default()), + }), catalog: Arc::new(ArcSwap::from_pointee(catalog)), schema_source: Arc::new(ArcSwap::from_pointee(schema_source.to_string())), write_queue: Arc::new(crate::db::write_queue::WriteQueueManager::new()), @@ -351,12 +365,10 @@ impl Omnigraph { Self::open_with_storage_and_mode(uri, storage_for_uri(uri)?, OpenMode::ReadOnly).await } - /// `open_with_storage` retained for existing callers (init/test paths). - /// Defaults to `OpenMode::ReadWrite`. - pub(crate) async fn open_with_storage( - uri: &str, - storage: Arc, - ) -> Result { + /// Open with a caller-supplied [`StorageAdapter`]. Used by init/test paths + /// and by embedding/test consumers that wrap storage (e.g. a counting + /// decorator for IO-budget tests). Defaults to `OpenMode::ReadWrite`. + pub async fn open_with_storage(uri: &str, storage: Arc) -> Result { Self::open_with_storage_and_mode(uri, storage, OpenMode::ReadWrite).await } @@ -428,6 +440,14 @@ impl Omnigraph { coordinator: Arc::new(tokio::sync::RwLock::new(coordinator)), table_store: TableStore::new(&root), runtime_cache: RuntimeCache::default(), + // One shared Session per graph (LanceDB's one-session-per-connection + // model) plus the held-handle cache, created once and reused across + // reads. Session::default() caps are lazy (6 GiB index / 1 GiB + // metadata); multi-graph cap/sharing is a deferred follow-up. + read_caches: Arc::new(crate::runtime_cache::ReadCaches { + session: Arc::new(lance::session::Session::default()), + handles: Arc::new(crate::runtime_cache::TableHandleCache::default()), + }), catalog: Arc::new(ArcSwap::from_pointee(catalog)), schema_source: Arc::new(ArcSwap::from_pointee(schema_source)), write_queue: Arc::new(crate::db::write_queue::WriteQueueManager::new()), @@ -539,6 +559,12 @@ impl Omnigraph { } pub(crate) async fn ensure_schema_state_valid(&self) -> Result<()> { + // Full per-call validation is intentional: a long-lived handle must + // detect external drift of the schema source, IR, OR state on its next + // operation (see lifecycle::long_lived_handle_rejects_schema_* tests). A + // source-only fast path would miss IR/state drift when _schema.pg is + // unchanged, so the only safe latency win is not calling this twice per + // query (finding A removes the redundant caller in exec/query.rs). validate_schema_contract(self.uri(), Arc::clone(&self.storage)).await } @@ -719,10 +745,13 @@ impl Omnigraph { let normalized = normalize_branch_name(branch.unwrap_or("main"))?; let coord = self.coordinator.read().await; if normalized.as_deref() == coord.current_branch() { - let snapshot_id = coord - .head_commit_id() - .await? - .unwrap_or_else(|| SnapshotId::synthetic(coord.current_branch(), coord.version())); + let snapshot_id = coord.head_commit_id().await?.unwrap_or_else(|| { + SnapshotId::synthetic( + coord.current_branch(), + coord.version(), + coord.manifest_incarnation().e_tag.as_deref(), + ) + }); return Ok(ResolvedTarget { requested, branch: coord.current_branch().map(str::to_string), @@ -785,10 +814,15 @@ impl Omnigraph { let branch = normalize_branch_name(branch)?; let next = self.open_coordinator_for_branch(branch.as_deref()).await?; *self.coordinator.write().await = next; - self.runtime_cache.invalidate_all().await; + self.invalidate_read_caches().await; Ok(()) } + async fn invalidate_read_caches(&self) { + self.runtime_cache.invalidate_all().await; + self.read_caches.handles.invalidate_all().await; + } + /// Re-read the handle-local coordinator state from storage AND run /// in-process recovery. Closes the Phase B → Phase C residual (e.g. /// `MutationStaging::finalize` crash mid-publish in a long-running @@ -888,7 +922,7 @@ impl Omnigraph { ) .await?; self.reload_schema_if_source_changed().await?; - self.runtime_cache.invalidate_all().await; + self.invalidate_read_caches().await; Ok(()) } @@ -920,7 +954,7 @@ impl Omnigraph { // write that triggered the heal validates against the stale // schema. Same post-heal step as `refresh`. self.reload_schema_if_source_changed().await?; - self.runtime_cache.invalidate_all().await; + self.invalidate_read_caches().await; } Ok(()) } @@ -956,7 +990,7 @@ impl Omnigraph { /// own publish path. pub(crate) async fn refresh_coordinator_only(&self) -> Result<()> { self.coordinator.write().await.refresh().await?; - self.runtime_cache.invalidate_all().await; + self.invalidate_read_caches().await; Ok(()) } @@ -974,11 +1008,66 @@ impl Omnigraph { target: impl Into, ) -> Result { self.ensure_schema_state_valid().await?; - self.coordinator - .read() - .await - .resolve_target(&target.into()) - .await + let target = target.into(); + let mut resolved = self.resolve_target_inner(&target).await?; + // Attach the read caches (shared Session + held-handle cache) for live + // Branch reads so table opens reuse handles (0 IO on a warm repeat). + // Snapshot-id reads are deliberately NOT cached: they pin a historical + // version `cleanup` may GC, so bypassing the cache sidesteps the + // cleanup-vs-cached-handle edge. Writes never reach here (they use + // `resolved_branch_target`), so they never receive a pinned handle. + if matches!(target, ReadTarget::Branch(_)) { + resolved + .snapshot + .set_read_caches(Arc::clone(&self.read_caches)); + } + Ok(resolved) + } + + /// Resolve a read target to its snapshot, without attaching read caches. + /// Same-branch reads reuse the warm coordinator, gated by a cheap version + /// probe (invariant 6: strong consistency, never a blind warm read). Reads do + /// not need the commit graph (the manifest version is the visibility + /// authority, invariant 2), so the id is synthetic and no commit-graph scan + /// happens on this path. + async fn resolve_target_inner(&self, target: &ReadTarget) -> Result { + if let ReadTarget::Branch(branch) = target { + let normalized = normalize_branch_name(branch)?; + { + let coord = self.coordinator.read().await; + if normalized.as_deref() != coord.current_branch() { + // Different branch: cold resolve (opens that branch). + return coord.resolve_target(target).await; + } + let held = coord.manifest_incarnation(); + if coord.probe_latest_incarnation().await?.matches(&held) { + return Ok(warm_resolved_target(&coord, target)); + } + // Stale: refresh under the write lock below. + } + let mut coord = self.coordinator.write().await; + if normalized.as_deref() == coord.current_branch() { + // Re-check after taking the write lock; another writer may have + // refreshed (tokio RwLock has no read->write upgrade). + let held = coord.manifest_incarnation(); + let mut refreshed = false; + if !coord.probe_latest_incarnation().await?.matches(&held) { + coord.refresh_manifest_only().await?; + refreshed = true; + } + let resolved = warm_resolved_target(&coord, target); + drop(coord); + if refreshed { + self.invalidate_read_caches().await; + } + return Ok(resolved); + } + // Branch changed while waiting for the write lock: cold resolve. + return coord.resolve_target(target).await; + } + + // Snapshot target: resolve through the commit graph as before. + self.coordinator.read().await.resolve_target(target).await } // ─── Change detection ──────────────────────────────────────────────── @@ -1673,6 +1762,24 @@ pub(crate) fn normalize_branch_name(branch: &str) -> Result> { Ok(Some(branch.to_string())) } +/// Build a `ResolvedTarget` from the warm coordinator without opening the commit +/// graph. The live branch snapshot is pinned by the manifest incarnation, so the +/// id is synthetic `(branch, version, e_tag when available)`; nothing on the read +/// path needs a real commit ULID (only `RuntimeCache` keys on the id, where +/// synthetic is consistent). +fn warm_resolved_target(coord: &GraphCoordinator, requested: &ReadTarget) -> ResolvedTarget { + ResolvedTarget { + requested: requested.clone(), + branch: coord.current_branch().map(str::to_string), + snapshot_id: SnapshotId::synthetic( + coord.current_branch(), + coord.version(), + coord.manifest_incarnation().e_tag.as_deref(), + ), + snapshot: coord.snapshot(), + } +} + pub(crate) fn ensure_public_branch_ref(branch: &str, operation: &str) -> Result<()> { if is_internal_system_branch(branch) { return Err(OmniError::manifest(format!( @@ -2523,6 +2630,7 @@ edge WorksAt: Person -> Company db.branch_create("__run__legacy").await.unwrap(); drop(db); { + // forbidden-api-allow: test synthesizes a legacy graph by editing __manifest directly. let mut ds = lance::Dataset::open(&format!("{}/__manifest", uri)) .await .unwrap(); diff --git a/crates/omnigraph/src/db/omnigraph/optimize.rs b/crates/omnigraph/src/db/omnigraph/optimize.rs index 9181822..498f9ae 100644 --- a/crates/omnigraph/src/db/omnigraph/optimize.rs +++ b/crates/omnigraph/src/db/omnigraph/optimize.rs @@ -937,6 +937,7 @@ mod tests { for type_name in ["Person", "Company"] { let table_uri = node_table_uri(uri, type_name); + // forbidden-api-allow: test synthesizes a branch ref directly on the Lance dataset. let mut ds = lance::Dataset::open(&table_uri).await.unwrap(); let base = ds.version().version; ds.create_branch("feature", base, None).await.unwrap(); diff --git a/crates/omnigraph/src/db/omnigraph/schema_apply.rs b/crates/omnigraph/src/db/omnigraph/schema_apply.rs index 48f8099..d013eb2 100644 --- a/crates/omnigraph/src/db/omnigraph/schema_apply.rs +++ b/crates/omnigraph/src/db/omnigraph/schema_apply.rs @@ -447,8 +447,7 @@ where && sidecar_registrations.is_empty() && sidecar_tombstones.is_empty()); if writes_sidecar { - schema_apply_queue_keys - .push(crate::db::manifest::schema_apply_serial_queue_key()); + schema_apply_queue_keys.push(crate::db::manifest::schema_apply_serial_queue_key()); } let _schema_apply_queue_guards = db .write_queue() @@ -530,8 +529,7 @@ where .await?; let table_path = table_path_for_table_key(target_table_key)?; let dataset_uri = db.storage().dataset_uri(&table_path); - let target_ds = - SnapshotHandle::new(TableStore::write_dataset(&dataset_uri, batch).await?); + let target_ds = SnapshotHandle::new(TableStore::write_dataset(&dataset_uri, batch).await?); // Indexes on the renamed table are reconciled later (iss-848). let state = db.storage().table_state(&dataset_uri, &target_ds).await?; table_registrations.insert(target_table_key.clone(), table_path); @@ -750,6 +748,7 @@ where async fn cleanup_dataset_old_versions(db: &Omnigraph, full_uri: &str) -> Result<()> { use chrono::Utc; use lance::dataset::cleanup::CleanupPolicy; + // forbidden-api-allow: maintenance (Hard-drop version GC) opens the dataset to run cleanup_old_versions. let ds = lance::Dataset::open(full_uri) .await .map_err(|e| OmniError::Lance(e.to_string()))?; diff --git a/crates/omnigraph/src/db/omnigraph/table_ops.rs b/crates/omnigraph/src/db/omnigraph/table_ops.rs index d30acff..c325931 100644 --- a/crates/omnigraph/src/db/omnigraph/table_ops.rs +++ b/crates/omnigraph/src/db/omnigraph/table_ops.rs @@ -1097,7 +1097,8 @@ async fn prepare_updates_for_commit( // have null embeddings) is deferred and logged inside // build_indices; a later ensure_indices/optimize materializes it. // The load/mutate/merge commit must not fail on it. - let _pending = build_indices_on_dataset(db, &prepared_update.table_key, &mut ds).await?; + let _pending = + build_indices_on_dataset(db, &prepared_update.table_key, &mut ds).await?; let state = db.storage().table_state(&full_path, &ds).await?; prepared_update.table_version = state.version; prepared_update.row_count = state.row_count; @@ -1350,6 +1351,7 @@ mod classify_fork_ref_tests { // the manifest's `feature` snapshot still places on main. let person = node_path(&db, "feature", "node:Person").await; { + // forbidden-api-allow: test synthesizes a branch ref directly on the Lance dataset. let mut ds = lance::Dataset::open(&person).await.unwrap(); let v = ds.version().version; ds.create_branch("feature", v, None).await.unwrap(); @@ -1362,6 +1364,7 @@ mod classify_fork_ref_tests { // Orphan (ghost): a ref for a branch the manifest does not have at all. { + // forbidden-api-allow: test synthesizes a branch ref directly on the Lance dataset. let mut ds = lance::Dataset::open(&person).await.unwrap(); let v = ds.version().version; ds.create_branch("ghost", v, None).await.unwrap(); diff --git a/crates/omnigraph/src/exec/query.rs b/crates/omnigraph/src/exec/query.rs index b12e26b..e922075 100644 --- a/crates/omnigraph/src/exec/query.rs +++ b/crates/omnigraph/src/exec/query.rs @@ -35,7 +35,7 @@ impl Omnigraph { query_name: &str, params: &ParamMap, ) -> Result { - self.ensure_schema_state_valid().await?; + // resolved_target validates the schema contract; no redundant call here. let resolved = self.resolved_target(target).await?; let catalog = self.catalog(); @@ -80,7 +80,7 @@ impl Omnigraph { query_name: &str, params: &ParamMap, ) -> Result { - self.ensure_schema_state_valid().await?; + // snapshot_at_version validates the schema contract; no redundant call here. let snapshot = self.snapshot_at_version(version).await?; let catalog = self.catalog(); diff --git a/crates/omnigraph/src/instrumentation.rs b/crates/omnigraph/src/instrumentation.rs new file mode 100644 index 0000000..98249c0 --- /dev/null +++ b/crates/omnigraph/src/instrumentation.rs @@ -0,0 +1,231 @@ +//! Read-path cost instrumentation (test seam). +//! +//! Two boundary instruments let cost-budget tests assert that a warm read does +//! no redundant IO, the way LanceDB's IO-counted tests do (see +//! `docs/dev/testing.md`, "Cost-budget tests"): +//! +//! - **Lance object store** — a per-query [`WrappingObjectStore`] attached to the +//! datasets a query opens, so a test counts real `read_iops`. Delivered through +//! a task-local ([`QueryIoProbes`]) set by the test; production leaves it unset, +//! so the open helpers attach nothing (one unset-`Option` check per open). +//! - **omnigraph `StorageAdapter`** — [`CountingStorageAdapter`], a decorator that +//! counts per-method calls (the schema-contract reads on the query path). +//! +//! Nothing here changes runtime behavior: the wrappers only observe, and the +//! decorator delegates every call. `IOTracker` (the concrete counter) lives in +//! tests via the `lance-io` dev-dependency; this module stays generic over the +//! `lance::io`-re-exported trait, so it adds no production dependency. + +use std::sync::Arc; +use std::sync::atomic::{AtomicU64, Ordering}; + +use async_trait::async_trait; +use lance::Dataset; +use lance::dataset::builder::DatasetBuilder; +use lance::io::{ObjectStoreParams, WrappingObjectStore}; + +use crate::error::{OmniError, Result}; +use crate::storage::StorageAdapter; + +/// Per-query IO probes, installed for a query's task via [`with_query_io_probes`]. +/// +/// Each wrapper is attached (when present) to the datasets that category opens, +/// so a test reads `read_iops` off its own `IOTracker` handle. `probe_count` +/// records calls to the version probe (which runs on the coordinator's already-open +/// handle, so it is counted by invocation rather than by the per-query wrappers). +#[derive(Clone, Default)] +pub struct QueryIoProbes { + pub manifest_wrapper: Option>, + pub commit_graph_wrapper: Option>, + /// Attached to the per-table data opens a query performs (the cache-miss + /// path in `SubTableEntry::open`). Lets a cost test assert how many tables + /// a query actually opened — N on a cold read, 0 on a warm repeat once the + /// handle cache (Fix 3) serves them. + pub table_wrapper: Option>, + pub probe_count: Arc, +} + +tokio::task_local! { + static QUERY_IO_PROBES: QueryIoProbes; +} + +/// Run `fut` with per-query IO probes installed. Test-only entry point; nothing +/// in production sets the probes, so the accessors below return `None`/no-op. +pub async fn with_query_io_probes(probes: QueryIoProbes, fut: F) -> F::Output +where + F: std::future::Future, +{ + QUERY_IO_PROBES.scope(probes, fut).await +} + +fn current(f: impl FnOnce(&QueryIoProbes) -> R) -> Option { + QUERY_IO_PROBES.try_with(f).ok() +} + +pub(crate) fn manifest_wrapper() -> Option> { + current(|p| p.manifest_wrapper.clone()).flatten() +} + +pub(crate) fn commit_graph_wrapper() -> Option> { + current(|p| p.commit_graph_wrapper.clone()).flatten() +} + +pub(crate) fn table_wrapper() -> Option> { + current(|p| p.table_wrapper.clone()).flatten() +} + +/// Record one version-probe invocation against the active per-query probes. +/// No-op when no probes are installed (production). +pub(crate) fn record_probe() { + let _ = current(|p| p.probe_count.fetch_add(1, Ordering::Relaxed)); +} + +/// Open a Lance dataset at `uri`, attaching `wrapper` (for IO counting) when +/// present. With no wrapper this is exactly `Dataset::open(uri)`. The wrapper is +/// set via `ObjectStoreParams` on the builder so the open itself is counted +/// (`Dataset::with_object_store_wrappers` only wraps an already-open store). +pub(crate) async fn open_dataset_tracked( + uri: &str, + wrapper: Option>, +) -> Result { + let result = match wrapper { + None => Dataset::open(uri).await, + Some(wrapper) => { + DatasetBuilder::from_uri(uri) + .with_store_params(ObjectStoreParams { + object_store_wrapper: Some(wrapper), + ..Default::default() + }) + .load() + .await + } + }; + result.map_err(|e| OmniError::Lance(e.to_string())) +} + +/// Open a data-table dataset at `location` pinned to `version` — the cache-miss +/// path of the data-read boundary (`SubTableEntry::open`). Attaches the shared +/// per-graph `Session` (warms metadata/index caches across opens, LanceDB's +/// one-session-per-connection pattern) and the per-query `table_wrapper` (for IO +/// counting) when present. With neither, this is exactly the Fix-2 +/// `from_uri(location).with_version(version)` open. +pub(crate) async fn open_table_dataset( + location: &str, + version: u64, + session: Option<&Arc>, +) -> Result { + let mut builder = DatasetBuilder::from_uri(location).with_version(version); + if let Some(session) = session { + builder = builder.with_session(session.clone()); + } + if let Some(wrapper) = table_wrapper() { + builder = builder.with_store_params(ObjectStoreParams { + object_store_wrapper: Some(wrapper), + ..Default::default() + }); + } + builder + .load() + .await + .map_err(|e| OmniError::Lance(e.to_string())) +} + +/// Per-method read counts for [`CountingStorageAdapter`]. +#[derive(Debug, Default)] +pub struct StorageReadCounts { + pub read_text: AtomicU64, + pub exists: AtomicU64, + pub read_text_versioned: AtomicU64, + pub list_dir: AtomicU64, +} + +impl StorageReadCounts { + pub fn read_text(&self) -> u64 { + self.read_text.load(Ordering::Relaxed) + } + pub fn exists(&self) -> u64 { + self.exists.load(Ordering::Relaxed) + } + pub fn read_text_versioned(&self) -> u64 { + self.read_text_versioned.load(Ordering::Relaxed) + } + pub fn list_dir(&self) -> u64 { + self.list_dir.load(Ordering::Relaxed) + } +} + +/// Boundary decorator over a [`StorageAdapter`] that counts read-facing calls. +/// Reads delegate after incrementing; writes delegate unchanged. Construct with +/// [`CountingStorageAdapter::new`] and open an engine via +/// `Omnigraph::open_with_storage` to count its non-Lance storage IO. +#[derive(Debug)] +pub struct CountingStorageAdapter { + inner: Arc, + counts: Arc, +} + +impl CountingStorageAdapter { + /// Wrap `inner`, returning the adapter and a shared handle to its counts. + pub fn new(inner: Arc) -> (Arc, Arc) { + let counts = Arc::new(StorageReadCounts::default()); + let adapter: Arc = Arc::new(Self { + inner, + counts: Arc::clone(&counts), + }); + (adapter, counts) + } +} + +#[async_trait] +impl StorageAdapter for CountingStorageAdapter { + async fn read_text(&self, uri: &str) -> Result { + self.counts.read_text.fetch_add(1, Ordering::Relaxed); + self.inner.read_text(uri).await + } + + async fn write_text(&self, uri: &str, contents: &str) -> Result<()> { + self.inner.write_text(uri, contents).await + } + + async fn write_text_if_absent(&self, uri: &str, contents: &str) -> Result { + self.inner.write_text_if_absent(uri, contents).await + } + + async fn exists(&self, uri: &str) -> Result { + self.counts.exists.fetch_add(1, Ordering::Relaxed); + self.inner.exists(uri).await + } + + async fn rename_text(&self, from_uri: &str, to_uri: &str) -> Result<()> { + self.inner.rename_text(from_uri, to_uri).await + } + + async fn delete(&self, uri: &str) -> Result<()> { + self.inner.delete(uri).await + } + + async fn list_dir(&self, dir_uri: &str) -> Result> { + self.counts.list_dir.fetch_add(1, Ordering::Relaxed); + self.inner.list_dir(dir_uri).await + } + + async fn read_text_versioned(&self, uri: &str) -> Result<(String, String)> { + self.counts.read_text_versioned.fetch_add(1, Ordering::Relaxed); + self.inner.read_text_versioned(uri).await + } + + async fn write_text_if_match( + &self, + uri: &str, + contents: &str, + expected_version: &str, + ) -> Result> { + self.inner + .write_text_if_match(uri, contents, expected_version) + .await + } + + async fn delete_prefix(&self, prefix_uri: &str) -> Result<()> { + self.inner.delete_prefix(prefix_uri).await + } +} diff --git a/crates/omnigraph/src/lib.rs b/crates/omnigraph/src/lib.rs index ff0b3d6..7dd7135 100644 --- a/crates/omnigraph/src/lib.rs +++ b/crates/omnigraph/src/lib.rs @@ -14,6 +14,7 @@ pub mod error; mod exec; pub mod failpoints; pub mod graph_index; +pub mod instrumentation; pub mod loader; pub mod runtime_cache; pub mod storage; diff --git a/crates/omnigraph/src/runtime_cache.rs b/crates/omnigraph/src/runtime_cache.rs index 84b562a..e85a90a 100644 --- a/crates/omnigraph/src/runtime_cache.rs +++ b/crates/omnigraph/src/runtime_cache.rs @@ -1,6 +1,9 @@ use std::collections::{HashMap, VecDeque}; +use std::hash::Hash; use std::sync::Arc; +use lance::Dataset; +use lance::session::Session; use omnigraph_compiler::catalog::Catalog; use tokio::sync::Mutex; @@ -26,17 +29,15 @@ pub struct RuntimeCache { graph_indices: Mutex, } -#[derive(Debug, Default)] +#[derive(Debug)] struct GraphIndexCache { - entries: HashMap>, - lru: VecDeque, + entries: LruMap>, } impl RuntimeCache { pub async fn invalidate_all(&self) { let mut cache = self.graph_indices.lock().await; - cache.entries.clear(); - cache.lru.clear(); + cache.entries.invalidate_all(); } pub async fn graph_index( @@ -48,7 +49,6 @@ impl RuntimeCache { { let mut cache = self.graph_indices.lock().await; if let Some(index) = cache.entries.get(&key).cloned() { - cache.touch(key.clone()); return Ok(index); } } @@ -62,7 +62,6 @@ impl RuntimeCache { let index = Arc::new(GraphIndex::build(&resolved.snapshot, &edge_types).await?); let mut cache = self.graph_indices.lock().await; if let Some(existing) = cache.entries.get(&key).cloned() { - cache.touch(key); return Ok(existing); } cache.insert(key, Arc::clone(&index)); @@ -72,24 +71,86 @@ impl RuntimeCache { impl GraphIndexCache { fn insert(&mut self, key: GraphIndexCacheKey, value: Arc) { - self.entries.insert(key.clone(), value); - self.touch(key); - while self.entries.len() > 8 { - let Some(oldest) = self.lru.pop_front() else { - break; - }; - if self.entries.remove(&oldest).is_some() { - break; - } + self.entries.insert(key, value); + } + + #[cfg(test)] + fn touch(&mut self, key: GraphIndexCacheKey) { + self.entries.touch(key); + } +} + +#[derive(Debug)] +struct LruMap +where + K: Clone + Eq + Hash, +{ + entries: HashMap, + lru: VecDeque, + cap: usize, +} + +impl LruMap +where + K: Clone + Eq + Hash, +{ + fn new(cap: usize) -> Self { + Self { + entries: HashMap::new(), + lru: VecDeque::new(), + cap, } } - fn touch(&mut self, key: GraphIndexCacheKey) { + fn get(&mut self, key: &K) -> Option<&V> { + if self.entries.contains_key(key) { + self.touch(key.clone()); + self.entries.get(key) + } else { + None + } + } + + fn insert(&mut self, key: K, value: V) { + self.entries.insert(key.clone(), value); + self.touch(key); + while self.entries.len() > self.cap { + let Some(oldest) = self.lru.pop_front() else { + break; + }; + self.entries.remove(&oldest); + } + } + + fn invalidate_all(&mut self) { + self.entries.clear(); + self.lru.clear(); + } + + #[cfg(test)] + fn contains_key(&self, key: &K) -> bool { + self.entries.contains_key(key) + } + + #[cfg(test)] + fn len(&self) -> usize { + self.entries.len() + } + + fn touch(&mut self, key: K) { self.lru.retain(|existing| existing != &key); self.lru.push_back(key); } } +impl Default for GraphIndexCache { + fn default() -> Self { + Self { + entries: LruMap::new(8), + } + } +} + fn graph_index_cache_key(resolved: &ResolvedTarget, catalog: &Catalog) -> GraphIndexCacheKey { let mut edge_tables: Vec = catalog .edge_types @@ -114,6 +175,114 @@ fn graph_index_cache_key(resolved: &ResolvedTarget, catalog: &Catalog) -> GraphI } } +/// Max held `Dataset` handles. A handle holds only Arcs (object store + manifest), +/// never table data, so this is cheap; it bounds how many `(table, branch, +/// version, e_tag)` cells stay warm. One graph's live table set across a couple +/// of branches at the current version fits comfortably, with headroom for the +/// recently-superseded versions left by writes until they age out. +const TABLE_HANDLE_CACHE_CAP: usize = 64; + +#[derive(Debug, Clone, PartialEq, Eq, Hash)] +struct TableHandleKey { + table_path: String, + table_branch: Option, + version: u64, + e_tag: Option, +} + +/// Held open-`Dataset` handles keyed by `(table_path, branch, version, e_tag)` — the +/// version-keyed analogue of LanceDB's `DatasetConsistencyWrapper` +/// (`rust/lancedb/src/table/dataset.rs`). A warm read reuses a held handle with +/// zero open IO (a cheap `Dataset` clone); a miss opens once at the location with +/// the shared `Session`. Version plus e_tag are in the key, so a write (or a +/// delete/recreate that reuses a version number on object stores with e_tags) is +/// simply a new key. A same-branch manifest refresh clears this cache as the +/// fallback for e_tag-less table locations. Only read-path Data opens use this — +/// writes open HEAD directly and never receive a pinned handle. +#[derive(Default)] +pub struct TableHandleCache { + inner: Mutex, +} + +struct TableHandleCacheInner { + entries: LruMap, +} + +impl TableHandleCache { + /// Drop all held handles. Correctness never requires this (version-in-key); + /// it is memory hygiene, called from the same hooks that clear the graph + /// index cache (branch switch / refresh). + pub async fn invalidate_all(&self) { + let mut inner = self.inner.lock().await; + inner.entries.invalidate_all(); + } + + /// Return the dataset for `(table_path, branch, version, e_tag)`, reusing a + /// held handle (0 open IO) or opening it once at `location` with the shared + /// `session` on a miss. + pub async fn get_or_open( + &self, + table_path: &str, + table_branch: Option<&str>, + version: u64, + e_tag: Option<&str>, + location: &str, + session: Option<&Arc>, + ) -> Result { + let key = TableHandleKey { + table_path: table_path.to_string(), + table_branch: table_branch.map(str::to_string), + version, + e_tag: e_tag.map(str::to_string), + }; + { + let mut inner = self.inner.lock().await; + if let Some(ds) = inner.entries.get(&key).cloned() { + return Ok(ds); + } + } + // Miss: open without holding the lock (the open is async IO). A concurrent + // double-miss opens twice and one wins the insert — correct (the dataset + // at a version is immutable) and rare. + let ds = crate::instrumentation::open_table_dataset(location, version, session).await?; + let mut inner = self.inner.lock().await; + if let Some(existing) = inner.entries.get(&key).cloned() { + return Ok(existing); + } + inner.insert(key, ds.clone()); + Ok(ds) + } +} + +impl TableHandleCacheInner { + fn insert(&mut self, key: TableHandleKey, value: Dataset) { + self.entries.insert(key, value); + } +} + +impl Default for TableHandleCacheInner { + fn default() -> Self { + Self { + entries: LruMap::new(TABLE_HANDLE_CACHE_CAP), + } + } +} + +/// Per-graph read caches handed to a resolved `Snapshot` so its table opens reuse +/// one shared `Session` (LanceDB's one-session-per-connection pattern) and the +/// held-handle cache. Manual `Debug` because `lance::session::Session` is not +/// `Debug`; this lets `Snapshot` keep its `#[derive(Debug)]`. +pub struct ReadCaches { + pub session: Arc, + pub handles: Arc, +} + +impl std::fmt::Debug for ReadCaches { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_struct("ReadCaches").finish_non_exhaustive() + } +} + #[cfg(test)] mod tests { use std::sync::Arc; @@ -156,4 +325,21 @@ mod tests { assert!(cache.entries.contains_key(&key(0))); assert!(!cache.entries.contains_key(&key(1))); } + + #[test] + fn lru_map_evicts_oldest_and_touch_refreshes_order() { + let mut map = LruMap::new(2); + map.insert("a", 1); + map.insert("b", 2); + + assert_eq!(map.get(&"a"), Some(&1)); + map.insert("c", 3); + + assert!(map.contains_key(&"a")); + assert!(!map.contains_key(&"b")); + assert!(map.contains_key(&"c")); + + map.invalidate_all(); + assert_eq!(map.len(), 0); + } } diff --git a/crates/omnigraph/tests/branching.rs b/crates/omnigraph/tests/branching.rs index 108702c..bd98c9c 100644 --- a/crates/omnigraph/tests/branching.rs +++ b/crates/omnigraph/tests/branching.rs @@ -548,6 +548,174 @@ async fn branch_merge_records_single_latest_commit_with_two_parents() { ); } +// ── P1: commit-DAG coherence on same-branch writes after an external commit ── +// +// `append_commit` takes a new commit's parent from the coordinator's in-memory +// head (commit_graph head_commit, zero storage read), but `commit_all` rebases +// the MANIFEST from a fresh coordinator. So after an external writer advances +// the branch, a same-branch write on a non-refreshed handle commits a fresh +// manifest version yet appends off the stale head — forking the commit DAG (the +// new commit and the external commit share a parent). Data is unaffected (the +// manifest is the visibility authority); only commit history is malformed. +// P1 refreshes the commit-graph head before the append, so the parent is the +// true current head. These two tests are RED before that fix, GREEN after. + +/// Non-strict insert: the fork is pre-existing (commit_all rebases the manifest +/// regardless of the stale head), independent of Fix 1. +#[tokio::test] +async fn same_branch_insert_after_external_commit_is_linear() { + let dir = tempfile::tempdir().unwrap(); + let uri = dir.path().to_str().unwrap(); + + // Handle A: a long-lived writer whose coordinator head stays pinned at the + // load commit (C0) — it never refreshes before its own write below. + let mut a = init_and_load(&dir).await; + let c0 = CommitGraph::open(uri) + .await + .unwrap() + .head_commit() + .await + .unwrap() + .unwrap(); + + // External writer B advances main: commit C1, parent C0. + let mut b = Omnigraph::open(uri).await.unwrap(); + mutate_main( + &mut b, + MUTATION_QUERIES, + "insert_person", + &mixed_params(&[("$name", "ext_b")], &[("$age", 30)]), + ) + .await + .unwrap(); + let c1 = CommitGraph::open(uri) + .await + .unwrap() + .head_commit() + .await + .unwrap() + .unwrap(); + assert_eq!( + c1.parent_commit_id.as_deref(), + Some(c0.graph_commit_id.as_str()), + "sanity: B's commit C1 should descend from C0" + ); + + // A writes to main WITHOUT refreshing. A's coordinator still thinks the head + // is C0, so a pre-fix append parents the new commit on C0 instead of C1. + mutate_main( + &mut a, + MUTATION_QUERIES, + "insert_person", + &mixed_params(&[("$name", "local_a")], &[("$age", 40)]), + ) + .await + .unwrap(); + + let commits = CommitGraph::open(uri) + .await + .unwrap() + .load_commits() + .await + .unwrap(); + let latest = commits.iter().max_by_key(|c| c.manifest_version).unwrap(); + assert_eq!( + latest.parent_commit_id.as_deref(), + Some(c1.graph_commit_id.as_str()), + "A's same-branch write after an external commit must append off the true \ + head C1, not the stale head C0 (commit-DAG fork)" + ); + let c0_children = commits + .iter() + .filter(|c| c.parent_commit_id.as_deref() == Some(c0.graph_commit_id.as_str())) + .count(); + assert_eq!(c0_children, 1, "C0 must have exactly one child; two is the fork"); +} + +/// Strict update after a read: Fix 1's `refresh_manifest_only` makes the read +/// freshen the read-time pin, defeating the strict 409 that used to force a +/// coherent refresh — so the same stale-head append forks strict ops too. +#[tokio::test] +async fn same_branch_update_after_external_commit_and_read_is_linear() { + let dir = tempfile::tempdir().unwrap(); + let uri = dir.path().to_str().unwrap(); + + // A inserts the row it will later update; this is A's own commit (Ca), so + // A's coordinator head is Ca. + let mut a = init_and_load(&dir).await; + mutate_main( + &mut a, + MUTATION_QUERIES, + "insert_person", + &mixed_params(&[("$name", "target")], &[("$age", 40)]), + ) + .await + .unwrap(); + let ca = CommitGraph::open(uri) + .await + .unwrap() + .head_commit() + .await + .unwrap() + .unwrap(); + + // External writer B advances main: commit Cb, parent Ca. + let mut b = Omnigraph::open(uri).await.unwrap(); + mutate_main( + &mut b, + MUTATION_QUERIES, + "insert_person", + &mixed_params(&[("$name", "ext_b")], &[("$age", 30)]), + ) + .await + .unwrap(); + let cb = CommitGraph::open(uri) + .await + .unwrap() + .head_commit() + .await + .unwrap() + .unwrap(); + assert_eq!(cb.parent_commit_id.as_deref(), Some(ca.graph_commit_id.as_str())); + + // A reads main: the stale-probe path refreshes A's MANIFEST (via + // refresh_manifest_only) but not its commit-graph head, freshening the + // read-time pin so the strict update below skips its 409. + query_main(&mut a, TEST_QUERIES, "total_people", ¶ms(&[])) + .await + .unwrap(); + + // Strict update, no explicit refresh: pre-fix it appends off the stale head + // Ca instead of Cb. + mutate_main( + &mut a, + MUTATION_QUERIES, + "set_age", + &mixed_params(&[("$name", "target")], &[("$age", 99)]), + ) + .await + .unwrap(); + + let commits = CommitGraph::open(uri) + .await + .unwrap() + .load_commits() + .await + .unwrap(); + let latest = commits.iter().max_by_key(|c| c.manifest_version).unwrap(); + assert_eq!( + latest.parent_commit_id.as_deref(), + Some(cb.graph_commit_id.as_str()), + "a strict update after an external commit and a local read must append \ + off the true head Cb, not the stale head Ca" + ); + let ca_children = commits + .iter() + .filter(|c| c.parent_commit_id.as_deref() == Some(ca.graph_commit_id.as_str())) + .count(); + assert_eq!(ca_children, 1, "Ca must have exactly one child; two is the fork"); +} + #[tokio::test] async fn branch_merge_records_actor_on_latest_commit() { let dir = tempfile::tempdir().unwrap(); diff --git a/crates/omnigraph/tests/forbidden_apis.rs b/crates/omnigraph/tests/forbidden_apis.rs index e079464..667e8c5 100644 --- a/crates/omnigraph/tests/forbidden_apis.rs +++ b/crates/omnigraph/tests/forbidden_apis.rs @@ -71,6 +71,14 @@ const FORBIDDEN_PATTERNS: &[&str] = &[ "Dataset::drop_columns", "Dataset::truncate_table", "Dataset::restore", + // Raw dataset OPENS — all reads must route through `Snapshot::open` (the + // held-handle cache + shared Session, Fix 3). Only the instrumented opener + // (`instrumentation.rs`) and the storage/manifest layers (allow-listed below) + // open datasets directly; forbidding these in the read/exec layer keeps a + // future read from silently bypassing the cache. + "Dataset::open", + "DatasetBuilder::from_uri", + "DatasetBuilder::from_namespace", // Lance-specific method names that don't clash with our `TableStore` // wrappers (we use `merge_insert_batch{,es}`, `add_columns_to_*`, // etc. — never the bare Lance names). Engine code that writes @@ -106,6 +114,7 @@ const ALLOW_LIST_FILES: &[&str] = &[ "commit_graph.rs", // Maintains `_graph_commits.lance` system table. "graph_coordinator.rs", // Drives the manifest publisher / branch coordinator. "recovery_audit.rs", // Maintains `_graph_commit_recoveries.lance` (recovery audit trail). + "instrumentation.rs", // The instrumented dataset opener (open_dataset_tracked / open_table_dataset). ]; /// Directories exempt from the guard. Files under these paths may use diff --git a/crates/omnigraph/tests/helpers/mod.rs b/crates/omnigraph/tests/helpers/mod.rs index 6476e1a..e690839 100644 --- a/crates/omnigraph/tests/helpers/mod.rs +++ b/crates/omnigraph/tests/helpers/mod.rs @@ -166,6 +166,21 @@ pub async fn mutate_branch( db.mutate(branch, query_source, query_name, params).await } +/// Advance the manifest version `n` times (one commit per insert), building +/// deep commit history for cost-budget tests (history depth, not row count). +pub async fn commit_many(db: &mut Omnigraph, n: usize) { + for i in 0..n { + mutate_main( + db, + MUTATION_QUERIES, + "insert_person", + &mixed_params(&[("$name", &format!("commit_many_{i}"))], &[("$age", 30)]), + ) + .await + .unwrap(); + } +} + pub async fn snapshot_main(db: &Omnigraph) -> Result { db.snapshot_of(ReadTarget::branch("main")).await } diff --git a/crates/omnigraph/tests/warm_read_cost.rs b/crates/omnigraph/tests/warm_read_cost.rs new file mode 100644 index 0000000..d7fc52a --- /dev/null +++ b/crates/omnigraph/tests/warm_read_cost.rs @@ -0,0 +1,833 @@ +//! Cost-budget tests for the warm read path (Fix 1): a warm same-branch read +//! must perform no manifest or commit-graph opens, measured with Lance's +//! `IOTracker` at the object-store boundary (the LanceDB IO-counted-test +//! pattern; see docs/dev/testing.md). Guards invariant 15 (read cost bounded by +//! work, not history) for snapshot resolution, and invariant 6 (a warm reader +//! still observes external commits). + +mod helpers; + +use std::sync::Arc; +use std::sync::atomic::{AtomicU64, Ordering}; + +use arrow_array::{Array, StringArray}; +use lance::io::WrappingObjectStore; +use lance_io::utils::tracking_store::IOTracker; +use omnigraph::db::{Omnigraph, ReadTarget}; +use omnigraph::instrumentation::{QueryIoProbes, with_query_io_probes}; +use omnigraph_compiler::result::QueryResult; + +use helpers::{ + MUTATION_QUERIES, TEST_QUERIES, commit_many, count_rows, init_and_load, mixed_params, + mutate_branch, mutate_main, params, +}; + +/// IO probes plus the tracker handles to read `read_iops` after the query. +/// Returns `(probes, manifest, commit_graph, table, probe_count)` — `table` +/// counts per-table data opens (the cache-miss path), so a cost test can assert +/// N opens on a cold read and 0 on a warm repeat (Fix 3). +fn probes() -> ( + QueryIoProbes, + IOTracker, + IOTracker, + IOTracker, + Arc, +) { + let manifest = IOTracker::default(); + let commit_graph = IOTracker::default(); + let table = IOTracker::default(); + let probe_count = Arc::new(AtomicU64::new(0)); + let probes = QueryIoProbes { + manifest_wrapper: Some(Arc::new(manifest.clone()) as Arc), + commit_graph_wrapper: Some(Arc::new(commit_graph.clone()) as Arc), + table_wrapper: Some(Arc::new(table.clone()) as Arc), + probe_count: Arc::clone(&probe_count), + }; + (probes, manifest, commit_graph, table, probe_count) +} + +fn first_column_strings(result: &QueryResult) -> Vec { + if result.num_rows() == 0 { + return Vec::new(); + } + let batch = result.concat_batches().unwrap(); + let values = batch + .column(0) + .as_any() + .downcast_ref::() + .unwrap(); + let mut out = (0..values.len()) + .filter(|&row| !values.is_null(row)) + .map(|row| values.value(row).to_string()) + .collect::>(); + out.sort(); + out +} + +/// A warm same-branch read must not re-open or scan `__manifest`, and must not +/// open the commit graph, even at commit-history depth. The only manifest IO is +/// the version probe (counted by invocation). Fails before Fix 1, where the read +/// path re-opens a fresh coordinator and scans both internal tables. +#[tokio::test] +async fn warm_same_branch_read_does_no_resolution_opens() { + let dir = tempfile::tempdir().unwrap(); + let mut db = init_and_load(&dir).await; + // Deep history: warm-read resolution cost must be flat in commit count. + commit_many(&mut db, 20).await; + + let (probes_in, manifest, commit_graph, _table, probe_count) = probes(); + with_query_io_probes( + probes_in, + db.query( + ReadTarget::branch("main"), + TEST_QUERIES, + "total_people", + ¶ms(&[]), + ), + ) + .await + .unwrap(); + + // A warm same-branch read opens nothing from the internal tables, even at + // commit-history depth. Fix 1 reuses the coordinator (no re-open: 0 + // commit-graph opens, exactly 1 cheap version probe). Fix 2 opens the touched + // data table by location+version instead of via the namespace, so the + // per-table __manifest scan is gone too. Pre-fix, each of these is a deep scan + // of an internal table that grows with commit count. + assert_eq!( + manifest.stats().read_iops, + 0, + "warm same-branch read must not scan __manifest (resolution or per-table)" + ); + assert_eq!( + commit_graph.stats().read_iops, + 0, + "warm same-branch read must not open the commit graph (no coordinator re-open)" + ); + assert_eq!( + probe_count.load(Ordering::Relaxed), + 1, + "warm same-branch read performs exactly one version probe" + ); +} + +/// A multi-table query (a traversal touching Person, WorksAt, and Company) scans +/// `__manifest` zero times. Fix 2 opens every touched table by location+version, +/// so manifest IO no longer scales with the number of tables — pre-Fix-2 each +/// table cost two full `__manifest` scans (`describe_table` + +/// `describe_table_version`), which is the "2 tables = 2×" multi-table tax. +#[tokio::test] +async fn multi_table_query_does_no_manifest_scans() { + let dir = tempfile::tempdir().unwrap(); + let db = init_and_load(&dir).await; + + let (probes_in, manifest, _commit_graph, _table, _probe) = probes(); + with_query_io_probes( + probes_in, + db.query( + ReadTarget::branch("main"), + TEST_QUERIES, + "age_stats", + ¶ms(&[]), + ), + ) + .await + .unwrap(); + + assert_eq!( + manifest.stats().read_iops, + 0, + "a multi-table read must not scan __manifest once per touched table" + ); +} + +/// A warm reader must observe a commit made through another handle (invariant 6, +/// strong consistency): the version probe detects the advance and refreshes. +/// Passes before and after Fix 1 (today's cold re-read is always fresh); a +/// regression guard so the warm-reuse fast path never serves a stale read. +#[tokio::test] +async fn external_commit_observed_by_warm_reader() { + let dir = tempfile::tempdir().unwrap(); + let mut writer = init_and_load(&dir).await; + let uri = dir.path().to_str().unwrap(); + let reader = Omnigraph::open(uri).await.unwrap(); + + let before = count_rows(&reader, "node:Person").await; + + // External commit through a separate handle. + mutate_main( + &mut writer, + MUTATION_QUERIES, + "insert_person", + &mixed_params(&[("$name", "ext_new_person")], &[("$age", 41)]), + ) + .await + .unwrap(); + + let after = count_rows(&reader, "node:Person").await; + assert_eq!( + after, + before + 1, + "warm reader must observe an external commit" + ); +} + +// ── Finding A: drop the redundant per-query schema validation ───────────────── +// +// Every query runs `ensure_schema_state_valid`. It ran TWICE per query (once in +// query()/run_query_at, once again in resolved_target/snapshot_at_version), each +// reading 3 contract files + 2 existence probes (~10 storage ops). Finding A +// removes the redundant caller, so validation runs once. (A cheaper source-only +// probe was rejected: the codebase requires per-call detection of IR/state drift +// on long-lived handles -- lifecycle::long_lived_handle_rejects_schema_ir_drift +// -- which a source-only compare would miss.) Measured at the StorageAdapter +// boundary with the counting decorator. + +/// A warm query validates the schema contract exactly once (3 reads + 2 exists), +/// not twice. Fails before finding A, where query() and resolved_target each +/// validate (6 read_text + 4 exists). +#[tokio::test] +async fn warm_query_validates_schema_contract_once() { + use omnigraph::instrumentation::CountingStorageAdapter; + use omnigraph::storage::storage_for_uri; + + let dir = tempfile::tempdir().unwrap(); + // Init through the standard path, then re-open behind a counting adapter to + // measure the per-query schema-contract storage reads (delta around the + // query excludes open-time reads). + let _ = init_and_load(&dir).await; + let uri = dir.path().to_str().unwrap(); + let (adapter, counts) = CountingStorageAdapter::new(storage_for_uri(uri).unwrap()); + let db = Omnigraph::open_with_storage(uri, adapter).await.unwrap(); + + let before_read_text = counts.read_text(); + let before_exists = counts.exists(); + db.query( + ReadTarget::branch("main"), + TEST_QUERIES, + "total_people", + ¶ms(&[]), + ) + .await + .unwrap(); + + assert_eq!( + counts.read_text() - before_read_text, + 3, + "warm query should validate the schema contract once (3 reads), not twice" + ); + assert_eq!( + counts.exists() - before_exists, + 2, + "warm query should probe contract-file existence once (2 probes), not twice" + ); +} + +/// The cheap source-compare must still detect that the on-disk schema source has +/// drifted from the validated contract and fail the read, rather than serving the +/// stale-but-cached schema. Passes before and after finding A (regression guard +/// for the documented weaker per-query guard). +#[tokio::test] +async fn schema_source_drift_is_caught_on_read() { + let dir = tempfile::tempdir().unwrap(); + let _writer = init_and_load(&dir).await; + let uri = dir.path().to_str().unwrap(); + let reader = Omnigraph::open(uri).await.unwrap(); + + // Drift the on-disk schema source behind the reader's back. + std::fs::write( + dir.path().join("_schema.pg"), + "this is not a valid schema {{{", + ) + .unwrap(); + + let result = reader + .query( + ReadTarget::branch("main"), + TEST_QUERIES, + "total_people", + ¶ms(&[]), + ) + .await; + assert!( + result.is_err(), + "a query must fail when the on-disk schema source has drifted from the validated contract" + ); +} + +// ── Morphological-matrix coverage: branch-warm + stale-refresh cells ────────── + +/// A WARM read on a non-main branch (handle synced to that branch) also scans +/// `__manifest` zero times. Exercises Fix 2's branch-owned-table open +/// (`{table_path}/tree/{branch}` + with_version) on Fix 1's warm path — the cell +/// that regressed when the open used `with_branch` against the base. +#[tokio::test] +async fn warm_branch_read_does_no_manifest_scans() { + let dir = tempfile::tempdir().unwrap(); + let db = init_and_load(&dir).await; + db.branch_create("feature").await.unwrap(); + // Write to the branch so its tables are branch-owned (under tree/feature). + db.mutate( + "feature", + MUTATION_QUERIES, + "insert_person", + &mixed_params(&[("$name", "Eve")], &[("$age", 22)]), + ) + .await + .unwrap(); + // Bind the handle's coordinator to the branch so reads of it take the warm path. + db.sync_branch("feature").await.unwrap(); + + let (probes_in, manifest, commit_graph, _table, probe_count) = probes(); + with_query_io_probes( + probes_in, + db.query( + ReadTarget::branch("feature"), + TEST_QUERIES, + "total_people", + ¶ms(&[]), + ), + ) + .await + .unwrap(); + + assert_eq!( + manifest.stats().read_iops, + 0, + "warm branch read must not scan __manifest (branch-owned table opened by location)" + ); + assert_eq!( + commit_graph.stats().read_iops, + 0, + "warm branch read must not open the commit graph" + ); + assert_eq!( + probe_count.load(Ordering::Relaxed), + 1, + "warm branch read performs exactly one version probe" + ); +} + +/// A non-main branch can be deleted and recreated at the same Lance version +/// number. Warm branch freshness therefore needs the manifest incarnation, not +/// just `version()`, or a reader pinned to the old incarnation can serve stale +/// rows from the deleted branch. This is the correctness guard for Phase 6A. +#[tokio::test] +async fn warm_read_on_recreated_branch_observes_new_incarnation() { + let dir = tempfile::tempdir().unwrap(); + let mut writer = init_and_load(&dir).await; + let uri = dir.path().to_str().unwrap(); + let reader = Omnigraph::open(uri).await.unwrap(); + + writer.branch_create("feature").await.unwrap(); + mutate_branch( + &mut writer, + "feature", + MUTATION_QUERIES, + "insert_person", + &mixed_params(&[("$name", "Eve")], &[("$age", 22)]), + ) + .await + .unwrap(); + + reader.sync_branch("feature").await.unwrap(); + let old_feature = reader + .query( + ReadTarget::branch("feature"), + TEST_QUERIES, + "get_person", + ¶ms(&[("$name", "Eve")]), + ) + .await + .unwrap(); + assert_eq!( + old_feature.num_rows(), + 1, + "test setup: old feature branch must contain Eve" + ); + let old_version = reader + .version_of(ReadTarget::branch("feature")) + .await + .unwrap(); + + writer.branch_delete("feature").await.unwrap(); + mutate_main( + &mut writer, + MUTATION_QUERIES, + "insert_person", + &mixed_params(&[("$name", "MainOnly")], &[("$age", 44)]), + ) + .await + .unwrap(); + writer.branch_create("feature").await.unwrap(); + let new_version = writer + .version_of(ReadTarget::branch("feature")) + .await + .unwrap(); + assert_eq!( + new_version, old_version, + "test setup must exercise branch incarnation reuse at one Lance version" + ); + + let (probes_in, manifest, commit_graph, _table, probe_count) = probes(); + let new_feature = with_query_io_probes( + probes_in, + reader.query( + ReadTarget::branch("feature"), + TEST_QUERIES, + "get_person", + ¶ms(&[("$name", "MainOnly")]), + ), + ) + .await + .unwrap(); + + assert_eq!( + new_feature.num_rows(), + 1, + "warm reader must refresh to the recreated branch incarnation" + ); + assert!( + manifest.stats().read_iops > 0, + "recreated branch must re-read the manifest after the incarnation probe" + ); + assert_eq!( + commit_graph.stats().read_iops, + 0, + "same-branch incarnation refresh must be manifest-only" + ); + assert_eq!( + probe_count.load(Ordering::Relaxed), + 2, + "stale same-branch read probes once under the read lock and once under the write lock" + ); +} + +/// Recreated non-main branches can reuse the same branch-owned table version. +/// This forces the held table-handle cache to distinguish incarnations by the +/// per-table Lance manifest e_tag, not just `(table_path, branch, version)`. +#[tokio::test] +async fn recreated_branch_owned_table_handle_uses_table_etag() { + let dir = tempfile::tempdir().unwrap(); + let mut writer = init_and_load(&dir).await; + let uri = dir.path().to_str().unwrap(); + let reader = Omnigraph::open(uri).await.unwrap(); + + writer.branch_create("feature").await.unwrap(); + mutate_branch( + &mut writer, + "feature", + MUTATION_QUERIES, + "insert_person", + &mixed_params(&[("$name", "OldOnly")], &[("$age", 31)]), + ) + .await + .unwrap(); + + reader.sync_branch("feature").await.unwrap(); + let old_person = reader + .query( + ReadTarget::branch("feature"), + TEST_QUERIES, + "get_person", + ¶ms(&[("$name", "OldOnly")]), + ) + .await + .unwrap(); + assert_eq!(old_person.num_rows(), 1); + let old_entry = reader + .snapshot_of(ReadTarget::branch("feature")) + .await + .unwrap() + .entry("node:Person") + .unwrap() + .clone(); + assert_eq!(old_entry.table_branch.as_deref(), Some("feature")); + + writer.branch_delete("feature").await.unwrap(); + writer.branch_create("feature").await.unwrap(); + mutate_branch( + &mut writer, + "feature", + MUTATION_QUERIES, + "insert_person", + &mixed_params(&[("$name", "NewOnly")], &[("$age", 32)]), + ) + .await + .unwrap(); + let new_entry = writer + .snapshot_of(ReadTarget::branch("feature")) + .await + .unwrap() + .entry("node:Person") + .unwrap() + .clone(); + assert_eq!(new_entry.table_path, old_entry.table_path); + assert_eq!(new_entry.table_branch, old_entry.table_branch); + assert_eq!( + new_entry.table_version, old_entry.table_version, + "test setup must force table handle identity to differ only by e_tag" + ); + + let (probes_in, manifest, commit_graph, table, probe_count) = probes(); + let new_person = with_query_io_probes( + probes_in, + reader.query( + ReadTarget::branch("feature"), + TEST_QUERIES, + "get_person", + ¶ms(&[("$name", "NewOnly")]), + ), + ) + .await + .unwrap(); + assert_eq!( + new_person.num_rows(), + 1, + "warm reader must open the recreated branch-owned table incarnation" + ); + assert!( + table.stats().read_iops > 0, + "table e_tag must force a held-handle cache miss for the recreated table" + ); + assert!( + manifest.stats().read_iops > 0, + "recreated branch must refresh the manifest" + ); + assert_eq!( + commit_graph.stats().read_iops, + 0, + "same-branch table-incarnation refresh must be manifest-only" + ); + assert_eq!( + probe_count.load(Ordering::Relaxed), + 2, + "stale same-branch read probes once under each lock" + ); + + let stale_old_person = reader + .query( + ReadTarget::branch("feature"), + TEST_QUERIES, + "get_person", + ¶ms(&[("$name", "OldOnly")]), + ) + .await + .unwrap(); + assert_eq!( + stale_old_person.num_rows(), + 0, + "old branch-owned table contents must not leak after branch recreation" + ); +} + +/// The graph-index cache is keyed by synthetic snapshot id plus edge-table +/// state. A recreated branch can reuse the same edge table `(branch, version)`, +/// so the synthetic snapshot id must carry the manifest incarnation or traversal +/// can reuse stale topology. +#[tokio::test] +async fn recreated_branch_traversal_uses_graph_index_incarnation() { + let dir = tempfile::tempdir().unwrap(); + let mut writer = init_and_load(&dir).await; + let uri = dir.path().to_str().unwrap(); + let reader = Omnigraph::open(uri).await.unwrap(); + + writer.branch_create("feature").await.unwrap(); + mutate_branch( + &mut writer, + "feature", + MUTATION_QUERIES, + "insert_person_and_friend", + &mixed_params( + &[("$name", "OldWalker"), ("$friend", "Alice")], + &[("$age", 41)], + ), + ) + .await + .unwrap(); + + reader.sync_branch("feature").await.unwrap(); + let old_friends = reader + .query( + ReadTarget::branch("feature"), + TEST_QUERIES, + "friends_of", + ¶ms(&[("$name", "OldWalker")]), + ) + .await + .unwrap(); + assert_eq!(first_column_strings(&old_friends), vec!["Alice"]); + let old_edge_entry = reader + .snapshot_of(ReadTarget::branch("feature")) + .await + .unwrap() + .entry("edge:Knows") + .unwrap() + .clone(); + assert_eq!(old_edge_entry.table_branch.as_deref(), Some("feature")); + + writer.branch_delete("feature").await.unwrap(); + writer.branch_create("feature").await.unwrap(); + mutate_branch( + &mut writer, + "feature", + MUTATION_QUERIES, + "insert_person_and_friend", + &mixed_params( + &[("$name", "NewWalker"), ("$friend", "Bob")], + &[("$age", 42)], + ), + ) + .await + .unwrap(); + let new_edge_entry = writer + .snapshot_of(ReadTarget::branch("feature")) + .await + .unwrap() + .entry("edge:Knows") + .unwrap() + .clone(); + assert_eq!(new_edge_entry.table_path, old_edge_entry.table_path); + assert_eq!(new_edge_entry.table_branch, old_edge_entry.table_branch); + assert_eq!( + new_edge_entry.table_version, old_edge_entry.table_version, + "test setup must force graph-index identity to differ only by snapshot incarnation" + ); + + let (probes_in, manifest, commit_graph, _table, probe_count) = probes(); + let new_friends = with_query_io_probes( + probes_in, + reader.query( + ReadTarget::branch("feature"), + TEST_QUERIES, + "friends_of", + ¶ms(&[("$name", "NewWalker")]), + ), + ) + .await + .unwrap(); + assert_eq!( + first_column_strings(&new_friends), + vec!["Bob"], + "traversal must use the recreated branch's topology, not stale cached graph index" + ); + assert!( + manifest.stats().read_iops > 0, + "recreated branch traversal must refresh the manifest" + ); + assert_eq!( + commit_graph.stats().read_iops, + 0, + "same-branch traversal incarnation refresh must be manifest-only" + ); + assert_eq!( + probe_count.load(Ordering::Relaxed), + 2, + "stale same-branch read probes once under each lock" + ); + + let stale_old_friends = reader + .query( + ReadTarget::branch("feature"), + TEST_QUERIES, + "friends_of", + ¶ms(&[("$name", "OldWalker")]), + ) + .await + .unwrap(); + assert_eq!( + first_column_strings(&stale_old_friends), + Vec::::new(), + "old branch topology must not leak after branch recreation" + ); +} + +/// When an external writer advances the manifest, the reader's next query takes +/// the STALE path: it re-reads the manifest (read_iops > 0) but never scans the +/// commit graph (`refresh_manifest_only`), unlike a full coordinator refresh. +/// Pins Fix 1's manifest-only refresh. +#[tokio::test] +async fn stale_read_refreshes_manifest_only() { + let dir = tempfile::tempdir().unwrap(); + let mut writer = init_and_load(&dir).await; + let uri = dir.path().to_str().unwrap(); + let reader = Omnigraph::open(uri).await.unwrap(); + // Establish the reader's warm coordinator. + reader + .query( + ReadTarget::branch("main"), + TEST_QUERIES, + "total_people", + ¶ms(&[]), + ) + .await + .unwrap(); + + // External commit advances the on-disk manifest behind the reader. + mutate_main( + &mut writer, + MUTATION_QUERIES, + "insert_person", + &mixed_params(&[("$name", "Frank")], &[("$age", 33)]), + ) + .await + .unwrap(); + + let (probes_in, manifest, commit_graph, _table, probe_count) = probes(); + with_query_io_probes( + probes_in, + reader.query( + ReadTarget::branch("main"), + TEST_QUERIES, + "total_people", + ¶ms(&[]), + ), + ) + .await + .unwrap(); + + assert!( + manifest.stats().read_iops > 0, + "stale read must re-read the manifest" + ); + assert_eq!( + commit_graph.stats().read_iops, + 0, + "stale refresh must be manifest-only (no commit-graph scan)" + ); + assert_eq!( + probe_count.load(Ordering::Relaxed), + 2, + "stale same-branch read probes once under the read lock and once under the write lock" + ); +} + +// ── Fix 3: held-handle cache — warm repeat reads stop re-opening tables ──────── +// +// After Fix 1+2 a warm same-branch read still re-opened every touched table per +// query (the "never warms up" residual). Fix 3 holds the open `Dataset` per +// `(table, branch, version, e_tag)` (the version-keyed analogue of LanceDB's +// `DatasetConsistencyWrapper`) and shares one `Session` per graph, so a second +// identical warm read reuses the handle with zero table opens. + +/// Headline: a second identical warm same-branch read does ZERO table opens +/// (the cold first read opens; the warm repeat serves from the held-handle +/// cache). Fails before Fix 3, where every read re-opens the table. +#[tokio::test] +async fn repeat_warm_read_reuses_table_handles() { + let dir = tempfile::tempdir().unwrap(); + let mut db = init_and_load(&dir).await; + // Deep history: the win must hold regardless of commit count. + commit_many(&mut db, 10).await; + + // Cold first read: opens the touched table. + let (p1, _m1, _c1, table1, _pr1) = probes(); + with_query_io_probes( + p1, + db.query( + ReadTarget::branch("main"), + TEST_QUERIES, + "total_people", + ¶ms(&[]), + ), + ) + .await + .unwrap(); + assert!( + table1.stats().read_iops > 0, + "the cold first read must open the table" + ); + + // Warm repeat: the held handle is reused, so no open happens through this + // query's table wrapper. + let (p2, manifest2, commit_graph2, table2, probe2) = probes(); + with_query_io_probes( + p2, + db.query( + ReadTarget::branch("main"), + TEST_QUERIES, + "total_people", + ¶ms(&[]), + ), + ) + .await + .unwrap(); + assert_eq!( + table2.stats().read_iops, + 0, + "a warm repeat read must reuse the held handle (0 table opens)" + ); + assert_eq!( + manifest2.stats().read_iops, + 0, + "warm repeat read: 0 manifest opens" + ); + assert_eq!( + commit_graph2.stats().read_iops, + 0, + "warm repeat read: 0 commit-graph opens" + ); + assert_eq!( + probe2.load(Ordering::Relaxed), + 1, + "warm repeat read: exactly one version probe" + ); +} + +/// A write advances the table's version, so the next read misses the +/// version-keyed cache and re-opens — never serving a stale handle (invariant 6 +/// for the cached path). Passes with or without the cache; a correctness guard +/// that the cache cannot serve pre-write data. +#[tokio::test] +async fn write_invalidates_table_cache_for_changed_table() { + let dir = tempfile::tempdir().unwrap(); + let mut db = init_and_load(&dir).await; + + let before = count_rows(&db, "node:Person").await; + + // Warm the cache for Person. + db.query( + ReadTarget::branch("main"), + TEST_QUERIES, + "total_people", + ¶ms(&[]), + ) + .await + .unwrap(); + + // Write Person: its version advances, so the cached (table, branch, version) + // key is now superseded. + mutate_main( + &mut db, + MUTATION_QUERIES, + "insert_person", + &mixed_params(&[("$name", "cache_miss_one")], &[("$age", 50)]), + ) + .await + .unwrap(); + + // The next read re-opens Person at the new version (cache miss). + let (p, _m, _c, table, _pr) = probes(); + with_query_io_probes( + p, + db.query( + ReadTarget::branch("main"), + TEST_QUERIES, + "total_people", + ¶ms(&[]), + ), + ) + .await + .unwrap(); + assert!( + table.stats().read_iops > 0, + "a read after a write to the table must re-open it (version-keyed miss)" + ); + + let after = count_rows(&db, "node:Person").await; + assert_eq!( + after, + before + 1, + "the post-write read observes the new row (no stale handle served)" + ); +} diff --git a/docs/dev/invariants.md b/docs/dev/invariants.md index 2fa87d1..eb6821a 100644 --- a/docs/dev/invariants.md +++ b/docs/dev/invariants.md @@ -53,7 +53,13 @@ converge the physical state. versioning, fragments, branches, compaction, cleanup, and index primitives. DataFusion should own relational execution where it fits. Do not add custom WALs, transaction managers, buffer pools, page formats, or local clones of - substrate behavior. Read [lance.md](lance.md) before guessing. + substrate behavior. Read [lance.md](lance.md) before guessing. Respecting the + substrate also means *using* it idiomatically, not only refraining from + rebuilding it: reuse long-lived handles instead of re-opening per call, + resolve latest state through the substrate's cheap primitive instead of + re-scanning, and share its caches/session. Re-deriving per call what the + substrate keeps warm is a substrate violation even when no code is + reimplemented. 2. **Graph visibility is manifest-atomic.** Lance commits are per dataset. OmniGraph's graph-level atomicity comes from publishing one manifest update @@ -126,6 +132,18 @@ converge the physical state. a substitute for missing lower-level assertions. Read [testing.md](testing.md) before adding tests. +15. **One source of truth, cheaply derived.** Lance and the manifest are the + source of truth. Everything the engine needs at runtime is a derived view of + them: read or projected on demand, held warm, refreshed by a cheap probe. Two + failure modes are forbidden. A *parallel copy* the engine maintains can drift + from the source, and that divergence compounds over time. *Cold + re-derivation* rebuilds the view from the full source on every call, so its + cost grows with history. Invariants 1 and 7, and the deny-list "state that + drifts" and "manifest-derivable reconciler" items, are instances; so is + bounding a read's cost to its working set rather than the commit count. This + is the structural face of "engineering is programming integrated over time": + both failure modes are liabilities that compound as the system grows. + ## Current Truth Matrix | Area | Current state | Source | @@ -252,6 +270,37 @@ them explicit. - **Resource bounds:** some operations still lack enforced per-query memory or time budgets. New long-running work should add explicit bounds rather than widening the gap. +- **Read-path re-derivation (largely closed by the query-latency work):** + snapshot resolution used to re-open a fresh coordinator per read (a full + `__manifest` re-scan plus two commit-graph scans), open each table through the + namespace (two more `__manifest` scans per table), validate the schema twice, + and share no Lance `Session`. That was an O(commits) cost that never warmed up. + Fix 1 (warm coordinator reuse behind a `latest_version_id` probe), Fix 2 (open + tables by location+version), finding A (validate once), and Fix 3 (a held + `Dataset`-handle cache keyed by `(table, branch, version, e_tag when Lance + exposes it)` plus one shared `Session` per graph) remove that tax: a warm + same-branch read does one probe, one schema read, and zero opens on a repeat. + Non-main branch freshness compares the manifest incarnation (`version` plus + manifest-location e_tag when available, otherwise Lance manifest timestamp), + because Lance branch names can be deleted/recreated at the same version number; + the manifest e_tag is carried into synthetic snapshot ids when available, and + a detected same-branch manifest refresh clears read caches as the fallback for + e_tag-less table locations/topology. Remaining: the internal metadata tables + (`__manifest`, `_graph_commits`) are still not compacted, so the probe and + refresh cost still grows with fragment count on a long-lived graph (the + `optimize`-covers-internal-tables follow-up); the commit graph is not yet + reconcilable from the manifest; and the traversal id-map is still rebuilt. +- **Commit-graph parent under concurrency:** `record_graph_commit` now refreshes + the commit-graph head from storage before appending, so a same-branch write + after an external commit no longer forks the commit DAG by parenting off a + stale cached head (the single-process fork, pre-existing for non-strict + inserts and widened to strict ops by Fix 1's `refresh_manifest_only`, is now + closed). Residual: two processes writing disjoint tables can still pass their + per-table manifest CAS and append off the same parent (a refresh-then-append + TOCTOU). The convergent fix is reconcile-from-manifest (parent = the commit at + the manifest version the publisher CAS'd against; `manifest_version` is on + every commit row), composing with the manifest-to-commit-graph atomicity gap; + it needs commit-graph append ordering or a Lance append-CAS to fully close. ## Deny-list @@ -277,6 +326,10 @@ case is exceptional. - Cost-blind plan choice when statistics are available or required. - Hidden statistics for behavior that affects planning or operator choice. - Hash-map iteration order in result ordering, plan choice, or migration output. +- Cold re-derivation on the hot path: rebuilding from the full source what could + be held warm and refreshed cheaply, so cost scales with history rather than the + working set (the cost face of invariant 15; "state that drifts" above is its + shadow-copy face). - String-flattened SQL/filter generation when a structured pushdown API is available. - Eager multi-hop cross-product materialization when factorization fits. @@ -313,6 +366,8 @@ Use this as yes/no/NA for any non-trivial design or PR: - Are stats/capabilities exposed when behavior depends on them? - Are existing known gaps left no worse and documented if touched? - Does the test live at the same boundary as the change? +- Is this operation's cost bounded with respect to history and scale, or does it + re-derive warm state from cold storage per call? - Does the change avoid every deny-list pattern, or justify the exception? ## Maintenance Policy diff --git a/docs/dev/testing.md b/docs/dev/testing.md index 8d6a305..6a62580 100644 --- a/docs/dev/testing.md +++ b/docs/dev/testing.md @@ -23,8 +23,9 @@ The engine's `tests/` is the principal coverage surface; most graph-shaped behav | `merge_truth_table.rs` | Merge-pair truth table (MR-786): all 9×9 `(left_op, right_op)` cells from `{noop, addNode, removeNode, addEdge, removeEdge, setProperty, dropProperty, addLabel, removeLabel}`. Adding a new op to `OpVariant` forces a compile error in `build_case` until the new row + column are dispositioned. 36 executable cells run through real `branch_merge` with a structured oracle (`MergeOutcome` / `MergeConflictKind` + graph-state assert); 45 cells involving `dropProperty`/`addLabel`/`removeLabel` are recorded as `Unsupported` until the mutation grammar grows. | | `writes.rs` | Direct-publish writes: cancellation, non-strict insert/merge rebase under the per-table queue, strict stale-write conflicts, multi-statement atomicity, MR-794 staged-write rewire (D₂ rejection, insert+update coalesce, multi-append coalesce, partial-failure recovery, load RI/cardinality recovery) | | `staged_writes.rs` | TableStore staged-write primitives (`stage_append`, `stage_merge_insert`, `commit_staged`, `scan_with_staged`, `count_rows_with_staged`) — primitive-level only; engine code uses the in-memory `MutationStaging` accumulator instead | -| `forbidden_apis.rs` | Defense-in-depth source-walk guard: engine code (`exec/`, `db/omnigraph/`, `loader/`, `changes/`) must not reach around the sealed storage trait to Lance inline-commit APIs; `// forbidden-api-allow: ` sentinel exempts reviewed lines | +| `forbidden_apis.rs` | Defense-in-depth source-walk guard: engine code (`exec/`, `db/omnigraph/`, `loader/`, `changes/`) must not reach around the sealed storage trait to Lance inline-commit APIs, nor open datasets directly (`Dataset::open` / `DatasetBuilder::from_uri`/`from_namespace`) — reads route through `Snapshot::open` and the held-handle cache; `// forbidden-api-allow: ` sentinel exempts reviewed lines | | `lance_surface_guards.rs` | Pins the Lance API surfaces omnigraph depends on (named runtime + compile-only guards; see [lance.md](lance.md)) — the first smoke check on any Lance version bump; e.g. `compact_files_still_fails_on_blob_columns` turns red when the upstream blob-compaction fix lands | +| `warm_read_cost.rs` | Cost-budget tests for the warm read path (query-latency work), measured at the object-store boundary with Lance `IOTracker` (the LanceDB IO-counted pattern): a warm same-branch read does 0 manifest opens, 0 commit-graph opens, 1 version probe, validates the schema once (Fix 1 / finding A / Fix 2 at commit-history depth); stale same-branch reads perform exactly 2 probes and refresh manifest-only; recreated non-main branches with the same Lance version refresh by incarnation; recreated branch-owned table handles are distinguished by table e_tag or refresh-time cache clearing; recreated traversal topology is protected by synthetic snapshot-id incarnation or refresh-time cache clearing; a warm *repeat* read does 0 table opens via the held-handle cache and a write re-opens only the changed table at its new version/e_tag (Fix 3/6A). See "Cost-budget tests" below | | `lifecycle.rs` | Graph lifecycle, schema state | | `point_in_time.rs` | Snapshots, time travel (`snapshot_at_version`, `entity_at`) | | `changes.rs` | `diff_between` / `diff_commits` | @@ -125,5 +126,14 @@ When you pick up any change, walk through this: 6. **For substrate-touching changes** (Lance behavior), reach for `failpoints` or fixture-driven scenarios, not stubbed-out mocks. 7. **For server / API changes**, confirm the OpenAPI regeneration happens in `openapi.rs` and that the diff lands in `openapi.json`. 8. **Verify your change makes an existing test fail before it makes the new one pass.** If you can break the code without breaking a test, your coverage gap is the problem to fix first. +9. **Bound hot-path cost at history depth.** If the change touches a read or open path, add or extend a test that asserts a *bounded* cost (e.g. a warm same-branch read performs zero `Dataset::open`, or a fixed object-op count) against a fixture with realistic *commit-history depth*, not just realistic row counts. Cost that scales with history is invisible on a shallow fixture and only bites in production. See "Cost-budget tests" below. + +## Cost-budget tests: bound hot-path cost at history depth + +Correctness bugs fail loudly in tests; cost-scaling bugs pass every test and degrade silently in production. The engine read path historically had no cost assertion, and fixtures carry shallow commit history, so an O(commits)-per-query cost stayed green in CI and only surfaced on a long-lived graph (read snapshot resolution re-scanned the internal manifest and commit-graph tables on every query, and those tables were never compacted). Guard against the class: + +- **Assert a cost budget, not just a result.** For a read/open path, assert the number of `Dataset::open` calls (or object-store ops) a warm query performs, and that it does not grow with commit count. The reference is LanceDB's IO-counted tests, which assert a cached read costs 0-1 IO and carry a named regression test against "a list call on every subsequent query." +- **Test at history depth.** Build a fixture with many *commits* (not many rows) and assert warm-read cost is flat across depths. A shallow fixture cannot catch an O(commits) cost. +- This is the testing companion to invariant 15 in [docs/dev/invariants.md](invariants.md) (hot-path cost is bounded by work, not history). When in doubt, re-read [docs/dev/invariants.md](invariants.md) — quality gates apply to every change.