perf(engine): remove the per-query metadata re-derivation tax on warm reads (#268)

* test(engine): add read-path IO instrumentation seam for warm-read cost tests

Prerequisite seam for the query-latency fixes. Adds
crates/omnigraph/src/instrumentation.rs:

- CountingStorageAdapter: a StorageAdapter decorator counting per-method
  reads (read_text/exists/read_text_versioned/list_dir), for the
  schema-contract reads on the query path.
- A per-query task-local (QueryIoProbes) carrying Lance WrappingObjectStore
  wrappers per open category plus a probe counter, delivered via
  with_query_io_probes. open_dataset_tracked attaches the wrapper so the
  open itself is counted (ObjectStoreParams.object_store_wrapper).

Wires the wrappers into the manifest open (open_manifest_dataset) and the
commit-graph opens (CommitGraph::open/open_at_branch). Production leaves
the task-local unset, so nothing attaches.

Makes Omnigraph::open_with_storage public so tests can inject the counting
adapter. lance-io is a dev-dependency (IOTracker named only in tests). No
runtime behavior change.

* test(engine): warm same-branch read should reuse the coordinator (red)

Cost-budget test using Lance IOTracker at the object-store boundary (the
LanceDB IO-counted-test pattern). On a 20-commit-deep graph, a warm
same-branch query re-opens a fresh coordinator, which opens both the commit
graph and __manifest. Asserts the read opens the commit graph zero times
and performs exactly one cheap version probe; today it does neither (it
scans the commit graph on re-open and never probes). The freshness guard
already passes. Adds the commit_many helper for history-depth fixtures.

Red half of the Fix 1 red->green pair; turns green with the next commit.

* perf(engine): same-branch reads reuse the warm coordinator (Fix 1)

query()/resolved_target re-opened a fresh GraphCoordinator from storage on
every read (full __manifest scan + two commit-graph scans), so a warm
read's cost grew with commit history (invariant 15) though the data was
unchanged.

resolved_target now serves same-branch reads from the warm in-memory
coordinator, gated by a cheap version probe (latest_version_id, one
object-store op) instead of a full re-open:
- fresh (probe == cached version): return the in-memory snapshot under the
  read lock, with a synthetic (branch, version) id and no commit-graph
  access (reads pin the snapshot by manifest version, not the commit DAG;
  invariant 2).
- stale: take the write lock, re-probe (double-checked; tokio RwLock has no
  read->write upgrade), then refresh_manifest_only (no commit-graph scan),
  preserving strong consistency for external writers (invariant 6).
Cross-branch and snapshot targets keep the existing cold-resolve path.

Adds ManifestCoordinator/GraphCoordinator::probe_latest_version and
GraphCoordinator::refresh_manifest_only. Nothing on the read path needs a
real commit ULID (only RuntimeCache keys on the id, where synthetic is
consistent), per a caller audit.

A warm same-branch read on a 20-commit graph now does zero commit-graph
opens and exactly one probe (down from a deep commit-graph scan) and still
observes external commits. The residual per-table __manifest scans are
removed later by Fix 2.

* test(engine): warm query should validate the schema contract once (red)

ensure_schema_state_valid runs twice per query (query()/run_query_at AND
resolved_target/snapshot_at_version), each reading 3 contract files + 2
existence probes. A warm query thus does 6 read_text + 4 exists where one
validation (3 + 2) suffices, measured via CountingStorageAdapter. Adds a
drift guard (schema_source_drift_is_caught_on_read) that already passes.

Red half of the finding-A red->green pair.

* perf(engine): validate the schema contract once per query (finding A)

ensure_schema_state_valid ran on every query AND again inside
resolved_target / snapshot_at_version, so each query validated the schema
contract twice (~10 storage ops). Removes the redundant query()/
run_query_at() calls; the validation inside resolved_target /
snapshot_at_version still runs, so drift is detected exactly as before.

A source-only fast path was rejected: a long-lived handle must detect
external drift of the schema source, IR, OR state on its next operation
(lifecycle::long_lived_handle_rejects_schema_*), which a source-only
compare would miss. So the only safe latency win is not validating twice.

A warm query now does one validation (3 read_text + 2 exists) instead of
two (6 + 4).

* test(engine): warm + multi-table reads should do zero manifest scans (red)

After Fix 1 a warm same-branch read still scans __manifest ~44 times at
20-commit depth: not from resolution (Fix 1 removed that) but from the
per-table open path, which routes through the Lance namespace and full-scans
__manifest twice per touched table (describe_table + describe_table_version).
Tightens the warm test to assert manifest read_iops == 0 and adds a
multi-table (traversal) test asserting the same, pinning the "2 tables = 2x"
tax. Red half of the Fix 2 red->green pair.

* perf(engine): open touched tables by location+version, not via the namespace (Fix 2)

SubTableEntry::open routed every read-path table open through
DatasetBuilder::from_namespace(BranchManifestNamespace), whose describe_table
full-scans __manifest and, with managed_versioning, makes Lance scan again
(describe_table_version) -- two full __manifest scans per touched table. That
was the residual that made warm-read manifest IO grow with history and the
'2 tables = 2x' multi-table tax.

The resolved Snapshot already holds each table's path/version/branch, so open
directly: from_uri(table_uri_for_path(root, path, branch)).with_version(v).
The branch-qualified location is the dataset that physically holds the version
(main: {path}; branch: {path}/tree/{branch}, Lance native-branch storage), and
with_version resolves it within THAT dataset's _versions. 0 namespace calls +
1 HEAD via the native ConditionalPutCommitHandler.

The read namespace (BranchManifestNamespace) is now unused in production
(writes use StagedTableNamespace), so it, its constructor, and the helpers only
it used (to_namespace_version, publish_requests, their imports) are gated
#[cfg(test)] -- retained to validate the namespace contract in unit tests.
Removes the dead open_table_at_version_from_manifest.

Warm same-branch + multi-table reads now scan __manifest zero times; branch +
time-travel reads stay correct (branching.rs, point_in_time.rs, 2 lib
regression tests); production-lib warnings unchanged (baseline).

* test(engine): cost-budget coverage for branch-warm and stale-refresh reads (matrix)

Extends the read-path cost-budget tests across more of the morphological matrix:
- warm_branch_read_does_no_manifest_scans: a warm read on a non-main branch
  (handle synced to it) scans __manifest zero times, exercising Fix 2's
  branch-owned-table open (tree/{branch} + with_version) on Fix 1's warm path --
  the cell that regressed when the open used with_branch against the base.
- stale_read_refreshes_manifest_only: an external commit makes the next read
  take the stale path, which re-reads the manifest (read_iops > 0) but never
  scans the commit graph (refresh_manifest_only), pinning Fix 1's manifest-only
  refresh.

Cold paths (cross-branch, time-travel) stay behavior-covered (branching.rs,
point_in_time.rs) and are cold by design (Fix 1 warm-paths only same-branch), so
there is no manifest==0 contract to assert there.

* test(engine): same-branch write after external commit must not fork the commit DAG (red)

* fix(engine): refresh commit-graph head before append to prevent same-branch DAG fork

A same-branch write that follows an external commit committed a fresh manifest
version (commit_all rebases the pin from a fresh coordinator) but appended off
the coordinator's stale in-memory commit-graph head, forking the commit DAG (the
new commit and the external commit shared a parent). Pre-existing for non-strict
inserts; widened to strict ops by Fix 1's refresh_manifest_only freshening the
read-time pin. record_graph_commit now refreshes the commit-graph head from
storage before append_commit, so the parent is the true current head.
record_merge_commit is unaffected (it passes explicit parents).

* perf(engine): hold open Dataset handles + share one Session per graph (Fix 3)

A warm same-branch read still re-opened every touched table per query (the
"never warms up" residual after Fix 1+2). A per-graph held-handle cache keyed by
(table_path, branch, version) now serves repeat reads with zero table opens, and
one shared lance::Session per graph warms metadata/index caches across opens.

Validated against LanceDB upstream (rust/lancedb/src/table/dataset.rs
DatasetConsistencyWrapper): hold an Arc<Dataset> and reuse it for 0-IO warm
reads; one Session per connection threaded into opens; writers never serve from
the read cache; time-travel bypasses. One adaptation: omnigraph keys by version
(snapshot-pins-version model) where LanceDB keys per-table+HEAD, reusing the
in-repo GraphIndexCache LRU template.

- ReadCaches (session + TableHandleCache) injected onto live-Branch-read
  snapshots in resolved_target; Snapshot::open serves from the cache or opens
  once with the session on a miss (via the instrumented open_table_dataset).
- Writes (resolved_branch_target -> open HEAD) and time-travel / Snapshot-id
  reads bypass the cache. Version-in-key makes a write a new key (old handle ages
  out via LRU); invalidate_all at branch-switch/refresh is hygiene only.
- Cost tests: a 2nd identical warm read does 0 table opens; a write re-opens only
  the changed table at its new version.

Full engine suite green.

* test(engine): forbid raw data opens in the read/exec layer (P2 guard)

Extend the forbidden-API guard with Dataset::open / DatasetBuilder::from_uri /
from_namespace so the read/exec layer (exec/, loader/, changes/, db/omnigraph/)
cannot bypass Snapshot::open and the held-handle cache (Fix 3). The instrumented
opener (instrumentation.rs) is allow-listed; two legitimate non-read opens (a
test editing __manifest, Hard-drop version GC) carry sentinels. The
storage/manifest layers stay allow-listed.

Lean P2 scope, per LanceDB-upstream + minimize-liability: the data-read boundary
already exists (SubTableEntry::open); this guard pins it so a future read cannot
open around the cache. Centralizing all internal opens behind one opener is
deferred.

* docs(dev): invariant 15 (one source of truth, cheaply derived) + cost-budget testing

Records the principle behind the query-latency work: Lance and the manifest are
the source of truth, everything else a derived view held warm and refreshed by a
cheap probe; the two failure modes (a drifting parallel copy, and cold
re-derivation whose cost grows with history) are deny-listed. Adds the
cost-budget testing discipline (assert a warm read's open/IO count is flat at
commit-history depth, the LanceDB IO-counted pattern) and the warm_read_cost.rs
row. Updates the read-path-re-derivation known gap to reflect what Fix 1/2/3 +
finding A close, and adds the commit-graph-parent-under-concurrency gap.

* fix(engine): branch-incarnation identity + unified invalidation + shared LruMap (PR #268 review)

Phase 6 A-D, correct-by-design responses to the Codex/Greptile P2 review comments. A: warm-read freshness and the table-handle cache key use the manifest incarnation (e_tag, manifest-timestamp fallback, then version), so a deleted+recreated non-main branch reusing a version number cannot be served stale; main stays version-cheap, non-main loads latest_manifest; a detected stale refresh also invalidates read caches; two regression tests force the version collision. B: unify the two cache invalidations into Omnigraph::invalidate_read_caches() at the four sites. C: assert the stale path's probe count. D: shared LruMap behind both caches with unconditional eviction, plus a unit test. Full engine suite green; multi-process lineage fork and O(history) write refresh remain known gaps for Phase 6E/7.
This commit is contained in:
Ragnor Comerford 2026-06-17 13:25:20 +02:00 committed by GitHub
parent d0e06a6ff6
commit 5243c048aa
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
26 changed files with 1918 additions and 107 deletions

View file

@ -55,5 +55,6 @@ arc-swap = { workspace = true }
omnigraph-compiler = { path = "../omnigraph-compiler", version = "0.7.0" }
tokio = { workspace = true }
lance-namespace-impls = { workspace = true }
lance-io = "7.0.0"
serial_test = "3"
proptest = "1"

View file

@ -79,10 +79,14 @@ impl CommitGraph {
pub async fn open(root_uri: &str) -> Result<Self> {
let root = root_uri.trim_end_matches('/');
let dataset = Dataset::open(&graph_commits_uri(root))
.await
.map_err(|e| OmniError::Lance(e.to_string()))?;
let actor_dataset = Dataset::open(&graph_commit_actors_uri(root)).await.ok();
let wrapper = crate::instrumentation::commit_graph_wrapper();
let dataset =
crate::instrumentation::open_dataset_tracked(&graph_commits_uri(root), wrapper.clone())
.await?;
let actor_dataset =
crate::instrumentation::open_dataset_tracked(&graph_commit_actors_uri(root), wrapper)
.await
.ok();
let actor_by_commit_id = match &actor_dataset {
Some(dataset) => load_commit_actor_cache(dataset).await?,
None => HashMap::new(),
@ -101,14 +105,18 @@ impl CommitGraph {
pub async fn open_at_branch(root_uri: &str, branch: &str) -> Result<Self> {
let root = root_uri.trim_end_matches('/');
let dataset = Dataset::open(&graph_commits_uri(root))
.await
.map_err(|e| OmniError::Lance(e.to_string()))?;
let wrapper = crate::instrumentation::commit_graph_wrapper();
let dataset =
crate::instrumentation::open_dataset_tracked(&graph_commits_uri(root), wrapper.clone())
.await?;
let dataset = dataset
.checkout_branch(branch)
.await
.map_err(|e| OmniError::Lance(e.to_string()))?;
let actor_dataset = Dataset::open(&graph_commit_actors_uri(root)).await.ok();
let actor_dataset =
crate::instrumentation::open_dataset_tracked(&graph_commit_actors_uri(root), wrapper)
.await
.ok();
let actor_by_commit_id = match &actor_dataset {
Some(dataset) => load_commit_actor_cache(dataset).await?,
None => HashMap::new(),
@ -127,9 +135,12 @@ impl CommitGraph {
pub async fn refresh(&mut self) -> Result<()> {
let root = self.root_uri.clone();
self.dataset = Dataset::open(&graph_commits_uri(&root))
.await
.map_err(|e| OmniError::Lance(e.to_string()))?;
let wrapper = crate::instrumentation::commit_graph_wrapper();
self.dataset = crate::instrumentation::open_dataset_tracked(
&graph_commits_uri(&root),
wrapper.clone(),
)
.await?;
if let Some(branch) = &self.active_branch {
self.dataset = self
.dataset
@ -137,7 +148,10 @@ impl CommitGraph {
.await
.map_err(|e| OmniError::Lance(e.to_string()))?;
}
self.actor_dataset = Dataset::open(&graph_commit_actors_uri(&root)).await.ok();
self.actor_dataset =
crate::instrumentation::open_dataset_tracked(&graph_commit_actors_uri(&root), wrapper)
.await
.ok();
self.actor_by_commit_id = match &self.actor_dataset {
Some(dataset) => load_commit_actor_cache(dataset).await?,
None => HashMap::new(),

View file

@ -10,7 +10,9 @@ use crate::storage::{StorageAdapter, join_uri, normalize_root_uri};
use super::commit_graph::{CommitGraph, GraphCommit};
use super::is_internal_system_branch;
use super::manifest::{ManifestChange, ManifestCoordinator, Snapshot, SubTableUpdate};
use super::manifest::{
ManifestChange, ManifestCoordinator, ManifestIncarnation, Snapshot, SubTableUpdate,
};
const GRAPH_COMMITS_DIR: &str = "_graph_commits.lance";
@ -26,10 +28,11 @@ impl SnapshotId {
&self.0
}
pub(crate) fn synthetic(branch: Option<&str>, version: u64) -> Self {
match branch {
Some(branch) => Self(format!("manifest:{}:v{}", branch, version)),
None => Self(format!("manifest:main:v{}", version)),
pub(crate) fn synthetic(branch: Option<&str>, version: u64, e_tag: Option<&str>) -> Self {
let branch = branch.unwrap_or("main");
match e_tag {
Some(e_tag) => Self(format!("manifest:{}:v{}:etag:{}", branch, version, e_tag)),
None => Self(format!("manifest:{}:v{}", branch, version)),
}
}
}
@ -166,6 +169,10 @@ impl GraphCoordinator {
self.manifest.version()
}
pub(crate) fn manifest_incarnation(&self) -> ManifestIncarnation {
self.manifest.incarnation()
}
pub fn snapshot(&self) -> Snapshot {
self.manifest.snapshot()
}
@ -182,6 +189,19 @@ impl GraphCoordinator {
Ok(())
}
pub(crate) async fn probe_latest_incarnation(&self) -> Result<ManifestIncarnation> {
crate::instrumentation::record_probe();
self.manifest.probe_latest_incarnation().await
}
/// Refresh only the manifest (not the commit graph). The read path uses this
/// on a stale same-branch probe: a read pins its snapshot by manifest version
/// and never needs the commit graph, so a full `refresh` (which also scans
/// the commit graph) would be wasted IO.
pub async fn refresh_manifest_only(&mut self) -> Result<()> {
self.manifest.refresh().await
}
pub async fn branch_list(&self) -> Result<Vec<String>> {
self.manifest.list_branches().await.map(|branches| {
branches
@ -315,10 +335,13 @@ impl GraphCoordinator {
None => GraphCoordinator::open(self.root_uri(), Arc::clone(&self.storage)).await?,
};
Ok(other
.head_commit_id()
.await?
.unwrap_or_else(|| SnapshotId::synthetic(other.current_branch(), other.version())))
Ok(other.head_commit_id().await?.unwrap_or_else(|| {
SnapshotId::synthetic(
other.current_branch(),
other.version(),
other.manifest_incarnation().e_tag.as_deref(),
)
}))
}
pub async fn resolve_target(&self, target: &ReadTarget) -> Result<ResolvedTarget> {
@ -339,7 +362,11 @@ impl GraphCoordinator {
}
};
let snapshot_id = other.head_commit_id().await?.unwrap_or_else(|| {
SnapshotId::synthetic(other.current_branch(), other.version())
SnapshotId::synthetic(
other.current_branch(),
other.version(),
other.manifest_incarnation().e_tag.as_deref(),
)
});
Ok(ResolvedTarget {
requested: target.clone(),
@ -509,9 +536,23 @@ impl GraphCoordinator {
return Ok(SnapshotId::synthetic(
current_branch.as_deref(),
manifest_version,
self.manifest_incarnation().e_tag.as_deref(),
));
};
failpoints::maybe_fail("graph_publish.before_commit_append")?;
// Refresh the commit-graph head from storage before selecting the
// parent. `append_commit` parents the new commit on the IN-MEMORY head
// (`head_commit_id`, zero storage read), but the manifest was just
// committed against a freshly rebased pin (`commit_all` opens a fresh
// coordinator) while THIS coordinator's cached head may be stale because
// an external writer advanced the branch. Without this refresh a
// same-branch write after an external commit appends off the stale head
// and FORKS the commit DAG (the new commit and the external commit
// sharing a parent). Refreshing makes the parent the true current head;
// the just-committed manifest version has no commit-graph row yet, so the
// fresh head is exactly the prior commit. (record_merge_commit is
// unaffected — it passes explicit parents, never the cached head.)
commit_graph.refresh().await?;
let graph_commit_id = commit_graph
.append_commit(current_branch.as_deref(), manifest_version, actor_id)
.await?;

View file

@ -24,11 +24,10 @@ mod recovery;
mod state;
use graph::{init_manifest_graph, open_manifest_graph, snapshot_state_at};
use layout::{manifest_uri, open_manifest_dataset, type_name_hash};
use layout::{manifest_uri, open_manifest_dataset, table_uri_for_path, type_name_hash};
pub(crate) use metadata::TableVersionMetadata;
#[cfg(test)]
use metadata::{OMNIGRAPH_ROW_COUNT_KEY, table_version_metadata_for_state};
use namespace::open_table_at_version_from_manifest;
pub(crate) use namespace::open_table_head_for_write;
#[cfg(test)]
use namespace::{branch_manifest_namespace, staged_table_namespace};
@ -74,16 +73,51 @@ pub struct Snapshot {
root_uri: String,
version: u64,
entries: HashMap<String, SubTableEntry>,
/// Per-graph read caches (shared `Session` + held-handle cache), injected by
/// `Omnigraph::resolved_target` for live Branch reads so table opens reuse
/// handles (0 IO on a warm repeat) and one `Session`. `None` for write-prelude
/// snapshots, time-travel / Snapshot-id reads, and directly-built test
/// snapshots, which fall back to a plain open.
read_caches: Option<Arc<crate::runtime_cache::ReadCaches>>,
}
impl Snapshot {
/// Open a sub-table dataset at its pinned version.
/// Open a sub-table dataset at its pinned version. With read caches present
/// (live Branch reads), reuse a held handle through the cache (0 open IO on a
/// warm repeat) and the shared `Session`; otherwise plain-open (Fix 2).
pub async fn open(&self, table_key: &str) -> Result<Dataset> {
let entry = self
.entries
.get(table_key)
.ok_or_else(|| OmniError::manifest(format!("no manifest entry for {}", table_key)))?;
entry.open(&self.root_uri).await
match &self.read_caches {
Some(caches) => {
let location = table_uri_for_path(
&self.root_uri,
&entry.table_path,
entry.table_branch.as_deref(),
);
caches
.handles
.get_or_open(
&entry.table_path,
entry.table_branch.as_deref(),
entry.table_version,
entry.version_metadata.e_tag(),
&location,
Some(&caches.session),
)
.await
}
None => entry.open(&self.root_uri).await,
}
}
/// Attach per-graph read caches (shared `Session` + handle cache) so this
/// snapshot's table opens reuse handles and the session. Set by
/// `Omnigraph::resolved_target` for live Branch reads only.
pub(crate) fn set_read_caches(&mut self, caches: Arc<crate::runtime_cache::ReadCaches>) {
self.read_caches = Some(caches);
}
/// Manifest version this snapshot was taken from.
@ -101,6 +135,31 @@ impl Snapshot {
}
}
#[derive(Debug, Clone, PartialEq, Eq)]
pub(crate) struct ManifestIncarnation {
pub(crate) version: u64,
pub(crate) e_tag: Option<String>,
timestamp_nanos: Option<u128>,
}
impl ManifestIncarnation {
pub(crate) fn matches(&self, held: &Self) -> bool {
if self.version != held.version {
return false;
}
match (&self.e_tag, &held.e_tag) {
(Some(latest), Some(current)) => latest == current,
_ => match (self.timestamp_nanos, held.timestamp_nanos) {
(Some(latest), Some(current)) => latest == current,
// Some object stores can omit both e_tag and manifest timestamp
// from the reachable API. In that narrow case the version-number
// probe is the strongest available identity.
_ => true,
},
}
}
}
impl SubTableUpdate {
pub(crate) fn to_create_table_version_request(&self) -> CreateTableVersionRequest {
self.version_metadata.to_create_table_version_request(
@ -132,14 +191,28 @@ pub(crate) enum ManifestChange {
}
impl SubTableEntry {
/// Open this sub-table at its pinned version directly by location (Fix 2),
/// without the Lance namespace — which would full-scan `__manifest` twice per
/// open (`describe_table` + `describe_table_version`). The resolved Snapshot
/// already holds the path, version, and branch. Branches are Lance native
/// branches, so `with_branch` resolves `{base}/tree/{branch}` from the base
/// URI; main uses `with_version`.
pub(crate) async fn open(&self, root_uri: &str) -> Result<Dataset> {
open_table_at_version_from_manifest(
root_uri,
&self.table_key,
self.table_branch.as_deref(),
self.table_version,
)
.await
// The branch-qualified location is the dataset that physically holds this
// version: main at `{table_path}`, a branch at
// `{table_path}/tree/{branch}` (Lance native-branch storage). `with_version`
// then resolves the version within THAT dataset's `_versions` — a branch
// version lives under `tree/{branch}/_versions`, not the base. This
// matches the physical layout the namespace path resolved, without the
// per-open `__manifest` scan.
let location = table_uri_for_path(root_uri, &self.table_path, self.table_branch.as_deref());
// Route through the instrumented data-table opener (Fix 3). With no
// session this is exactly the Fix-2 `from_uri(location).with_version`.
// This is the uncached fallback (a snapshot with no read caches); the
// cached path (`Snapshot::open` → handle cache) calls the same opener on
// a miss with the shared session, so both paths count on the per-query
// `table_wrapper`.
crate::instrumentation::open_table_dataset(&location, self.table_version, None).await
}
}
@ -223,6 +296,7 @@ impl ManifestCoordinator {
.into_iter()
.map(|entry| (entry.table_key.clone(), entry))
.collect(),
read_caches: None,
}
}
@ -359,6 +433,48 @@ impl ManifestCoordinator {
self.dataset.version().version
}
/// Latest committed manifest version on disk (one object-store op, no row
/// scan). The freshness probe for warm reuse: compare against `version()`
/// (the held handle's pinned version) to decide whether to refresh.
pub async fn probe_latest_version(&self) -> Result<u64> {
self.dataset
.latest_version_id()
.await
.map_err(|e| OmniError::Lance(e.to_string()))
}
pub(crate) fn incarnation(&self) -> ManifestIncarnation {
ManifestIncarnation {
version: self.version(),
e_tag: self.dataset.manifest_location().e_tag.clone(),
timestamp_nanos: Some(self.dataset.manifest().timestamp_nanos),
}
}
/// Latest committed manifest identity. Main cannot be deleted/recreated, so
/// the cheap version-number probe is sufficient there. Non-main Lance
/// branches can be deleted and recreated with the same version number, so
/// load the latest manifest location and compare its e_tag / timestamp too.
pub(crate) async fn probe_latest_incarnation(&self) -> Result<ManifestIncarnation> {
if self.active_branch.is_none() {
return Ok(ManifestIncarnation {
version: self.probe_latest_version().await?,
e_tag: self.dataset.manifest_location().e_tag.clone(),
timestamp_nanos: Some(self.dataset.manifest().timestamp_nanos),
});
}
let (manifest, location) = self
.dataset
.latest_manifest()
.await
.map_err(|e| OmniError::Lance(e.to_string()))?;
Ok(ManifestIncarnation {
version: manifest.version,
e_tag: location.e_tag,
timestamp_nanos: Some(manifest.timestamp_nanos),
})
}
pub fn active_branch(&self) -> Option<&str> {
self.active_branch.as_deref()
}

View file

@ -20,9 +20,12 @@ pub(super) fn manifest_uri(root: &str) -> String {
}
pub(super) async fn open_manifest_dataset(root_uri: &str, branch: Option<&str>) -> Result<Dataset> {
let dataset = Dataset::open(&manifest_uri(root_uri.trim_end_matches('/')))
.await
.map_err(|e| OmniError::Lance(e.to_string()))?;
let uri = manifest_uri(root_uri.trim_end_matches('/'));
let dataset = crate::instrumentation::open_dataset_tracked(
&uri,
crate::instrumentation::manifest_wrapper(),
)
.await?;
match branch {
Some(branch) if branch != "main" => dataset
.checkout_branch(branch)

View file

@ -111,7 +111,6 @@ impl TableVersionMetadata {
self.manifest_size
}
#[cfg(test)]
pub(crate) fn e_tag(&self) -> Option<&str> {
self.e_tag.as_deref()
}
@ -138,6 +137,7 @@ impl TableVersionMetadata {
request
}
#[cfg(test)]
pub(super) fn to_namespace_version(&self, version: u64) -> TableVersion {
self.to_namespace_version_with_details(version, None, None)
}

View file

@ -16,21 +16,30 @@ use object_store::{
use crate::error::{OmniError, Result};
use super::layout::{
namespace_internal_error, open_manifest_dataset, table_id_to_key, table_uri_for_path,
};
use super::metadata::{
TableVersionMetadata, namespace_version_metadata, parse_namespace_version_request,
};
use super::layout::{namespace_internal_error, table_uri_for_path};
#[cfg(test)]
use super::layout::{open_manifest_dataset, table_id_to_key};
use super::metadata::TableVersionMetadata;
#[cfg(test)]
use super::metadata::{namespace_version_metadata, parse_namespace_version_request};
#[cfg(test)]
use super::publisher::GraphNamespacePublisher;
// The read namespace (BranchManifestNamespace) is test-only since Fix 2: reads
// open sub-tables directly by location+version (SubTableEntry::open), so nothing
// in production routes a read through the Lance namespace. The writes path uses
// StagedTableNamespace. These items are retained to validate the namespace
// contract in unit tests.
#[cfg(test)]
use super::state::{ManifestState, SubTableEntry, read_manifest_entries, read_manifest_state};
#[cfg(test)]
#[derive(Debug, Clone)]
struct BranchManifestNamespace {
root_uri: String,
branch: Option<String>,
}
#[cfg(test)]
impl BranchManifestNamespace {
fn new(root_uri: &str, branch: Option<&str>) -> Self {
Self {
@ -137,6 +146,7 @@ impl StagedTableNamespace {
}
}
#[cfg(test)]
pub(crate) fn branch_manifest_namespace(
root_uri: &str,
branch: Option<&str>,
@ -175,21 +185,7 @@ async fn load_table_from_namespace(
.map_err(|e| OmniError::Lance(e.to_string()))
}
pub(crate) async fn open_table_at_version_from_manifest(
root_uri: &str,
table_key: &str,
branch: Option<&str>,
version: u64,
) -> Result<Dataset> {
load_table_from_namespace(
branch_manifest_namespace(root_uri, branch),
table_key,
branch,
Some(version),
)
.await
}
#[cfg(test)]
#[async_trait]
impl LanceNamespace for BranchManifestNamespace {
fn namespace_id(&self) -> String {

View file

@ -24,10 +24,13 @@ use lance::Dataset;
use lance::Error as LanceError;
use lance::dataset::{MergeInsertBuilder, WhenMatched, WhenNotMatched};
use lance_namespace::NamespaceError;
#[cfg(test)]
use lance_namespace::models::CreateTableVersionRequest;
use crate::error::{OmniError, Result};
#[cfg(test)]
use super::SubTableUpdate;
use super::layout::{open_manifest_dataset, tombstone_object_id, version_object_id};
use super::metadata::parse_namespace_version_request;
use super::migrations::migrate_internal_schema;
@ -37,7 +40,7 @@ use super::state::{
};
use super::{
ManifestChange, OBJECT_TYPE_TABLE, OBJECT_TYPE_TABLE_TOMBSTONE, OBJECT_TYPE_TABLE_VERSION,
SubTableEntry, SubTableUpdate, TableRegistration, TableTombstone,
SubTableEntry, TableRegistration, TableTombstone,
};
/// Bound on the publisher-level retry loop that wraps Lance's row-level CAS
@ -396,6 +399,7 @@ impl GraphNamespacePublisher {
Ok(Arc::try_unwrap(new_dataset).unwrap_or_else(|arc| (*arc).clone()))
}
#[cfg(test)]
pub(super) async fn publish_requests(
&self,
requests: &[CreateTableVersionRequest],

View file

@ -830,7 +830,12 @@ pub(crate) async fn recover_manifest_drift(
// write-entry heal: a deferred sidecar whose branch was
// deleted would otherwise fail every ReadWrite open.
coordinator.refresh().await?;
if !coordinator.all_branches().await?.iter().any(|name| name == b) {
if !coordinator
.all_branches()
.await?
.iter()
.any(|name| name == b)
{
discard_orphaned_branch_sidecar(
root_uri,
storage.as_ref(),

View file

@ -1531,7 +1531,11 @@ async fn test_v2_to_v3_sweeps_legacy_run_branches_on_write_open() {
.await
.unwrap();
let post = open_manifest_dataset(uri, None).await.unwrap();
assert_eq!(super::migrations::read_stamp(&post), 2, "stamp rewound to v2");
assert_eq!(
super::migrations::read_stamp(&post),
2,
"stamp rewound to v2"
);
}
// A no-op publish forces the open-for-write path, which runs the migration.
@ -1556,7 +1560,10 @@ async fn test_v2_to_v3_sweeps_legacy_run_branches_on_write_open() {
!after.iter().any(|b| b.starts_with("__run__")),
"legacy run branch must be swept; got {after:?}",
);
assert!(after.iter().any(|b| b == "feature"), "user branch must survive");
assert!(
after.iter().any(|b| b == "feature"),
"user branch must survive"
);
assert!(after.iter().any(|b| b == "main"), "main must survive");
// Idempotent: a second write-open finds the stamp at current and does not

View file

@ -106,6 +106,12 @@ pub struct Omnigraph {
coordinator: Arc<tokio::sync::RwLock<GraphCoordinator>>,
table_store: TableStore,
runtime_cache: RuntimeCache,
/// Per-graph read caches: one shared Lance `Session` plus the held-`Dataset`
/// handle cache, handed to live-Branch-read snapshots (via
/// `resolved_target`) so table opens reuse handles (0 IO on a warm repeat)
/// and one session. Invalidated alongside `runtime_cache` on branch switch /
/// refresh — hygiene only; version-in-key carries correctness.
read_caches: Arc<crate::runtime_cache::ReadCaches>,
/// Read-heavy on every query, written only by `apply_schema`. ArcSwap
/// gives atomic pointer swap with zero-cost reads (`load()` returns a
/// `Guard<Arc<Catalog>>`), so concurrent queries on different actors
@ -327,6 +333,14 @@ impl Omnigraph {
coordinator: Arc::new(tokio::sync::RwLock::new(coordinator)),
table_store: TableStore::new(&root),
runtime_cache: RuntimeCache::default(),
// One shared Session per graph (LanceDB's one-session-per-connection
// model) plus the held-handle cache, created once and reused across
// reads. Session::default() caps are lazy (6 GiB index / 1 GiB
// metadata); multi-graph cap/sharing is a deferred follow-up.
read_caches: Arc::new(crate::runtime_cache::ReadCaches {
session: Arc::new(lance::session::Session::default()),
handles: Arc::new(crate::runtime_cache::TableHandleCache::default()),
}),
catalog: Arc::new(ArcSwap::from_pointee(catalog)),
schema_source: Arc::new(ArcSwap::from_pointee(schema_source.to_string())),
write_queue: Arc::new(crate::db::write_queue::WriteQueueManager::new()),
@ -351,12 +365,10 @@ impl Omnigraph {
Self::open_with_storage_and_mode(uri, storage_for_uri(uri)?, OpenMode::ReadOnly).await
}
/// `open_with_storage` retained for existing callers (init/test paths).
/// Defaults to `OpenMode::ReadWrite`.
pub(crate) async fn open_with_storage(
uri: &str,
storage: Arc<dyn StorageAdapter>,
) -> Result<Self> {
/// Open with a caller-supplied [`StorageAdapter`]. Used by init/test paths
/// and by embedding/test consumers that wrap storage (e.g. a counting
/// decorator for IO-budget tests). Defaults to `OpenMode::ReadWrite`.
pub async fn open_with_storage(uri: &str, storage: Arc<dyn StorageAdapter>) -> Result<Self> {
Self::open_with_storage_and_mode(uri, storage, OpenMode::ReadWrite).await
}
@ -428,6 +440,14 @@ impl Omnigraph {
coordinator: Arc::new(tokio::sync::RwLock::new(coordinator)),
table_store: TableStore::new(&root),
runtime_cache: RuntimeCache::default(),
// One shared Session per graph (LanceDB's one-session-per-connection
// model) plus the held-handle cache, created once and reused across
// reads. Session::default() caps are lazy (6 GiB index / 1 GiB
// metadata); multi-graph cap/sharing is a deferred follow-up.
read_caches: Arc::new(crate::runtime_cache::ReadCaches {
session: Arc::new(lance::session::Session::default()),
handles: Arc::new(crate::runtime_cache::TableHandleCache::default()),
}),
catalog: Arc::new(ArcSwap::from_pointee(catalog)),
schema_source: Arc::new(ArcSwap::from_pointee(schema_source)),
write_queue: Arc::new(crate::db::write_queue::WriteQueueManager::new()),
@ -539,6 +559,12 @@ impl Omnigraph {
}
pub(crate) async fn ensure_schema_state_valid(&self) -> Result<()> {
// Full per-call validation is intentional: a long-lived handle must
// detect external drift of the schema source, IR, OR state on its next
// operation (see lifecycle::long_lived_handle_rejects_schema_* tests). A
// source-only fast path would miss IR/state drift when _schema.pg is
// unchanged, so the only safe latency win is not calling this twice per
// query (finding A removes the redundant caller in exec/query.rs).
validate_schema_contract(self.uri(), Arc::clone(&self.storage)).await
}
@ -719,10 +745,13 @@ impl Omnigraph {
let normalized = normalize_branch_name(branch.unwrap_or("main"))?;
let coord = self.coordinator.read().await;
if normalized.as_deref() == coord.current_branch() {
let snapshot_id = coord
.head_commit_id()
.await?
.unwrap_or_else(|| SnapshotId::synthetic(coord.current_branch(), coord.version()));
let snapshot_id = coord.head_commit_id().await?.unwrap_or_else(|| {
SnapshotId::synthetic(
coord.current_branch(),
coord.version(),
coord.manifest_incarnation().e_tag.as_deref(),
)
});
return Ok(ResolvedTarget {
requested,
branch: coord.current_branch().map(str::to_string),
@ -785,10 +814,15 @@ impl Omnigraph {
let branch = normalize_branch_name(branch)?;
let next = self.open_coordinator_for_branch(branch.as_deref()).await?;
*self.coordinator.write().await = next;
self.runtime_cache.invalidate_all().await;
self.invalidate_read_caches().await;
Ok(())
}
async fn invalidate_read_caches(&self) {
self.runtime_cache.invalidate_all().await;
self.read_caches.handles.invalidate_all().await;
}
/// Re-read the handle-local coordinator state from storage AND run
/// in-process recovery. Closes the Phase B → Phase C residual (e.g.
/// `MutationStaging::finalize` crash mid-publish in a long-running
@ -888,7 +922,7 @@ impl Omnigraph {
)
.await?;
self.reload_schema_if_source_changed().await?;
self.runtime_cache.invalidate_all().await;
self.invalidate_read_caches().await;
Ok(())
}
@ -920,7 +954,7 @@ impl Omnigraph {
// write that triggered the heal validates against the stale
// schema. Same post-heal step as `refresh`.
self.reload_schema_if_source_changed().await?;
self.runtime_cache.invalidate_all().await;
self.invalidate_read_caches().await;
}
Ok(())
}
@ -956,7 +990,7 @@ impl Omnigraph {
/// own publish path.
pub(crate) async fn refresh_coordinator_only(&self) -> Result<()> {
self.coordinator.write().await.refresh().await?;
self.runtime_cache.invalidate_all().await;
self.invalidate_read_caches().await;
Ok(())
}
@ -974,11 +1008,66 @@ impl Omnigraph {
target: impl Into<ReadTarget>,
) -> Result<ResolvedTarget> {
self.ensure_schema_state_valid().await?;
self.coordinator
.read()
.await
.resolve_target(&target.into())
.await
let target = target.into();
let mut resolved = self.resolve_target_inner(&target).await?;
// Attach the read caches (shared Session + held-handle cache) for live
// Branch reads so table opens reuse handles (0 IO on a warm repeat).
// Snapshot-id reads are deliberately NOT cached: they pin a historical
// version `cleanup` may GC, so bypassing the cache sidesteps the
// cleanup-vs-cached-handle edge. Writes never reach here (they use
// `resolved_branch_target`), so they never receive a pinned handle.
if matches!(target, ReadTarget::Branch(_)) {
resolved
.snapshot
.set_read_caches(Arc::clone(&self.read_caches));
}
Ok(resolved)
}
/// Resolve a read target to its snapshot, without attaching read caches.
/// Same-branch reads reuse the warm coordinator, gated by a cheap version
/// probe (invariant 6: strong consistency, never a blind warm read). Reads do
/// not need the commit graph (the manifest version is the visibility
/// authority, invariant 2), so the id is synthetic and no commit-graph scan
/// happens on this path.
async fn resolve_target_inner(&self, target: &ReadTarget) -> Result<ResolvedTarget> {
if let ReadTarget::Branch(branch) = target {
let normalized = normalize_branch_name(branch)?;
{
let coord = self.coordinator.read().await;
if normalized.as_deref() != coord.current_branch() {
// Different branch: cold resolve (opens that branch).
return coord.resolve_target(target).await;
}
let held = coord.manifest_incarnation();
if coord.probe_latest_incarnation().await?.matches(&held) {
return Ok(warm_resolved_target(&coord, target));
}
// Stale: refresh under the write lock below.
}
let mut coord = self.coordinator.write().await;
if normalized.as_deref() == coord.current_branch() {
// Re-check after taking the write lock; another writer may have
// refreshed (tokio RwLock has no read->write upgrade).
let held = coord.manifest_incarnation();
let mut refreshed = false;
if !coord.probe_latest_incarnation().await?.matches(&held) {
coord.refresh_manifest_only().await?;
refreshed = true;
}
let resolved = warm_resolved_target(&coord, target);
drop(coord);
if refreshed {
self.invalidate_read_caches().await;
}
return Ok(resolved);
}
// Branch changed while waiting for the write lock: cold resolve.
return coord.resolve_target(target).await;
}
// Snapshot target: resolve through the commit graph as before.
self.coordinator.read().await.resolve_target(target).await
}
// ─── Change detection ────────────────────────────────────────────────
@ -1673,6 +1762,24 @@ pub(crate) fn normalize_branch_name(branch: &str) -> Result<Option<String>> {
Ok(Some(branch.to_string()))
}
/// Build a `ResolvedTarget` from the warm coordinator without opening the commit
/// graph. The live branch snapshot is pinned by the manifest incarnation, so the
/// id is synthetic `(branch, version, e_tag when available)`; nothing on the read
/// path needs a real commit ULID (only `RuntimeCache` keys on the id, where
/// synthetic is consistent).
fn warm_resolved_target(coord: &GraphCoordinator, requested: &ReadTarget) -> ResolvedTarget {
ResolvedTarget {
requested: requested.clone(),
branch: coord.current_branch().map(str::to_string),
snapshot_id: SnapshotId::synthetic(
coord.current_branch(),
coord.version(),
coord.manifest_incarnation().e_tag.as_deref(),
),
snapshot: coord.snapshot(),
}
}
pub(crate) fn ensure_public_branch_ref(branch: &str, operation: &str) -> Result<()> {
if is_internal_system_branch(branch) {
return Err(OmniError::manifest(format!(
@ -2523,6 +2630,7 @@ edge WorksAt: Person -> Company
db.branch_create("__run__legacy").await.unwrap();
drop(db);
{
// forbidden-api-allow: test synthesizes a legacy graph by editing __manifest directly.
let mut ds = lance::Dataset::open(&format!("{}/__manifest", uri))
.await
.unwrap();

View file

@ -937,6 +937,7 @@ mod tests {
for type_name in ["Person", "Company"] {
let table_uri = node_table_uri(uri, type_name);
// forbidden-api-allow: test synthesizes a branch ref directly on the Lance dataset.
let mut ds = lance::Dataset::open(&table_uri).await.unwrap();
let base = ds.version().version;
ds.create_branch("feature", base, None).await.unwrap();

View file

@ -447,8 +447,7 @@ where
&& sidecar_registrations.is_empty()
&& sidecar_tombstones.is_empty());
if writes_sidecar {
schema_apply_queue_keys
.push(crate::db::manifest::schema_apply_serial_queue_key());
schema_apply_queue_keys.push(crate::db::manifest::schema_apply_serial_queue_key());
}
let _schema_apply_queue_guards = db
.write_queue()
@ -530,8 +529,7 @@ where
.await?;
let table_path = table_path_for_table_key(target_table_key)?;
let dataset_uri = db.storage().dataset_uri(&table_path);
let target_ds =
SnapshotHandle::new(TableStore::write_dataset(&dataset_uri, batch).await?);
let target_ds = SnapshotHandle::new(TableStore::write_dataset(&dataset_uri, batch).await?);
// Indexes on the renamed table are reconciled later (iss-848).
let state = db.storage().table_state(&dataset_uri, &target_ds).await?;
table_registrations.insert(target_table_key.clone(), table_path);
@ -750,6 +748,7 @@ where
async fn cleanup_dataset_old_versions(db: &Omnigraph, full_uri: &str) -> Result<()> {
use chrono::Utc;
use lance::dataset::cleanup::CleanupPolicy;
// forbidden-api-allow: maintenance (Hard-drop version GC) opens the dataset to run cleanup_old_versions.
let ds = lance::Dataset::open(full_uri)
.await
.map_err(|e| OmniError::Lance(e.to_string()))?;

View file

@ -1097,7 +1097,8 @@ async fn prepare_updates_for_commit(
// have null embeddings) is deferred and logged inside
// build_indices; a later ensure_indices/optimize materializes it.
// The load/mutate/merge commit must not fail on it.
let _pending = build_indices_on_dataset(db, &prepared_update.table_key, &mut ds).await?;
let _pending =
build_indices_on_dataset(db, &prepared_update.table_key, &mut ds).await?;
let state = db.storage().table_state(&full_path, &ds).await?;
prepared_update.table_version = state.version;
prepared_update.row_count = state.row_count;
@ -1350,6 +1351,7 @@ mod classify_fork_ref_tests {
// the manifest's `feature` snapshot still places on main.
let person = node_path(&db, "feature", "node:Person").await;
{
// forbidden-api-allow: test synthesizes a branch ref directly on the Lance dataset.
let mut ds = lance::Dataset::open(&person).await.unwrap();
let v = ds.version().version;
ds.create_branch("feature", v, None).await.unwrap();
@ -1362,6 +1364,7 @@ mod classify_fork_ref_tests {
// Orphan (ghost): a ref for a branch the manifest does not have at all.
{
// forbidden-api-allow: test synthesizes a branch ref directly on the Lance dataset.
let mut ds = lance::Dataset::open(&person).await.unwrap();
let v = ds.version().version;
ds.create_branch("ghost", v, None).await.unwrap();

View file

@ -35,7 +35,7 @@ impl Omnigraph {
query_name: &str,
params: &ParamMap,
) -> Result<QueryResult> {
self.ensure_schema_state_valid().await?;
// resolved_target validates the schema contract; no redundant call here.
let resolved = self.resolved_target(target).await?;
let catalog = self.catalog();
@ -80,7 +80,7 @@ impl Omnigraph {
query_name: &str,
params: &ParamMap,
) -> Result<QueryResult> {
self.ensure_schema_state_valid().await?;
// snapshot_at_version validates the schema contract; no redundant call here.
let snapshot = self.snapshot_at_version(version).await?;
let catalog = self.catalog();

View file

@ -0,0 +1,231 @@
//! Read-path cost instrumentation (test seam).
//!
//! Two boundary instruments let cost-budget tests assert that a warm read does
//! no redundant IO, the way LanceDB's IO-counted tests do (see
//! `docs/dev/testing.md`, "Cost-budget tests"):
//!
//! - **Lance object store** — a per-query [`WrappingObjectStore`] attached to the
//! datasets a query opens, so a test counts real `read_iops`. Delivered through
//! a task-local ([`QueryIoProbes`]) set by the test; production leaves it unset,
//! so the open helpers attach nothing (one unset-`Option` check per open).
//! - **omnigraph `StorageAdapter`** — [`CountingStorageAdapter`], a decorator that
//! counts per-method calls (the schema-contract reads on the query path).
//!
//! Nothing here changes runtime behavior: the wrappers only observe, and the
//! decorator delegates every call. `IOTracker` (the concrete counter) lives in
//! tests via the `lance-io` dev-dependency; this module stays generic over the
//! `lance::io`-re-exported trait, so it adds no production dependency.
use std::sync::Arc;
use std::sync::atomic::{AtomicU64, Ordering};
use async_trait::async_trait;
use lance::Dataset;
use lance::dataset::builder::DatasetBuilder;
use lance::io::{ObjectStoreParams, WrappingObjectStore};
use crate::error::{OmniError, Result};
use crate::storage::StorageAdapter;
/// Per-query IO probes, installed for a query's task via [`with_query_io_probes`].
///
/// Each wrapper is attached (when present) to the datasets that category opens,
/// so a test reads `read_iops` off its own `IOTracker` handle. `probe_count`
/// records calls to the version probe (which runs on the coordinator's already-open
/// handle, so it is counted by invocation rather than by the per-query wrappers).
#[derive(Clone, Default)]
pub struct QueryIoProbes {
pub manifest_wrapper: Option<Arc<dyn WrappingObjectStore>>,
pub commit_graph_wrapper: Option<Arc<dyn WrappingObjectStore>>,
/// Attached to the per-table data opens a query performs (the cache-miss
/// path in `SubTableEntry::open`). Lets a cost test assert how many tables
/// a query actually opened — N on a cold read, 0 on a warm repeat once the
/// handle cache (Fix 3) serves them.
pub table_wrapper: Option<Arc<dyn WrappingObjectStore>>,
pub probe_count: Arc<AtomicU64>,
}
tokio::task_local! {
static QUERY_IO_PROBES: QueryIoProbes;
}
/// Run `fut` with per-query IO probes installed. Test-only entry point; nothing
/// in production sets the probes, so the accessors below return `None`/no-op.
pub async fn with_query_io_probes<F>(probes: QueryIoProbes, fut: F) -> F::Output
where
F: std::future::Future,
{
QUERY_IO_PROBES.scope(probes, fut).await
}
fn current<R>(f: impl FnOnce(&QueryIoProbes) -> R) -> Option<R> {
QUERY_IO_PROBES.try_with(f).ok()
}
pub(crate) fn manifest_wrapper() -> Option<Arc<dyn WrappingObjectStore>> {
current(|p| p.manifest_wrapper.clone()).flatten()
}
pub(crate) fn commit_graph_wrapper() -> Option<Arc<dyn WrappingObjectStore>> {
current(|p| p.commit_graph_wrapper.clone()).flatten()
}
pub(crate) fn table_wrapper() -> Option<Arc<dyn WrappingObjectStore>> {
current(|p| p.table_wrapper.clone()).flatten()
}
/// Record one version-probe invocation against the active per-query probes.
/// No-op when no probes are installed (production).
pub(crate) fn record_probe() {
let _ = current(|p| p.probe_count.fetch_add(1, Ordering::Relaxed));
}
/// Open a Lance dataset at `uri`, attaching `wrapper` (for IO counting) when
/// present. With no wrapper this is exactly `Dataset::open(uri)`. The wrapper is
/// set via `ObjectStoreParams` on the builder so the open itself is counted
/// (`Dataset::with_object_store_wrappers` only wraps an already-open store).
pub(crate) async fn open_dataset_tracked(
uri: &str,
wrapper: Option<Arc<dyn WrappingObjectStore>>,
) -> Result<Dataset> {
let result = match wrapper {
None => Dataset::open(uri).await,
Some(wrapper) => {
DatasetBuilder::from_uri(uri)
.with_store_params(ObjectStoreParams {
object_store_wrapper: Some(wrapper),
..Default::default()
})
.load()
.await
}
};
result.map_err(|e| OmniError::Lance(e.to_string()))
}
/// Open a data-table dataset at `location` pinned to `version` — the cache-miss
/// path of the data-read boundary (`SubTableEntry::open`). Attaches the shared
/// per-graph `Session` (warms metadata/index caches across opens, LanceDB's
/// one-session-per-connection pattern) and the per-query `table_wrapper` (for IO
/// counting) when present. With neither, this is exactly the Fix-2
/// `from_uri(location).with_version(version)` open.
pub(crate) async fn open_table_dataset(
location: &str,
version: u64,
session: Option<&Arc<lance::session::Session>>,
) -> Result<Dataset> {
let mut builder = DatasetBuilder::from_uri(location).with_version(version);
if let Some(session) = session {
builder = builder.with_session(session.clone());
}
if let Some(wrapper) = table_wrapper() {
builder = builder.with_store_params(ObjectStoreParams {
object_store_wrapper: Some(wrapper),
..Default::default()
});
}
builder
.load()
.await
.map_err(|e| OmniError::Lance(e.to_string()))
}
/// Per-method read counts for [`CountingStorageAdapter`].
#[derive(Debug, Default)]
pub struct StorageReadCounts {
pub read_text: AtomicU64,
pub exists: AtomicU64,
pub read_text_versioned: AtomicU64,
pub list_dir: AtomicU64,
}
impl StorageReadCounts {
pub fn read_text(&self) -> u64 {
self.read_text.load(Ordering::Relaxed)
}
pub fn exists(&self) -> u64 {
self.exists.load(Ordering::Relaxed)
}
pub fn read_text_versioned(&self) -> u64 {
self.read_text_versioned.load(Ordering::Relaxed)
}
pub fn list_dir(&self) -> u64 {
self.list_dir.load(Ordering::Relaxed)
}
}
/// Boundary decorator over a [`StorageAdapter`] that counts read-facing calls.
/// Reads delegate after incrementing; writes delegate unchanged. Construct with
/// [`CountingStorageAdapter::new`] and open an engine via
/// `Omnigraph::open_with_storage` to count its non-Lance storage IO.
#[derive(Debug)]
pub struct CountingStorageAdapter {
inner: Arc<dyn StorageAdapter>,
counts: Arc<StorageReadCounts>,
}
impl CountingStorageAdapter {
/// Wrap `inner`, returning the adapter and a shared handle to its counts.
pub fn new(inner: Arc<dyn StorageAdapter>) -> (Arc<dyn StorageAdapter>, Arc<StorageReadCounts>) {
let counts = Arc::new(StorageReadCounts::default());
let adapter: Arc<dyn StorageAdapter> = Arc::new(Self {
inner,
counts: Arc::clone(&counts),
});
(adapter, counts)
}
}
#[async_trait]
impl StorageAdapter for CountingStorageAdapter {
async fn read_text(&self, uri: &str) -> Result<String> {
self.counts.read_text.fetch_add(1, Ordering::Relaxed);
self.inner.read_text(uri).await
}
async fn write_text(&self, uri: &str, contents: &str) -> Result<()> {
self.inner.write_text(uri, contents).await
}
async fn write_text_if_absent(&self, uri: &str, contents: &str) -> Result<bool> {
self.inner.write_text_if_absent(uri, contents).await
}
async fn exists(&self, uri: &str) -> Result<bool> {
self.counts.exists.fetch_add(1, Ordering::Relaxed);
self.inner.exists(uri).await
}
async fn rename_text(&self, from_uri: &str, to_uri: &str) -> Result<()> {
self.inner.rename_text(from_uri, to_uri).await
}
async fn delete(&self, uri: &str) -> Result<()> {
self.inner.delete(uri).await
}
async fn list_dir(&self, dir_uri: &str) -> Result<Vec<String>> {
self.counts.list_dir.fetch_add(1, Ordering::Relaxed);
self.inner.list_dir(dir_uri).await
}
async fn read_text_versioned(&self, uri: &str) -> Result<(String, String)> {
self.counts.read_text_versioned.fetch_add(1, Ordering::Relaxed);
self.inner.read_text_versioned(uri).await
}
async fn write_text_if_match(
&self,
uri: &str,
contents: &str,
expected_version: &str,
) -> Result<Option<String>> {
self.inner
.write_text_if_match(uri, contents, expected_version)
.await
}
async fn delete_prefix(&self, prefix_uri: &str) -> Result<()> {
self.inner.delete_prefix(prefix_uri).await
}
}

View file

@ -14,6 +14,7 @@ pub mod error;
mod exec;
pub mod failpoints;
pub mod graph_index;
pub mod instrumentation;
pub mod loader;
pub mod runtime_cache;
pub mod storage;

View file

@ -1,6 +1,9 @@
use std::collections::{HashMap, VecDeque};
use std::hash::Hash;
use std::sync::Arc;
use lance::Dataset;
use lance::session::Session;
use omnigraph_compiler::catalog::Catalog;
use tokio::sync::Mutex;
@ -26,17 +29,15 @@ pub struct RuntimeCache {
graph_indices: Mutex<GraphIndexCache>,
}
#[derive(Debug, Default)]
#[derive(Debug)]
struct GraphIndexCache {
entries: HashMap<GraphIndexCacheKey, Arc<GraphIndex>>,
lru: VecDeque<GraphIndexCacheKey>,
entries: LruMap<GraphIndexCacheKey, Arc<GraphIndex>>,
}
impl RuntimeCache {
pub async fn invalidate_all(&self) {
let mut cache = self.graph_indices.lock().await;
cache.entries.clear();
cache.lru.clear();
cache.entries.invalidate_all();
}
pub async fn graph_index(
@ -48,7 +49,6 @@ impl RuntimeCache {
{
let mut cache = self.graph_indices.lock().await;
if let Some(index) = cache.entries.get(&key).cloned() {
cache.touch(key.clone());
return Ok(index);
}
}
@ -62,7 +62,6 @@ impl RuntimeCache {
let index = Arc::new(GraphIndex::build(&resolved.snapshot, &edge_types).await?);
let mut cache = self.graph_indices.lock().await;
if let Some(existing) = cache.entries.get(&key).cloned() {
cache.touch(key);
return Ok(existing);
}
cache.insert(key, Arc::clone(&index));
@ -72,24 +71,86 @@ impl RuntimeCache {
impl GraphIndexCache {
fn insert(&mut self, key: GraphIndexCacheKey, value: Arc<GraphIndex>) {
self.entries.insert(key.clone(), value);
self.touch(key);
while self.entries.len() > 8 {
let Some(oldest) = self.lru.pop_front() else {
break;
};
if self.entries.remove(&oldest).is_some() {
break;
}
self.entries.insert(key, value);
}
#[cfg(test)]
fn touch(&mut self, key: GraphIndexCacheKey) {
self.entries.touch(key);
}
}
#[derive(Debug)]
struct LruMap<K, V>
where
K: Clone + Eq + Hash,
{
entries: HashMap<K, V>,
lru: VecDeque<K>,
cap: usize,
}
impl<K, V> LruMap<K, V>
where
K: Clone + Eq + Hash,
{
fn new(cap: usize) -> Self {
Self {
entries: HashMap::new(),
lru: VecDeque::new(),
cap,
}
}
fn touch(&mut self, key: GraphIndexCacheKey) {
fn get(&mut self, key: &K) -> Option<&V> {
if self.entries.contains_key(key) {
self.touch(key.clone());
self.entries.get(key)
} else {
None
}
}
fn insert(&mut self, key: K, value: V) {
self.entries.insert(key.clone(), value);
self.touch(key);
while self.entries.len() > self.cap {
let Some(oldest) = self.lru.pop_front() else {
break;
};
self.entries.remove(&oldest);
}
}
fn invalidate_all(&mut self) {
self.entries.clear();
self.lru.clear();
}
#[cfg(test)]
fn contains_key(&self, key: &K) -> bool {
self.entries.contains_key(key)
}
#[cfg(test)]
fn len(&self) -> usize {
self.entries.len()
}
fn touch(&mut self, key: K) {
self.lru.retain(|existing| existing != &key);
self.lru.push_back(key);
}
}
impl Default for GraphIndexCache {
fn default() -> Self {
Self {
entries: LruMap::new(8),
}
}
}
fn graph_index_cache_key(resolved: &ResolvedTarget, catalog: &Catalog) -> GraphIndexCacheKey {
let mut edge_tables: Vec<GraphIndexTableState> = catalog
.edge_types
@ -114,6 +175,114 @@ fn graph_index_cache_key(resolved: &ResolvedTarget, catalog: &Catalog) -> GraphI
}
}
/// Max held `Dataset` handles. A handle holds only Arcs (object store + manifest),
/// never table data, so this is cheap; it bounds how many `(table, branch,
/// version, e_tag)` cells stay warm. One graph's live table set across a couple
/// of branches at the current version fits comfortably, with headroom for the
/// recently-superseded versions left by writes until they age out.
const TABLE_HANDLE_CACHE_CAP: usize = 64;
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
struct TableHandleKey {
table_path: String,
table_branch: Option<String>,
version: u64,
e_tag: Option<String>,
}
/// Held open-`Dataset` handles keyed by `(table_path, branch, version, e_tag)` — the
/// version-keyed analogue of LanceDB's `DatasetConsistencyWrapper`
/// (`rust/lancedb/src/table/dataset.rs`). A warm read reuses a held handle with
/// zero open IO (a cheap `Dataset` clone); a miss opens once at the location with
/// the shared `Session`. Version plus e_tag are in the key, so a write (or a
/// delete/recreate that reuses a version number on object stores with e_tags) is
/// simply a new key. A same-branch manifest refresh clears this cache as the
/// fallback for e_tag-less table locations. Only read-path Data opens use this —
/// writes open HEAD directly and never receive a pinned handle.
#[derive(Default)]
pub struct TableHandleCache {
inner: Mutex<TableHandleCacheInner>,
}
struct TableHandleCacheInner {
entries: LruMap<TableHandleKey, Dataset>,
}
impl TableHandleCache {
/// Drop all held handles. Correctness never requires this (version-in-key);
/// it is memory hygiene, called from the same hooks that clear the graph
/// index cache (branch switch / refresh).
pub async fn invalidate_all(&self) {
let mut inner = self.inner.lock().await;
inner.entries.invalidate_all();
}
/// Return the dataset for `(table_path, branch, version, e_tag)`, reusing a
/// held handle (0 open IO) or opening it once at `location` with the shared
/// `session` on a miss.
pub async fn get_or_open(
&self,
table_path: &str,
table_branch: Option<&str>,
version: u64,
e_tag: Option<&str>,
location: &str,
session: Option<&Arc<Session>>,
) -> Result<Dataset> {
let key = TableHandleKey {
table_path: table_path.to_string(),
table_branch: table_branch.map(str::to_string),
version,
e_tag: e_tag.map(str::to_string),
};
{
let mut inner = self.inner.lock().await;
if let Some(ds) = inner.entries.get(&key).cloned() {
return Ok(ds);
}
}
// Miss: open without holding the lock (the open is async IO). A concurrent
// double-miss opens twice and one wins the insert — correct (the dataset
// at a version is immutable) and rare.
let ds = crate::instrumentation::open_table_dataset(location, version, session).await?;
let mut inner = self.inner.lock().await;
if let Some(existing) = inner.entries.get(&key).cloned() {
return Ok(existing);
}
inner.insert(key, ds.clone());
Ok(ds)
}
}
impl TableHandleCacheInner {
fn insert(&mut self, key: TableHandleKey, value: Dataset) {
self.entries.insert(key, value);
}
}
impl Default for TableHandleCacheInner {
fn default() -> Self {
Self {
entries: LruMap::new(TABLE_HANDLE_CACHE_CAP),
}
}
}
/// Per-graph read caches handed to a resolved `Snapshot` so its table opens reuse
/// one shared `Session` (LanceDB's one-session-per-connection pattern) and the
/// held-handle cache. Manual `Debug` because `lance::session::Session` is not
/// `Debug`; this lets `Snapshot` keep its `#[derive(Debug)]`.
pub struct ReadCaches {
pub session: Arc<Session>,
pub handles: Arc<TableHandleCache>,
}
impl std::fmt::Debug for ReadCaches {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("ReadCaches").finish_non_exhaustive()
}
}
#[cfg(test)]
mod tests {
use std::sync::Arc;
@ -156,4 +325,21 @@ mod tests {
assert!(cache.entries.contains_key(&key(0)));
assert!(!cache.entries.contains_key(&key(1)));
}
#[test]
fn lru_map_evicts_oldest_and_touch_refreshes_order() {
let mut map = LruMap::new(2);
map.insert("a", 1);
map.insert("b", 2);
assert_eq!(map.get(&"a"), Some(&1));
map.insert("c", 3);
assert!(map.contains_key(&"a"));
assert!(!map.contains_key(&"b"));
assert!(map.contains_key(&"c"));
map.invalidate_all();
assert_eq!(map.len(), 0);
}
}

View file

@ -548,6 +548,174 @@ async fn branch_merge_records_single_latest_commit_with_two_parents() {
);
}
// ── P1: commit-DAG coherence on same-branch writes after an external commit ──
//
// `append_commit` takes a new commit's parent from the coordinator's in-memory
// head (commit_graph head_commit, zero storage read), but `commit_all` rebases
// the MANIFEST from a fresh coordinator. So after an external writer advances
// the branch, a same-branch write on a non-refreshed handle commits a fresh
// manifest version yet appends off the stale head — forking the commit DAG (the
// new commit and the external commit share a parent). Data is unaffected (the
// manifest is the visibility authority); only commit history is malformed.
// P1 refreshes the commit-graph head before the append, so the parent is the
// true current head. These two tests are RED before that fix, GREEN after.
/// Non-strict insert: the fork is pre-existing (commit_all rebases the manifest
/// regardless of the stale head), independent of Fix 1.
#[tokio::test]
async fn same_branch_insert_after_external_commit_is_linear() {
let dir = tempfile::tempdir().unwrap();
let uri = dir.path().to_str().unwrap();
// Handle A: a long-lived writer whose coordinator head stays pinned at the
// load commit (C0) — it never refreshes before its own write below.
let mut a = init_and_load(&dir).await;
let c0 = CommitGraph::open(uri)
.await
.unwrap()
.head_commit()
.await
.unwrap()
.unwrap();
// External writer B advances main: commit C1, parent C0.
let mut b = Omnigraph::open(uri).await.unwrap();
mutate_main(
&mut b,
MUTATION_QUERIES,
"insert_person",
&mixed_params(&[("$name", "ext_b")], &[("$age", 30)]),
)
.await
.unwrap();
let c1 = CommitGraph::open(uri)
.await
.unwrap()
.head_commit()
.await
.unwrap()
.unwrap();
assert_eq!(
c1.parent_commit_id.as_deref(),
Some(c0.graph_commit_id.as_str()),
"sanity: B's commit C1 should descend from C0"
);
// A writes to main WITHOUT refreshing. A's coordinator still thinks the head
// is C0, so a pre-fix append parents the new commit on C0 instead of C1.
mutate_main(
&mut a,
MUTATION_QUERIES,
"insert_person",
&mixed_params(&[("$name", "local_a")], &[("$age", 40)]),
)
.await
.unwrap();
let commits = CommitGraph::open(uri)
.await
.unwrap()
.load_commits()
.await
.unwrap();
let latest = commits.iter().max_by_key(|c| c.manifest_version).unwrap();
assert_eq!(
latest.parent_commit_id.as_deref(),
Some(c1.graph_commit_id.as_str()),
"A's same-branch write after an external commit must append off the true \
head C1, not the stale head C0 (commit-DAG fork)"
);
let c0_children = commits
.iter()
.filter(|c| c.parent_commit_id.as_deref() == Some(c0.graph_commit_id.as_str()))
.count();
assert_eq!(c0_children, 1, "C0 must have exactly one child; two is the fork");
}
/// Strict update after a read: Fix 1's `refresh_manifest_only` makes the read
/// freshen the read-time pin, defeating the strict 409 that used to force a
/// coherent refresh — so the same stale-head append forks strict ops too.
#[tokio::test]
async fn same_branch_update_after_external_commit_and_read_is_linear() {
let dir = tempfile::tempdir().unwrap();
let uri = dir.path().to_str().unwrap();
// A inserts the row it will later update; this is A's own commit (Ca), so
// A's coordinator head is Ca.
let mut a = init_and_load(&dir).await;
mutate_main(
&mut a,
MUTATION_QUERIES,
"insert_person",
&mixed_params(&[("$name", "target")], &[("$age", 40)]),
)
.await
.unwrap();
let ca = CommitGraph::open(uri)
.await
.unwrap()
.head_commit()
.await
.unwrap()
.unwrap();
// External writer B advances main: commit Cb, parent Ca.
let mut b = Omnigraph::open(uri).await.unwrap();
mutate_main(
&mut b,
MUTATION_QUERIES,
"insert_person",
&mixed_params(&[("$name", "ext_b")], &[("$age", 30)]),
)
.await
.unwrap();
let cb = CommitGraph::open(uri)
.await
.unwrap()
.head_commit()
.await
.unwrap()
.unwrap();
assert_eq!(cb.parent_commit_id.as_deref(), Some(ca.graph_commit_id.as_str()));
// A reads main: the stale-probe path refreshes A's MANIFEST (via
// refresh_manifest_only) but not its commit-graph head, freshening the
// read-time pin so the strict update below skips its 409.
query_main(&mut a, TEST_QUERIES, "total_people", &params(&[]))
.await
.unwrap();
// Strict update, no explicit refresh: pre-fix it appends off the stale head
// Ca instead of Cb.
mutate_main(
&mut a,
MUTATION_QUERIES,
"set_age",
&mixed_params(&[("$name", "target")], &[("$age", 99)]),
)
.await
.unwrap();
let commits = CommitGraph::open(uri)
.await
.unwrap()
.load_commits()
.await
.unwrap();
let latest = commits.iter().max_by_key(|c| c.manifest_version).unwrap();
assert_eq!(
latest.parent_commit_id.as_deref(),
Some(cb.graph_commit_id.as_str()),
"a strict update after an external commit and a local read must append \
off the true head Cb, not the stale head Ca"
);
let ca_children = commits
.iter()
.filter(|c| c.parent_commit_id.as_deref() == Some(ca.graph_commit_id.as_str()))
.count();
assert_eq!(ca_children, 1, "Ca must have exactly one child; two is the fork");
}
#[tokio::test]
async fn branch_merge_records_actor_on_latest_commit() {
let dir = tempfile::tempdir().unwrap();

View file

@ -71,6 +71,14 @@ const FORBIDDEN_PATTERNS: &[&str] = &[
"Dataset::drop_columns",
"Dataset::truncate_table",
"Dataset::restore",
// Raw dataset OPENS — all reads must route through `Snapshot::open` (the
// held-handle cache + shared Session, Fix 3). Only the instrumented opener
// (`instrumentation.rs`) and the storage/manifest layers (allow-listed below)
// open datasets directly; forbidding these in the read/exec layer keeps a
// future read from silently bypassing the cache.
"Dataset::open",
"DatasetBuilder::from_uri",
"DatasetBuilder::from_namespace",
// Lance-specific method names that don't clash with our `TableStore`
// wrappers (we use `merge_insert_batch{,es}`, `add_columns_to_*`,
// etc. — never the bare Lance names). Engine code that writes
@ -106,6 +114,7 @@ const ALLOW_LIST_FILES: &[&str] = &[
"commit_graph.rs", // Maintains `_graph_commits.lance` system table.
"graph_coordinator.rs", // Drives the manifest publisher / branch coordinator.
"recovery_audit.rs", // Maintains `_graph_commit_recoveries.lance` (recovery audit trail).
"instrumentation.rs", // The instrumented dataset opener (open_dataset_tracked / open_table_dataset).
];
/// Directories exempt from the guard. Files under these paths may use

View file

@ -166,6 +166,21 @@ pub async fn mutate_branch(
db.mutate(branch, query_source, query_name, params).await
}
/// Advance the manifest version `n` times (one commit per insert), building
/// deep commit history for cost-budget tests (history depth, not row count).
pub async fn commit_many(db: &mut Omnigraph, n: usize) {
for i in 0..n {
mutate_main(
db,
MUTATION_QUERIES,
"insert_person",
&mixed_params(&[("$name", &format!("commit_many_{i}"))], &[("$age", 30)]),
)
.await
.unwrap();
}
}
pub async fn snapshot_main(db: &Omnigraph) -> Result<Snapshot> {
db.snapshot_of(ReadTarget::branch("main")).await
}

View file

@ -0,0 +1,833 @@
//! Cost-budget tests for the warm read path (Fix 1): a warm same-branch read
//! must perform no manifest or commit-graph opens, measured with Lance's
//! `IOTracker` at the object-store boundary (the LanceDB IO-counted-test
//! pattern; see docs/dev/testing.md). Guards invariant 15 (read cost bounded by
//! work, not history) for snapshot resolution, and invariant 6 (a warm reader
//! still observes external commits).
mod helpers;
use std::sync::Arc;
use std::sync::atomic::{AtomicU64, Ordering};
use arrow_array::{Array, StringArray};
use lance::io::WrappingObjectStore;
use lance_io::utils::tracking_store::IOTracker;
use omnigraph::db::{Omnigraph, ReadTarget};
use omnigraph::instrumentation::{QueryIoProbes, with_query_io_probes};
use omnigraph_compiler::result::QueryResult;
use helpers::{
MUTATION_QUERIES, TEST_QUERIES, commit_many, count_rows, init_and_load, mixed_params,
mutate_branch, mutate_main, params,
};
/// IO probes plus the tracker handles to read `read_iops` after the query.
/// Returns `(probes, manifest, commit_graph, table, probe_count)` — `table`
/// counts per-table data opens (the cache-miss path), so a cost test can assert
/// N opens on a cold read and 0 on a warm repeat (Fix 3).
fn probes() -> (
QueryIoProbes,
IOTracker,
IOTracker,
IOTracker,
Arc<AtomicU64>,
) {
let manifest = IOTracker::default();
let commit_graph = IOTracker::default();
let table = IOTracker::default();
let probe_count = Arc::new(AtomicU64::new(0));
let probes = QueryIoProbes {
manifest_wrapper: Some(Arc::new(manifest.clone()) as Arc<dyn WrappingObjectStore>),
commit_graph_wrapper: Some(Arc::new(commit_graph.clone()) as Arc<dyn WrappingObjectStore>),
table_wrapper: Some(Arc::new(table.clone()) as Arc<dyn WrappingObjectStore>),
probe_count: Arc::clone(&probe_count),
};
(probes, manifest, commit_graph, table, probe_count)
}
fn first_column_strings(result: &QueryResult) -> Vec<String> {
if result.num_rows() == 0 {
return Vec::new();
}
let batch = result.concat_batches().unwrap();
let values = batch
.column(0)
.as_any()
.downcast_ref::<StringArray>()
.unwrap();
let mut out = (0..values.len())
.filter(|&row| !values.is_null(row))
.map(|row| values.value(row).to_string())
.collect::<Vec<_>>();
out.sort();
out
}
/// A warm same-branch read must not re-open or scan `__manifest`, and must not
/// open the commit graph, even at commit-history depth. The only manifest IO is
/// the version probe (counted by invocation). Fails before Fix 1, where the read
/// path re-opens a fresh coordinator and scans both internal tables.
#[tokio::test]
async fn warm_same_branch_read_does_no_resolution_opens() {
let dir = tempfile::tempdir().unwrap();
let mut db = init_and_load(&dir).await;
// Deep history: warm-read resolution cost must be flat in commit count.
commit_many(&mut db, 20).await;
let (probes_in, manifest, commit_graph, _table, probe_count) = probes();
with_query_io_probes(
probes_in,
db.query(
ReadTarget::branch("main"),
TEST_QUERIES,
"total_people",
&params(&[]),
),
)
.await
.unwrap();
// A warm same-branch read opens nothing from the internal tables, even at
// commit-history depth. Fix 1 reuses the coordinator (no re-open: 0
// commit-graph opens, exactly 1 cheap version probe). Fix 2 opens the touched
// data table by location+version instead of via the namespace, so the
// per-table __manifest scan is gone too. Pre-fix, each of these is a deep scan
// of an internal table that grows with commit count.
assert_eq!(
manifest.stats().read_iops,
0,
"warm same-branch read must not scan __manifest (resolution or per-table)"
);
assert_eq!(
commit_graph.stats().read_iops,
0,
"warm same-branch read must not open the commit graph (no coordinator re-open)"
);
assert_eq!(
probe_count.load(Ordering::Relaxed),
1,
"warm same-branch read performs exactly one version probe"
);
}
/// A multi-table query (a traversal touching Person, WorksAt, and Company) scans
/// `__manifest` zero times. Fix 2 opens every touched table by location+version,
/// so manifest IO no longer scales with the number of tables — pre-Fix-2 each
/// table cost two full `__manifest` scans (`describe_table` +
/// `describe_table_version`), which is the "2 tables = 2×" multi-table tax.
#[tokio::test]
async fn multi_table_query_does_no_manifest_scans() {
let dir = tempfile::tempdir().unwrap();
let db = init_and_load(&dir).await;
let (probes_in, manifest, _commit_graph, _table, _probe) = probes();
with_query_io_probes(
probes_in,
db.query(
ReadTarget::branch("main"),
TEST_QUERIES,
"age_stats",
&params(&[]),
),
)
.await
.unwrap();
assert_eq!(
manifest.stats().read_iops,
0,
"a multi-table read must not scan __manifest once per touched table"
);
}
/// A warm reader must observe a commit made through another handle (invariant 6,
/// strong consistency): the version probe detects the advance and refreshes.
/// Passes before and after Fix 1 (today's cold re-read is always fresh); a
/// regression guard so the warm-reuse fast path never serves a stale read.
#[tokio::test]
async fn external_commit_observed_by_warm_reader() {
let dir = tempfile::tempdir().unwrap();
let mut writer = init_and_load(&dir).await;
let uri = dir.path().to_str().unwrap();
let reader = Omnigraph::open(uri).await.unwrap();
let before = count_rows(&reader, "node:Person").await;
// External commit through a separate handle.
mutate_main(
&mut writer,
MUTATION_QUERIES,
"insert_person",
&mixed_params(&[("$name", "ext_new_person")], &[("$age", 41)]),
)
.await
.unwrap();
let after = count_rows(&reader, "node:Person").await;
assert_eq!(
after,
before + 1,
"warm reader must observe an external commit"
);
}
// ── Finding A: drop the redundant per-query schema validation ─────────────────
//
// Every query runs `ensure_schema_state_valid`. It ran TWICE per query (once in
// query()/run_query_at, once again in resolved_target/snapshot_at_version), each
// reading 3 contract files + 2 existence probes (~10 storage ops). Finding A
// removes the redundant caller, so validation runs once. (A cheaper source-only
// probe was rejected: the codebase requires per-call detection of IR/state drift
// on long-lived handles -- lifecycle::long_lived_handle_rejects_schema_ir_drift
// -- which a source-only compare would miss.) Measured at the StorageAdapter
// boundary with the counting decorator.
/// A warm query validates the schema contract exactly once (3 reads + 2 exists),
/// not twice. Fails before finding A, where query() and resolved_target each
/// validate (6 read_text + 4 exists).
#[tokio::test]
async fn warm_query_validates_schema_contract_once() {
use omnigraph::instrumentation::CountingStorageAdapter;
use omnigraph::storage::storage_for_uri;
let dir = tempfile::tempdir().unwrap();
// Init through the standard path, then re-open behind a counting adapter to
// measure the per-query schema-contract storage reads (delta around the
// query excludes open-time reads).
let _ = init_and_load(&dir).await;
let uri = dir.path().to_str().unwrap();
let (adapter, counts) = CountingStorageAdapter::new(storage_for_uri(uri).unwrap());
let db = Omnigraph::open_with_storage(uri, adapter).await.unwrap();
let before_read_text = counts.read_text();
let before_exists = counts.exists();
db.query(
ReadTarget::branch("main"),
TEST_QUERIES,
"total_people",
&params(&[]),
)
.await
.unwrap();
assert_eq!(
counts.read_text() - before_read_text,
3,
"warm query should validate the schema contract once (3 reads), not twice"
);
assert_eq!(
counts.exists() - before_exists,
2,
"warm query should probe contract-file existence once (2 probes), not twice"
);
}
/// The cheap source-compare must still detect that the on-disk schema source has
/// drifted from the validated contract and fail the read, rather than serving the
/// stale-but-cached schema. Passes before and after finding A (regression guard
/// for the documented weaker per-query guard).
#[tokio::test]
async fn schema_source_drift_is_caught_on_read() {
let dir = tempfile::tempdir().unwrap();
let _writer = init_and_load(&dir).await;
let uri = dir.path().to_str().unwrap();
let reader = Omnigraph::open(uri).await.unwrap();
// Drift the on-disk schema source behind the reader's back.
std::fs::write(
dir.path().join("_schema.pg"),
"this is not a valid schema {{{",
)
.unwrap();
let result = reader
.query(
ReadTarget::branch("main"),
TEST_QUERIES,
"total_people",
&params(&[]),
)
.await;
assert!(
result.is_err(),
"a query must fail when the on-disk schema source has drifted from the validated contract"
);
}
// ── Morphological-matrix coverage: branch-warm + stale-refresh cells ──────────
/// A WARM read on a non-main branch (handle synced to that branch) also scans
/// `__manifest` zero times. Exercises Fix 2's branch-owned-table open
/// (`{table_path}/tree/{branch}` + with_version) on Fix 1's warm path — the cell
/// that regressed when the open used `with_branch` against the base.
#[tokio::test]
async fn warm_branch_read_does_no_manifest_scans() {
let dir = tempfile::tempdir().unwrap();
let db = init_and_load(&dir).await;
db.branch_create("feature").await.unwrap();
// Write to the branch so its tables are branch-owned (under tree/feature).
db.mutate(
"feature",
MUTATION_QUERIES,
"insert_person",
&mixed_params(&[("$name", "Eve")], &[("$age", 22)]),
)
.await
.unwrap();
// Bind the handle's coordinator to the branch so reads of it take the warm path.
db.sync_branch("feature").await.unwrap();
let (probes_in, manifest, commit_graph, _table, probe_count) = probes();
with_query_io_probes(
probes_in,
db.query(
ReadTarget::branch("feature"),
TEST_QUERIES,
"total_people",
&params(&[]),
),
)
.await
.unwrap();
assert_eq!(
manifest.stats().read_iops,
0,
"warm branch read must not scan __manifest (branch-owned table opened by location)"
);
assert_eq!(
commit_graph.stats().read_iops,
0,
"warm branch read must not open the commit graph"
);
assert_eq!(
probe_count.load(Ordering::Relaxed),
1,
"warm branch read performs exactly one version probe"
);
}
/// A non-main branch can be deleted and recreated at the same Lance version
/// number. Warm branch freshness therefore needs the manifest incarnation, not
/// just `version()`, or a reader pinned to the old incarnation can serve stale
/// rows from the deleted branch. This is the correctness guard for Phase 6A.
#[tokio::test]
async fn warm_read_on_recreated_branch_observes_new_incarnation() {
let dir = tempfile::tempdir().unwrap();
let mut writer = init_and_load(&dir).await;
let uri = dir.path().to_str().unwrap();
let reader = Omnigraph::open(uri).await.unwrap();
writer.branch_create("feature").await.unwrap();
mutate_branch(
&mut writer,
"feature",
MUTATION_QUERIES,
"insert_person",
&mixed_params(&[("$name", "Eve")], &[("$age", 22)]),
)
.await
.unwrap();
reader.sync_branch("feature").await.unwrap();
let old_feature = reader
.query(
ReadTarget::branch("feature"),
TEST_QUERIES,
"get_person",
&params(&[("$name", "Eve")]),
)
.await
.unwrap();
assert_eq!(
old_feature.num_rows(),
1,
"test setup: old feature branch must contain Eve"
);
let old_version = reader
.version_of(ReadTarget::branch("feature"))
.await
.unwrap();
writer.branch_delete("feature").await.unwrap();
mutate_main(
&mut writer,
MUTATION_QUERIES,
"insert_person",
&mixed_params(&[("$name", "MainOnly")], &[("$age", 44)]),
)
.await
.unwrap();
writer.branch_create("feature").await.unwrap();
let new_version = writer
.version_of(ReadTarget::branch("feature"))
.await
.unwrap();
assert_eq!(
new_version, old_version,
"test setup must exercise branch incarnation reuse at one Lance version"
);
let (probes_in, manifest, commit_graph, _table, probe_count) = probes();
let new_feature = with_query_io_probes(
probes_in,
reader.query(
ReadTarget::branch("feature"),
TEST_QUERIES,
"get_person",
&params(&[("$name", "MainOnly")]),
),
)
.await
.unwrap();
assert_eq!(
new_feature.num_rows(),
1,
"warm reader must refresh to the recreated branch incarnation"
);
assert!(
manifest.stats().read_iops > 0,
"recreated branch must re-read the manifest after the incarnation probe"
);
assert_eq!(
commit_graph.stats().read_iops,
0,
"same-branch incarnation refresh must be manifest-only"
);
assert_eq!(
probe_count.load(Ordering::Relaxed),
2,
"stale same-branch read probes once under the read lock and once under the write lock"
);
}
/// Recreated non-main branches can reuse the same branch-owned table version.
/// This forces the held table-handle cache to distinguish incarnations by the
/// per-table Lance manifest e_tag, not just `(table_path, branch, version)`.
#[tokio::test]
async fn recreated_branch_owned_table_handle_uses_table_etag() {
let dir = tempfile::tempdir().unwrap();
let mut writer = init_and_load(&dir).await;
let uri = dir.path().to_str().unwrap();
let reader = Omnigraph::open(uri).await.unwrap();
writer.branch_create("feature").await.unwrap();
mutate_branch(
&mut writer,
"feature",
MUTATION_QUERIES,
"insert_person",
&mixed_params(&[("$name", "OldOnly")], &[("$age", 31)]),
)
.await
.unwrap();
reader.sync_branch("feature").await.unwrap();
let old_person = reader
.query(
ReadTarget::branch("feature"),
TEST_QUERIES,
"get_person",
&params(&[("$name", "OldOnly")]),
)
.await
.unwrap();
assert_eq!(old_person.num_rows(), 1);
let old_entry = reader
.snapshot_of(ReadTarget::branch("feature"))
.await
.unwrap()
.entry("node:Person")
.unwrap()
.clone();
assert_eq!(old_entry.table_branch.as_deref(), Some("feature"));
writer.branch_delete("feature").await.unwrap();
writer.branch_create("feature").await.unwrap();
mutate_branch(
&mut writer,
"feature",
MUTATION_QUERIES,
"insert_person",
&mixed_params(&[("$name", "NewOnly")], &[("$age", 32)]),
)
.await
.unwrap();
let new_entry = writer
.snapshot_of(ReadTarget::branch("feature"))
.await
.unwrap()
.entry("node:Person")
.unwrap()
.clone();
assert_eq!(new_entry.table_path, old_entry.table_path);
assert_eq!(new_entry.table_branch, old_entry.table_branch);
assert_eq!(
new_entry.table_version, old_entry.table_version,
"test setup must force table handle identity to differ only by e_tag"
);
let (probes_in, manifest, commit_graph, table, probe_count) = probes();
let new_person = with_query_io_probes(
probes_in,
reader.query(
ReadTarget::branch("feature"),
TEST_QUERIES,
"get_person",
&params(&[("$name", "NewOnly")]),
),
)
.await
.unwrap();
assert_eq!(
new_person.num_rows(),
1,
"warm reader must open the recreated branch-owned table incarnation"
);
assert!(
table.stats().read_iops > 0,
"table e_tag must force a held-handle cache miss for the recreated table"
);
assert!(
manifest.stats().read_iops > 0,
"recreated branch must refresh the manifest"
);
assert_eq!(
commit_graph.stats().read_iops,
0,
"same-branch table-incarnation refresh must be manifest-only"
);
assert_eq!(
probe_count.load(Ordering::Relaxed),
2,
"stale same-branch read probes once under each lock"
);
let stale_old_person = reader
.query(
ReadTarget::branch("feature"),
TEST_QUERIES,
"get_person",
&params(&[("$name", "OldOnly")]),
)
.await
.unwrap();
assert_eq!(
stale_old_person.num_rows(),
0,
"old branch-owned table contents must not leak after branch recreation"
);
}
/// The graph-index cache is keyed by synthetic snapshot id plus edge-table
/// state. A recreated branch can reuse the same edge table `(branch, version)`,
/// so the synthetic snapshot id must carry the manifest incarnation or traversal
/// can reuse stale topology.
#[tokio::test]
async fn recreated_branch_traversal_uses_graph_index_incarnation() {
let dir = tempfile::tempdir().unwrap();
let mut writer = init_and_load(&dir).await;
let uri = dir.path().to_str().unwrap();
let reader = Omnigraph::open(uri).await.unwrap();
writer.branch_create("feature").await.unwrap();
mutate_branch(
&mut writer,
"feature",
MUTATION_QUERIES,
"insert_person_and_friend",
&mixed_params(
&[("$name", "OldWalker"), ("$friend", "Alice")],
&[("$age", 41)],
),
)
.await
.unwrap();
reader.sync_branch("feature").await.unwrap();
let old_friends = reader
.query(
ReadTarget::branch("feature"),
TEST_QUERIES,
"friends_of",
&params(&[("$name", "OldWalker")]),
)
.await
.unwrap();
assert_eq!(first_column_strings(&old_friends), vec!["Alice"]);
let old_edge_entry = reader
.snapshot_of(ReadTarget::branch("feature"))
.await
.unwrap()
.entry("edge:Knows")
.unwrap()
.clone();
assert_eq!(old_edge_entry.table_branch.as_deref(), Some("feature"));
writer.branch_delete("feature").await.unwrap();
writer.branch_create("feature").await.unwrap();
mutate_branch(
&mut writer,
"feature",
MUTATION_QUERIES,
"insert_person_and_friend",
&mixed_params(
&[("$name", "NewWalker"), ("$friend", "Bob")],
&[("$age", 42)],
),
)
.await
.unwrap();
let new_edge_entry = writer
.snapshot_of(ReadTarget::branch("feature"))
.await
.unwrap()
.entry("edge:Knows")
.unwrap()
.clone();
assert_eq!(new_edge_entry.table_path, old_edge_entry.table_path);
assert_eq!(new_edge_entry.table_branch, old_edge_entry.table_branch);
assert_eq!(
new_edge_entry.table_version, old_edge_entry.table_version,
"test setup must force graph-index identity to differ only by snapshot incarnation"
);
let (probes_in, manifest, commit_graph, _table, probe_count) = probes();
let new_friends = with_query_io_probes(
probes_in,
reader.query(
ReadTarget::branch("feature"),
TEST_QUERIES,
"friends_of",
&params(&[("$name", "NewWalker")]),
),
)
.await
.unwrap();
assert_eq!(
first_column_strings(&new_friends),
vec!["Bob"],
"traversal must use the recreated branch's topology, not stale cached graph index"
);
assert!(
manifest.stats().read_iops > 0,
"recreated branch traversal must refresh the manifest"
);
assert_eq!(
commit_graph.stats().read_iops,
0,
"same-branch traversal incarnation refresh must be manifest-only"
);
assert_eq!(
probe_count.load(Ordering::Relaxed),
2,
"stale same-branch read probes once under each lock"
);
let stale_old_friends = reader
.query(
ReadTarget::branch("feature"),
TEST_QUERIES,
"friends_of",
&params(&[("$name", "OldWalker")]),
)
.await
.unwrap();
assert_eq!(
first_column_strings(&stale_old_friends),
Vec::<String>::new(),
"old branch topology must not leak after branch recreation"
);
}
/// When an external writer advances the manifest, the reader's next query takes
/// the STALE path: it re-reads the manifest (read_iops > 0) but never scans the
/// commit graph (`refresh_manifest_only`), unlike a full coordinator refresh.
/// Pins Fix 1's manifest-only refresh.
#[tokio::test]
async fn stale_read_refreshes_manifest_only() {
let dir = tempfile::tempdir().unwrap();
let mut writer = init_and_load(&dir).await;
let uri = dir.path().to_str().unwrap();
let reader = Omnigraph::open(uri).await.unwrap();
// Establish the reader's warm coordinator.
reader
.query(
ReadTarget::branch("main"),
TEST_QUERIES,
"total_people",
&params(&[]),
)
.await
.unwrap();
// External commit advances the on-disk manifest behind the reader.
mutate_main(
&mut writer,
MUTATION_QUERIES,
"insert_person",
&mixed_params(&[("$name", "Frank")], &[("$age", 33)]),
)
.await
.unwrap();
let (probes_in, manifest, commit_graph, _table, probe_count) = probes();
with_query_io_probes(
probes_in,
reader.query(
ReadTarget::branch("main"),
TEST_QUERIES,
"total_people",
&params(&[]),
),
)
.await
.unwrap();
assert!(
manifest.stats().read_iops > 0,
"stale read must re-read the manifest"
);
assert_eq!(
commit_graph.stats().read_iops,
0,
"stale refresh must be manifest-only (no commit-graph scan)"
);
assert_eq!(
probe_count.load(Ordering::Relaxed),
2,
"stale same-branch read probes once under the read lock and once under the write lock"
);
}
// ── Fix 3: held-handle cache — warm repeat reads stop re-opening tables ────────
//
// After Fix 1+2 a warm same-branch read still re-opened every touched table per
// query (the "never warms up" residual). Fix 3 holds the open `Dataset` per
// `(table, branch, version, e_tag)` (the version-keyed analogue of LanceDB's
// `DatasetConsistencyWrapper`) and shares one `Session` per graph, so a second
// identical warm read reuses the handle with zero table opens.
/// Headline: a second identical warm same-branch read does ZERO table opens
/// (the cold first read opens; the warm repeat serves from the held-handle
/// cache). Fails before Fix 3, where every read re-opens the table.
#[tokio::test]
async fn repeat_warm_read_reuses_table_handles() {
let dir = tempfile::tempdir().unwrap();
let mut db = init_and_load(&dir).await;
// Deep history: the win must hold regardless of commit count.
commit_many(&mut db, 10).await;
// Cold first read: opens the touched table.
let (p1, _m1, _c1, table1, _pr1) = probes();
with_query_io_probes(
p1,
db.query(
ReadTarget::branch("main"),
TEST_QUERIES,
"total_people",
&params(&[]),
),
)
.await
.unwrap();
assert!(
table1.stats().read_iops > 0,
"the cold first read must open the table"
);
// Warm repeat: the held handle is reused, so no open happens through this
// query's table wrapper.
let (p2, manifest2, commit_graph2, table2, probe2) = probes();
with_query_io_probes(
p2,
db.query(
ReadTarget::branch("main"),
TEST_QUERIES,
"total_people",
&params(&[]),
),
)
.await
.unwrap();
assert_eq!(
table2.stats().read_iops,
0,
"a warm repeat read must reuse the held handle (0 table opens)"
);
assert_eq!(
manifest2.stats().read_iops,
0,
"warm repeat read: 0 manifest opens"
);
assert_eq!(
commit_graph2.stats().read_iops,
0,
"warm repeat read: 0 commit-graph opens"
);
assert_eq!(
probe2.load(Ordering::Relaxed),
1,
"warm repeat read: exactly one version probe"
);
}
/// A write advances the table's version, so the next read misses the
/// version-keyed cache and re-opens — never serving a stale handle (invariant 6
/// for the cached path). Passes with or without the cache; a correctness guard
/// that the cache cannot serve pre-write data.
#[tokio::test]
async fn write_invalidates_table_cache_for_changed_table() {
let dir = tempfile::tempdir().unwrap();
let mut db = init_and_load(&dir).await;
let before = count_rows(&db, "node:Person").await;
// Warm the cache for Person.
db.query(
ReadTarget::branch("main"),
TEST_QUERIES,
"total_people",
&params(&[]),
)
.await
.unwrap();
// Write Person: its version advances, so the cached (table, branch, version)
// key is now superseded.
mutate_main(
&mut db,
MUTATION_QUERIES,
"insert_person",
&mixed_params(&[("$name", "cache_miss_one")], &[("$age", 50)]),
)
.await
.unwrap();
// The next read re-opens Person at the new version (cache miss).
let (p, _m, _c, table, _pr) = probes();
with_query_io_probes(
p,
db.query(
ReadTarget::branch("main"),
TEST_QUERIES,
"total_people",
&params(&[]),
),
)
.await
.unwrap();
assert!(
table.stats().read_iops > 0,
"a read after a write to the table must re-open it (version-keyed miss)"
);
let after = count_rows(&db, "node:Person").await;
assert_eq!(
after,
before + 1,
"the post-write read observes the new row (no stale handle served)"
);
}