perf(engine): halve per-write __manifest scans (#307)

* test(write_cost): served-regime __manifest scan tripwire

Adds `internal_table_scans_grow_without_compaction`, the served-regime twin of
`internal_table_scans_are_flat_in_history`. The flat gate `optimize()`s before
every measured write, so it only proves the *compacted* invariant and stays
green even when a served graph's per-write `__manifest` scan amplifies without
bound. This tripwire measures the uncompacted regime and asserts the scan
grows — green today, and it flips RED once the amplification is bounded
(write-path warm-reuse + version-GC), at which point it inverts to a permanent
`assert_flat` gate. RFC-013.

* perf(engine): halve per-write __manifest scans (RFC-013 PR2)

Cuts a same-branch write from ~4 to ~2 `__manifest` scans (measured 50->25 at
depth 10, 410->205 at depth 100) with the OCC contract and snapshot isolation
preserved:

- #1a probe-gate the OCC re-capture in `commit_all` via `occ_snapshot_for_branch`
  (mirrors the read path's `resolve_target_inner`): reuse the warm coordinator
  when a cheap incarnation probe proves it current, fall through to a cold read
  on mismatch.
- #1b fold the post-publish `known_state` in-memory from `existing_versions` plus
  the committed rows instead of an O(fragments) re-scan; extracted the shared
  `assemble_manifest_state` reduction so the fold is byte-identical to a scan,
  proven by the new `post_publish_fold_matches_fresh_reopen` test.
- #1c project `read_manifest_scan` to the columns it reads (drop `base_objects`
  always, `object_id` on the table-state path).

The two remaining publish scans (`load_publish_state` and the `use_index(false)`
merge-insert join) stay O(fragments), bounded by compaction/version-GC (RFC-013
PR1, not in this change).

* test(manifest): reproduce owner-branch handoff fold desync

The PR #307 post-publish fold appends pending table_version rows after
existing_versions, and assemble_manifest_state keeps the first equal-version
entry. A same-version owner-branch handoff updates a table_version row in place
at the same Lance version with a new table_branch (merge-insert UpdateAll on the
deterministic version_object_id), so the warm coordinator keeps the stale fork
while a fresh re-scan reflects the handoff.

This test commits a handoff through the coordinator commit path (exercising the
fold) and asserts the warm snapshot equals a fresh reopen. It is red against the
current fold; the following commit turns it green. Flagged by Cursor Bugbot
(High) and ChatGPT Codex (P2) on PR #307.

* fix(engine): fold table_version rows by (table_key, version) identity

fold_inputs now keys version entries by (table_key, table_version), the manifest
row identity carried by the deterministic version_object_id that the merge-insert
CAS uses. A pending row at the same identity replaces the pre-publish entry,
mirroring merge-insert UpdateAll on disk. Previously the fold appended pending
rows after existing_versions, so an owner-branch handoff left two equal-version
entries and assemble_manifest_state retained the stale one.

The fold input now carries the same one-row-per-(table_key, version) uniqueness a
fresh scan produces, so both feed assemble_manifest_state equivalent inputs and
the warm known_state stays byte-identical to read_manifest_state. This corrects
the derivation's identity model structurally and applies to any same-version
in-place update. Closes the PR #307 review finding.

* test(cost): enable lance-io test-util for IO request diagnostics

Gives IoStats.requests + assert_io_eq!, used by the cost harness to record the
__manifest read log (method + path) for failure diagnostics. Dev-dependency only,
so production builds (which exclude dev-deps) never compile it.

* test(cost): rebuild IO harness on GraphIoMeter + incremental_stats

Consolidate the per-op ProbeHandles into OpProbes plus a persistent GraphIoMeter,
and read per-op deltas via lance's incremental_stats() (get-and-reset) instead of
cumulative stats() -- the upstream per-request idiom
(rust/lance/src/dataset/tests/dataset_io.rs). Add cost_harness(body): it installs
one __manifest tracker for a whole test body, so the graph opens under it and
every coordinator handle (init plus each post-publish reassignment) carries the
same tracker. measure reuses that ambient tracker when present, making
manifest_reads ground truth (warm probe plus cold scans, handle-age-irrelevant);
outside cost_harness it falls back to a fresh per-op tracker (today's behavior).
The body future is boxed so wrapping a whole test body does not overflow the test
thread's stack.

Also stash each op's __manifest read log on the meter for assert_io_eq!-style
failure diagnostics (last_manifest_reads).

Behavior-preserving: no test wraps its body in cost_harness yet, so measure takes
the fallback path and every cost number is unchanged. write_cost and
warm_read_cost stay green.

* test(write_cost): ground-truth __manifest counting via cost_harness

Wrap the three __manifest-asserting tests (flat, grow, ceiling) in cost_harness so
manifest_reads is ground truth -- the warm-coordinator freshness probe rides a
long-lived handle a per-op tracker installed at measure time cannot see. The
flat/grow gates are depth-difference assertions, so the constant per-write probe
offset cancels and they pass unchanged; the absolute ceiling is retightened from
34 to 24 (~18 measured = ~15 publish-path scans + ~3 probe RPCs) with the read log
dumped on a breach.

Add manifest_reads_capture_warm_probe: it measures the same warm write fresh-only
and under cost_harness and asserts ground truth strictly exceeds fresh-only by the
probe's RPCs (11 vs 14). Reverting the ground-truth wiring makes the two equal, so
this guards that a write's warm-handle probe (3 object-store RPCs that were counted
as a single version_probe) cannot silently escape manifest_reads again.

* test(warm_read_cost): ground-truth __manifest counting via cost_harness

Wrap the warm (== 0) manifest gates in cost_harness so manifest_reads is ground
truth. A read's freshness probe is served from Lance's cached manifest at 0
object-store reads (unlike a write's probe, which re-reads after its commit), so
the == 0 assertions hold with no re-baseline -- and now also catch any future
warm-handle scan a per-op tracker would miss. The stale (> 0) tests are unaffected
either way and stay on the fresh fallback.

* docs(testing): document ground-truth cost harness (GraphIoMeter)

The cost harness now reads incremental_stats() deltas and, under cost_harness,
installs one __manifest tracker before the graph opens so manifest_reads is ground
truth (handle-age-irrelevant). Note that version_probes is the probe call count and
that ground truth reveals a write's probe does ~3 object-store RPCs.

* docs(rfc-013): bring write-path handoff current (Thread B + Phase 7 landed)

Prepend a current-state section (§A) for the __manifest scan-amplification /
version-chain thread: the problem, what landed on main (step 2a, Phase 7 #299),
what is in flight on this branch / PR #307 (PR2 scan-halving, the owner-branch
handoff fold fix, the PR2.1 ground-truth cost harness), the accurate measurement
(per-write __manifest ops ~50->410 pre-PR2 vs 28->208 ground truth; the hidden
3-RPC freshness probe), the remaining roadmap (PR1a manual cleanup, PR3-scoping,
deferred PR1b/PR4), critical files, and gotchas.

Staleness fixes: Phase 7 was listed as a future "step 4" but landed as #299, so
mark it LANDED in the TL;DR landed list and in the remaining-steps section.

* docs(rfc-013): refresh PR307 handoff state
This commit is contained in:
Ragnor Comerford 2026-06-27 13:18:04 +02:00 committed by GitHub
parent 1c5cb8741e
commit a7d4cba53d
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
13 changed files with 842 additions and 58 deletions

View file

@ -55,6 +55,8 @@ arc-swap = { workspace = true }
omnigraph-compiler = { path = "../omnigraph-compiler", version = "0.7.2" }
tokio = { workspace = true }
lance-namespace-impls = { workspace = true }
lance-io = "7.0.0"
# test-util gates IoStats.requests + assert_io_eq! (failure diagnostics only); dev-dep,
# so production builds (which exclude dev-deps) never see it.
lance-io = { version = "7.0.0", features = ["test-util"] }
serial_test = "3"
proptest = "1"

View file

@ -579,13 +579,16 @@ impl ManifestCoordinator {
let PublishOutcome {
dataset,
parent_commit_id,
known_state,
} = self
.publisher
.publish(changes, expected_table_versions, lineage)
.await?;
// RFC-013 PR2 #1b: the publisher folded the new visible state in-memory
// (byte-identical to a re-scan via the shared `assemble_manifest_state`),
// so adopt it directly instead of an O(fragments) `read_manifest_state`.
self.dataset = dataset;
self.known_state = read_manifest_state(&self.dataset).await?;
self.known_state = known_state;
Ok(CommitOutcome {
version: self.version(),
parent_commit_id,

View file

@ -32,11 +32,12 @@ use crate::error::{OmniError, Result};
#[cfg(test)]
use super::SubTableUpdate;
use super::layout::{open_manifest_dataset, tombstone_object_id, version_object_id};
use super::metadata::parse_namespace_version_request;
use super::metadata::{TableVersionMetadata, parse_namespace_version_request};
use super::migrations::migrate_internal_schema;
use super::state::{
GraphLineageRow, GraphLineageRowPart, graph_lineage_row_parts, head_lineage_row,
manifest_rows_batch, manifest_schema, read_publish_scan,
GraphLineageRow, GraphLineageRowPart, ManifestState, assemble_manifest_state,
graph_lineage_row_parts, head_lineage_row, manifest_rows_batch, manifest_schema,
read_manifest_state, read_publish_scan,
};
use super::{
ManifestChange, OBJECT_TYPE_TABLE, OBJECT_TYPE_TABLE_TOMBSTONE, OBJECT_TYPE_TABLE_VERSION,
@ -82,6 +83,11 @@ pub(super) struct PublishOutcome {
/// in-memory commit cache without a re-read. `None` when no lineage was
/// recorded, or when the commit is the genesis (no parent).
pub parent_commit_id: Option<String>,
/// The new visible per-table state, folded in-memory from the pre-publish
/// state the committed rows (RFC-013 PR2 #1b). Returned so the caller skips
/// the O(fragments) post-publish `read_manifest_state` re-scan. Byte-identical
/// to that re-scan: built through the same `assemble_manifest_state` reduction.
pub known_state: ManifestState,
}
#[async_trait]
@ -451,6 +457,101 @@ impl GraphNamespacePublisher {
latest
}
/// Build the inputs for [`assemble_manifest_state`] from the pre-publish state
/// unioned with the pending rows about to be committed — the in-memory basis
/// for the post-publish `known_state` fold (RFC-013 PR2 #1b), so the caller
/// skips the O(fragments) re-scan. Mirrors `read_manifest_scan`'s row handling
/// exactly so the result is byte-identical: `table_path` resolves through
/// `table_locations` = `registered_tables` UNION the pending `OBJECT_TYPE_TABLE`
/// rows (a freshly-registered table is not yet in `registered_tables`);
/// `version_metadata` parses the SAME JSON string a re-scan would read. Pending
/// `OBJECT_TYPE_TABLE` rows feed only `table_locations`; lineage rows
/// (`graph_commit`/`graph_head`) are not manifest-state entries.
fn fold_inputs(
existing_versions: &HashMap<(String, u64), SubTableEntry>,
existing_tombstones: &HashMap<(String, u64), ()>,
rows: &[PendingVersionRow],
registered_tables: &HashMap<String, String>,
) -> Result<(Vec<SubTableEntry>, Vec<(String, u64)>)> {
let mut table_locations: HashMap<String, String> = registered_tables.clone();
for row in rows {
if row.object_type == OBJECT_TYPE_TABLE {
if let Some(location) = &row.location {
table_locations.insert(row.table_key.clone(), location.clone());
}
}
}
// Key version entries by `(table_key, table_version)` so a pending row at
// the SAME version REPLACES the pre-publish entry — modelling merge-insert
// `UpdateAll` on the shared, deterministic `version_object_id(table_key,
// version)`. Load-bearing for the owner-branch handoff
// (`is_owner_branch_handoff`): a handoff updates a `table_version` row in
// place at the same version with a new `table_branch`, so `__manifest` ends
// with ONE row carrying the new branch and a re-scan reflects it; appending
// the pending row instead (and letting `assemble_manifest_state` keep the
// first equal-version entry) would leave `known_state` on the stale fork.
let mut version_map: HashMap<(String, u64), SubTableEntry> = existing_versions.clone();
let mut tombstones: Vec<(String, u64)> = existing_tombstones
.keys()
.map(|(key, version)| (key.clone(), *version))
.collect();
for row in rows {
match row.object_type.as_str() {
OBJECT_TYPE_TABLE_VERSION => {
let table_version = row.table_version.ok_or_else(|| {
OmniError::manifest_internal(format!(
"post-publish fold: table_version row missing version for {}",
row.table_key
))
})?;
let table_path =
table_locations.get(&row.table_key).cloned().ok_or_else(|| {
OmniError::manifest_internal(format!(
"post-publish fold: missing table row for {}",
row.table_key
))
})?;
let metadata_json = row.metadata.as_deref().ok_or_else(|| {
OmniError::manifest_internal(format!(
"post-publish fold: table_version row missing metadata for {}",
row.table_key
))
})?;
version_map.insert(
(row.table_key.clone(), table_version),
SubTableEntry {
table_key: row.table_key.clone(),
table_path,
table_version,
table_branch: row.table_branch.clone(),
row_count: row.row_count.ok_or_else(|| {
OmniError::manifest_internal(format!(
"post-publish fold: table_version row missing row_count for {}",
row.table_key
))
})?,
version_metadata: TableVersionMetadata::from_json_str(metadata_json)?,
},
);
}
OBJECT_TYPE_TABLE_TOMBSTONE => {
let tombstone_version = row.table_version.ok_or_else(|| {
OmniError::manifest_internal(format!(
"post-publish fold: tombstone row missing version for {}",
row.table_key
))
})?;
tombstones.push((row.table_key.clone(), tombstone_version));
}
_ => {}
}
}
Ok((version_map.into_values().collect(), tombstones))
}
/// Compare each caller-supplied expectation against the manifest's current
/// latest visible version per table. The first mismatch is returned as a
/// typed `ExpectedVersionMismatch` (`actual = 0` if the table isn't in the
@ -578,9 +679,15 @@ impl ManifestBatchPublisher for GraphNamespacePublisher {
lineage: Option<&LineageIntent>,
) -> Result<PublishOutcome> {
if changes.is_empty() && expected_table_versions.is_empty() && lineage.is_none() {
// Defensive no-op (never reached from `commit_changes_with_lineage`,
// which short-circuits the all-empty case): state is unchanged, so a
// re-scan here is acceptable.
let dataset = self.dataset().await?;
let known_state = read_manifest_state(&dataset).await?;
return Ok(PublishOutcome {
dataset: self.dataset().await?,
dataset,
parent_commit_id: None,
known_state,
});
}
@ -645,18 +752,39 @@ impl ManifestBatchPublisher for GraphNamespacePublisher {
if rows.is_empty() {
// Expected-version-only publish with no changes and no lineage:
// the precondition held, nothing to write.
// the precondition held, nothing to write. Fold the unchanged state
// from the loaded maps — no re-scan (RFC-013 PR2 #1b).
let known_state = assemble_manifest_state(
dataset.version().version,
existing_versions.values().cloned().collect(),
existing_tombstones
.keys()
.map(|(key, version)| (key.clone(), *version)),
);
return Ok(PublishOutcome {
dataset,
parent_commit_id,
known_state,
});
}
// Build the post-publish fold inputs from the pre-publish state the
// rows we are about to commit, BEFORE `rows` is moved into merge_rows
// (RFC-013 PR2 #1b). Recomputed per attempt from freshly-loaded state.
let (fold_entries, fold_tombstones) =
Self::fold_inputs(&existing_versions, &existing_tombstones, &rows, &known_tables)?;
match self.merge_rows(dataset, rows).await {
Ok(new_dataset) => {
let known_state = assemble_manifest_state(
new_dataset.version().version,
fold_entries,
fold_tombstones,
);
return Ok(PublishOutcome {
dataset: new_dataset,
parent_commit_id,
known_state,
});
}
Err(err) => {

View file

@ -131,9 +131,30 @@ pub(super) async fn read_manifest_state(dataset: &Dataset) -> Result<ManifestSta
let version = dataset.version().version;
// The table-state hot path never needs lineage, so don't pay its JSON decode.
let scan = read_manifest_scan(dataset, false).await?;
let mut latest_versions = HashMap::<String, SubTableEntry>::new();
Ok(assemble_manifest_state(
version,
scan.version_entries,
scan.tombstones
.into_iter()
.map(|t| (t.table_key, t.tombstone_version)),
))
}
for entry in scan.version_entries {
/// Reduce raw manifest rows to the visible per-table state: keep the latest
/// `table_version` per `table_key`, drop any whose latest version is sealed by a
/// tombstone (`tombstone_version >= table_version`), then sort by `table_key` for
/// deterministic output. Shared by the scan path (`read_manifest_state`) and the
/// in-memory post-publish fold in the publisher (RFC-013 PR2 #1b), so the two
/// CANNOT diverge in the dedup/filter/sort — the byte-identity the fold relies on.
/// Tombstones are passed as `(table_key, tombstone_version)` tuples so callers
/// outside this module need not name the private `TableTombstoneEntry`.
pub(super) fn assemble_manifest_state(
version: u64,
version_entries: Vec<SubTableEntry>,
tombstones: impl IntoIterator<Item = (String, u64)>,
) -> ManifestState {
let mut latest_versions = HashMap::<String, SubTableEntry>::new();
for entry in version_entries {
match latest_versions.get(&entry.table_key) {
Some(existing) if existing.table_version >= entry.table_version => {}
_ => {
@ -142,12 +163,12 @@ pub(super) async fn read_manifest_state(dataset: &Dataset) -> Result<ManifestSta
}
}
let mut tombstones = HashMap::<String, u64>::new();
for tombstone in scan.tombstones {
match tombstones.get(&tombstone.table_key) {
Some(existing) if *existing >= tombstone.tombstone_version => {}
let mut tombstone_map = HashMap::<String, u64>::new();
for (table_key, tombstone_version) in tombstones {
match tombstone_map.get(&table_key) {
Some(existing) if *existing >= tombstone_version => {}
_ => {
tombstones.insert(tombstone.table_key, tombstone.tombstone_version);
tombstone_map.insert(table_key, tombstone_version);
}
}
}
@ -155,15 +176,14 @@ pub(super) async fn read_manifest_state(dataset: &Dataset) -> Result<ManifestSta
let mut entries: Vec<SubTableEntry> = latest_versions
.into_values()
.filter(|entry| {
tombstones
tombstone_map
.get(&entry.table_key)
.map(|tombstone_version| *tombstone_version < entry.table_version)
.unwrap_or(true)
})
.collect();
entries.sort_by(|a, b| a.table_key.cmp(&b.table_key));
Ok(ManifestState { version, entries })
ManifestState { version, entries }
}
// After RFC-013 P2 folded the publish path off this accessor (it now projects
@ -245,8 +265,29 @@ fn decode_graph_commit_row(
}
async fn read_manifest_scan(dataset: &Dataset, collect_lineage: bool) -> Result<ManifestScan> {
let batches: Vec<RecordBatch> = dataset
.scan()
// Project only the columns the assembly below reads (RFC-013 PR2 #1c). The
// table-state hot path never touches `object_id` (lineage decode only) or
// `base_objects` (reserved/unused — never read on any path), so reading them
// is wasted bytes on every `__manifest` scan — write publish AND every
// branch-op open. Mirrors Lance's own directory-catalog `__manifest` reads,
// which project to the needed columns rather than scanning all of them.
let mut projection: Vec<&str> = vec![
"object_type",
"location",
"metadata",
"table_key",
"table_version",
"table_branch",
"row_count",
];
if collect_lineage {
projection.push("object_id");
}
let mut scanner = dataset.scan();
scanner
.project(&projection)
.map_err(|e| OmniError::Lance(e.to_string()))?;
let batches: Vec<RecordBatch> = scanner
.try_into_stream()
.await
.map_err(|e| OmniError::Lance(e.to_string()))?

View file

@ -909,6 +909,143 @@ async fn test_batch_create_table_versions_allows_owner_branch_handoff_at_same_ve
assert_eq!(experiment_entry.table_branch.as_deref(), Some("experiment"));
}
/// Regression (PR #307 review — Cursor Bugbot High + Codex P2): the post-publish
/// fold (`#1b`) must reflect an owner-branch handoff. A handoff UPDATEs a
/// `table_version` row IN PLACE at the SAME Lance version with a new
/// `table_branch` — merge-insert `UpdateAll` on the deterministic
/// `version_object_id(table_key, version)`, so `__manifest` ends with one row
/// carrying the new branch. The buggy fold appended the pending row after
/// `existing_versions`, and `assemble_manifest_state` keeps the FIRST entry at
/// equal `table_version`, so the WARM coordinator retained the stale
/// `table_branch` ("feature") while a fresh `read_manifest_state` reopen reflected
/// the handoff ("experiment"). Unlike the namespace-publisher handoff test above,
/// this commits through the coordinator's `commit` path to exercise the fold, then
/// reads the warm `snapshot()` WITHOUT reopening.
#[tokio::test]
async fn test_post_publish_fold_reflects_owner_branch_handoff() {
let dir = tempfile::tempdir().unwrap();
let uri = dir.path().to_str().unwrap();
let catalog = build_test_catalog();
let mut main_mc = ManifestCoordinator::init(uri, &catalog).await.unwrap();
main_mc.create_branch("feature").await.unwrap();
// Fork Person onto `feature` at version Vf (owner = feature).
let snap = main_mc.snapshot();
let person_entry = snap.entry("node:Person").unwrap().clone();
let mut person_ds = Dataset::open(&format!("{}/{}", uri, person_entry.table_path))
.await
.unwrap();
person_ds
.create_branch("feature", person_entry.table_version, None)
.await
.unwrap();
let mut feature_ds = person_ds.checkout_branch("feature").await.unwrap();
let person_schema = Arc::new(feature_ds.schema().into());
let person_batch = RecordBatch::try_new(
Arc::clone(&person_schema),
vec![
Arc::new(StringArray::from(vec!["person-1"])),
Arc::new(StringArray::from(vec!["Alice"])),
Arc::new(Int32Array::from(vec![Some(30)])),
],
)
.unwrap();
let reader = RecordBatchIterator::new(vec![Ok(person_batch)], person_schema);
feature_ds.append(reader, None).await.unwrap();
let feature_version = feature_ds.version().version;
let feature_metadata = table_version_metadata_for_state(
uri,
&person_entry.table_path,
Some("feature"),
feature_version,
)
.await
.unwrap();
branch_manifest_namespace(uri, Some("feature"))
.create_table_version(feature_metadata.to_create_table_version_request(
"node:Person",
feature_version,
1,
Some("feature"),
))
.await
.unwrap();
// Create `experiment` from feature and fork Person at the SAME version Vf.
let mut feature_mc = ManifestCoordinator::open_at_branch(uri, "feature")
.await
.unwrap();
feature_mc.create_branch("experiment").await.unwrap();
feature_ds
.create_branch("experiment", feature_version, None)
.await
.unwrap();
let experiment_metadata = table_version_metadata_for_state(
uri,
&person_entry.table_path,
Some("experiment"),
feature_version,
)
.await
.unwrap();
// Publish the handoff through a WARM coordinator's `commit` (exercises the
// post-publish fold), NOT GraphNamespacePublisher (which reopens fresh).
let mut experiment_mc = ManifestCoordinator::open_at_branch(uri, "experiment")
.await
.unwrap();
// Pre-publish: experiment inherits feature's ownership of Person@Vf.
assert_eq!(
experiment_mc
.snapshot()
.entry("node:Person")
.unwrap()
.table_branch
.as_deref(),
Some("feature"),
);
experiment_mc
.commit(&[SubTableUpdate {
table_key: "node:Person".to_string(),
table_version: feature_version,
table_branch: Some("experiment".to_string()),
row_count: 1,
version_metadata: experiment_metadata,
}])
.await
.unwrap();
// Warm side: the folded known_state the commit adopted.
let folded_branch = experiment_mc
.snapshot()
.entry("node:Person")
.unwrap()
.table_branch
.clone();
// Oracle: a fresh reopen rebuilds known_state via `read_manifest_state`.
let reopened = ManifestCoordinator::open_at_branch(uri, "experiment")
.await
.unwrap();
let scanned_branch = reopened
.snapshot()
.entry("node:Person")
.unwrap()
.table_branch
.clone();
assert_eq!(
scanned_branch.as_deref(),
Some("experiment"),
"fresh reopen should reflect the owner-branch handoff",
);
assert_eq!(
folded_branch, scanned_branch,
"warm coordinator's folded known_state diverged from a fresh re-scan after an \
owner-branch handoff (folded {folded_branch:?} vs scanned {scanned_branch:?})",
);
}
#[tokio::test]
async fn test_staged_namespace_lists_native_table_versions_before_publish() {
let dir = tempfile::tempdir().unwrap();

View file

@ -866,6 +866,33 @@ impl Omnigraph {
Ok(manifest.snapshot())
}
/// Probe-gated OCC re-capture snapshot (RFC-013 PR2 #1a). The commit-time OCC
/// re-read MUST be as fresh as a cold re-scan (see `commit_all`), but a cold
/// `__manifest` full scan per write is O(fragments) and a dominant write-path
/// cost. When the warm coordinator is already bound to `branch` AND a cheap
/// incarnation probe (one object-store op, no row scan) proves it is current,
/// the warm snapshot IS byte-identical to a fresh re-read, so reuse it with
/// zero scans. On ANY mismatch — a concurrent in- or cross-process advance, or
/// a different bound branch — fall through to the cold
/// `fresh_snapshot_for_branch_unchecked` read, preserving the "must be fresh"
/// contract and cross-process drift detection. Mirrors the read path's
/// `resolve_target_inner` probe-and-reuse idiom; same-branch sequential writes
/// keep `coordinator` current (each commit refreshes its `known_state`), so the
/// common case reuses and skips the scan. The publish CAS
/// (`expected_table_versions`) remains the final arbiter.
pub(crate) async fn occ_snapshot_for_branch(&self, branch: Option<&str>) -> Result<Snapshot> {
{
let coord = self.coordinator.read().await;
if branch == coord.current_branch() {
let held = coord.manifest_incarnation();
if coord.probe_latest_incarnation().await?.matches(&held) {
return Ok(coord.snapshot());
}
}
}
self.fresh_snapshot_for_branch_unchecked(branch).await
}
pub(crate) async fn version(&self) -> u64 {
self.coordinator.read().await.version()
}

View file

@ -602,15 +602,17 @@ impl StagedMutation {
// genuine cross-process drift detection from this read for
// free.
//
// This MUST be a FRESH per-branch manifest read (never the warm
// cache) for the OCC re-capture below — but with a `WriteTxn` the
// schema contract was already validated at capture, so use the
// `_unchecked` variant, which drops the redundant
// `ensure_schema_state_valid` AND the commit-graph load the OCC read
// never consults (a fresh manifest read yields the same `Snapshot`).
// Without a txn this is byte-identical to the prior checked call.
// This MUST be as fresh as a FRESH per-branch manifest read for the OCC
// re-capture below. With a `WriteTxn` the schema contract was already
// validated at capture, so the txn arm uses `occ_snapshot_for_branch`
// (RFC-013 PR2 #1a): a cheap incarnation probe reuses the warm
// coordinator when it is already current (probe-match ⟺ warm == fresh)
// and falls through to the cold `_unchecked` read on any mismatch — so
// freshness AND cross-process drift detection are preserved while the
// common same-branch sequential write skips the O(fragments) scan.
// Without a txn, keep the checked cold read (legacy path).
let snapshot = match txn {
Some(_) => db.fresh_snapshot_for_branch_unchecked(branch).await?,
Some(_) => db.occ_snapshot_for_branch(branch).await?,
None => db.fresh_snapshot_for_branch(branch).await?,
};
for entry in staged.iter_mut() {

View file

@ -227,8 +227,59 @@ impl ObjectStore for PrefixCountingStore {
}
}
/// The tracker handles backing one measurement; read once into [`IoCounts`].
struct ProbeHandles {
// ── Ground-truth `__manifest` meter (lance's per-request tracking idiom) ──
//
// Lance counts IO on a warm/cached dataset by attaching one `IOTracker` to the open
// handle (`Dataset::with_object_store_wrappers`, shared session) and reading
// `incremental_stats()` per request (`rust/lance/src/dataset/tests/dataset_io.rs`).
// We do the same for `__manifest`: `cost_harness` installs ONE persistent tracker for
// a whole test body, so the graph opens UNDER it and every coordinator handle — the
// init handle and each post-publish/refresh reassignment (`db/manifest.rs` keeps
// `self.dataset = …`) — carries the same tracker. `manifest_reads` is then ground
// truth (warm probe + cold scans), handle-age-irrelevant, instead of only the reads
// on handles a single measured op happened to open. Data/commit-graph/probe/open
// counters stay fresh per op (their warm-handle exposure is out of scope here).
/// Persistent per-test meter: owns the ground-truth `__manifest` tracker reused
/// across every `measure` in a `cost_harness` body.
#[derive(Clone, Default)]
pub struct GraphIoMeter {
manifest: IOTracker,
/// The most recent measured op's `__manifest` request log (method + path),
/// stashed for `assert_io_eq!`-style failure diagnostics. Populated in
/// ground-truth mode only (the standalone fallback has no ambient meter).
last_manifest_log: Arc<Mutex<Vec<String>>>,
}
tokio::task_local! {
static COST_METER: GraphIoMeter;
}
/// Run `body` with a persistent ground-truth `__manifest` tracker installed for its
/// whole lifetime. The graph MUST be opened inside `body` (e.g. via `local_graph`)
/// so its coordinator's `__manifest` handle is wrapped from birth. `measure` calls
/// inside reuse that tracker, so `manifest_reads` counts every `__manifest` read
/// regardless of which handle performed it (the warm probe included). Outside
/// `cost_harness`, `measure` falls back to a fresh per-op tracker — today's
/// fresh-open-only behavior, used by `write_cost_s3.rs`.
pub async fn cost_harness<F: Future>(body: F) -> F::Output {
let meter = GraphIoMeter::default();
let probes = QueryIoProbes {
manifest_wrapper: Some(Arc::new(meter.manifest.clone()) as Arc<dyn WrappingObjectStore>),
..Default::default()
};
// Box the body so the (large) per-test future lives on the heap. Wrapping a whole
// test body in another async layer otherwise overflows the test thread's stack —
// these cost tests already raise `recursion_limit` for the same reason.
COST_METER
.scope(meter, with_query_io_probes(probes, Box::pin(body)))
.await
}
/// The tracker handles backing one measurement; read once into [`IoCounts`]. Data,
/// commit-graph, probe, and open counters are fresh per op; the `__manifest` tracker
/// is the ambient ground-truth one when inside `cost_harness`, else fresh.
struct OpProbes {
manifest: IOTracker,
commit_graph: IOTracker,
table: PrefixCounter,
@ -237,10 +288,18 @@ struct ProbeHandles {
internal_open_count: Arc<AtomicU64>,
}
impl ProbeHandles {
impl OpProbes {
fn install() -> (QueryIoProbes, Self) {
let h = ProbeHandles {
manifest: IOTracker::default(),
// Reuse the ambient ground-truth `__manifest` tracker so reads on the warm
// coordinator handle (the freshness probe) land in it; fall back to a fresh
// tracker when standalone. Reset it (get-and-reset) so this op's delta
// excludes reads from init / `commit_many` between measures.
let manifest = COST_METER
.try_with(|m| m.manifest.clone())
.unwrap_or_default();
let _ = manifest.incremental_stats();
let h = OpProbes {
manifest,
commit_graph: IOTracker::default(),
table: PrefixCounter::default(),
probe_count: Arc::new(AtomicU64::new(0)),
@ -262,13 +321,26 @@ impl ProbeHandles {
fn counts(&self) -> IoCounts {
let t = self.table.snapshot();
// `incremental_stats()` (get-and-reset) yields this op's reads: in
// ground-truth mode the tracker spans the whole test and was reset in
// `install`; standalone it is fresh so the delta is the whole count.
let manifest = self.manifest.incremental_stats();
// Stash the manifest read log (method + path) on the ambient meter for
// `assert_io_eq!`-style failure diagnostics; no-op when standalone.
let _ = COST_METER.try_with(|meter| {
*meter.last_manifest_log.lock().unwrap() = manifest
.requests
.iter()
.map(|r| format!("{} {}", r.method, r.path))
.collect();
});
IoCounts {
data_reads: t.reads,
data_writes: t.writes,
data_opener_reads: t.opener_reads,
data_scan_reads: t.scan_reads,
manifest_reads: self.manifest.stats().read_iops,
commit_graph_reads: self.commit_graph.stats().read_iops,
manifest_reads: manifest.read_iops,
commit_graph_reads: self.commit_graph.incremental_stats().read_iops,
version_probes: self.probe_count.load(Ordering::Relaxed),
data_open_count: self.data_open_count.load(Ordering::Relaxed),
internal_open_count: self.internal_open_count.load(Ordering::Relaxed),
@ -276,10 +348,19 @@ impl ProbeHandles {
}
}
/// The most recent measured op's `__manifest` reads (`method path`) for failure
/// diagnostics — the `assert_io_eq!` read-log, scoped to `__manifest`. Empty
/// outside `cost_harness` (the standalone fallback records no ambient log).
pub fn last_manifest_reads() -> Vec<String> {
COST_METER
.try_with(|m| m.last_manifest_log.lock().unwrap().clone())
.unwrap_or_default()
}
/// Run `op` under object-store IO counting; return its output + the counts.
/// The only place the `QueryIoProbes` task-local + tracker wiring lives.
pub async fn measure<F: Future>(op: F) -> (F::Output, IoCounts) {
let (probes, handles) = ProbeHandles::install();
let (probes, handles) = OpProbes::install();
let out = with_query_io_probes(probes, op).await;
(out, handles.counts())
}
@ -287,7 +368,7 @@ pub async fn measure<F: Future>(op: F) -> (F::Output, IoCounts) {
/// Like [`measure`], but also capture which staged-write primitives ran
/// (composes the two task-locals cleanly).
pub async fn measure_with_staged<F: Future>(op: F) -> (F::Output, IoCounts, StagedCounts) {
let (probes, handles) = ProbeHandles::install();
let (probes, handles) = OpProbes::install();
let merge = MergeWriteProbes::default();
let out = with_merge_write_probes(merge.clone(), with_query_io_probes(probes, op)).await;
let staged = StagedCounts {

View file

@ -11,7 +11,7 @@ use arrow_array::{Array, StringArray};
use omnigraph::db::{Omnigraph, ReadTarget};
use omnigraph_compiler::result::QueryResult;
use helpers::cost::measure;
use helpers::cost::{cost_harness, measure};
use helpers::{
MUTATION_QUERIES, TEST_QUERIES, commit_many, count_rows, init_and_load, mixed_params,
mutate_branch, mutate_main, params,
@ -35,12 +35,17 @@ fn first_column_strings(result: &QueryResult) -> Vec<String> {
out
}
/// A warm same-branch read must not re-open or scan `__manifest`, and must not
/// open the commit graph, even at commit-history depth. The only manifest IO is
/// the version probe (counted by invocation). Fails before Fix 1, where the read
/// path re-opens a fresh coordinator and scans both internal tables.
/// A warm same-branch read must do ZERO `__manifest` object-store reads and must
/// not open the commit graph, even at commit-history depth. Wrapped in
/// `cost_harness`, so `manifest_reads` is ground truth: the warm-coordinator
/// freshness probe rides the long-lived handle (which now carries the tracker) and
/// is served from Lance's cached manifest at 0 store reads, so this `== 0` also
/// catches any future warm-handle scan a per-op tracker would miss. Fails before
/// Fix 1, where the read path re-opens a fresh coordinator and scans both internal
/// tables.
#[tokio::test]
async fn warm_same_branch_read_does_no_resolution_opens() {
cost_harness(async {
let dir = tempfile::tempdir().unwrap();
let mut db = init_and_load(&dir).await;
// Deep history: warm-read resolution cost must be flat in commit count.
@ -73,6 +78,8 @@ async fn warm_same_branch_read_does_no_resolution_opens() {
io.version_probes, 1,
"warm same-branch read performs exactly one version probe"
);
})
.await;
}
/// A multi-table query (a traversal touching Person, WorksAt, and Company) scans
@ -82,6 +89,7 @@ async fn warm_same_branch_read_does_no_resolution_opens() {
/// `describe_table_version`), which is the "2 tables = 2×" multi-table tax.
#[tokio::test]
async fn multi_table_query_does_no_manifest_scans() {
cost_harness(async {
let dir = tempfile::tempdir().unwrap();
let db = init_and_load(&dir).await;
@ -98,6 +106,8 @@ async fn multi_table_query_does_no_manifest_scans() {
io.manifest_reads, 0,
"a multi-table read must not scan __manifest once per touched table"
);
})
.await;
}
/// A warm reader must observe a commit made through another handle (invariant 6,
@ -222,6 +232,7 @@ async fn schema_source_drift_is_caught_on_read() {
/// that regressed when the open used `with_branch` against the base.
#[tokio::test]
async fn warm_branch_read_does_no_manifest_scans() {
cost_harness(async {
let dir = tempfile::tempdir().unwrap();
let db = init_and_load(&dir).await;
db.branch_create("feature").await.unwrap();
@ -258,6 +269,8 @@ async fn warm_branch_read_does_no_manifest_scans() {
io.version_probes, 1,
"warm branch read performs exactly one version probe"
);
})
.await;
}
/// A non-main branch can be deleted and recreated at the same Lance version
@ -643,6 +656,7 @@ async fn stale_read_refreshes_manifest_only() {
/// cache). Fails before Fix 3, where every read re-opens the table.
#[tokio::test]
async fn repeat_warm_read_reuses_table_handles() {
cost_harness(async {
let dir = tempfile::tempdir().unwrap();
let mut db = init_and_load(&dir).await;
// Deep history: the win must hold regardless of commit count.
@ -685,6 +699,8 @@ async fn repeat_warm_read_reuses_table_handles() {
warm.version_probes, 1,
"warm repeat read: exactly one version probe"
);
})
.await;
}
/// A write advances the table's version, so the next read misses the

View file

@ -24,8 +24,8 @@
mod helpers;
use helpers::cost::{
IoCounts, assert_flat, assert_grows, local_graph, measure, measure_insert, measure_insert_as,
measure_with_staged,
IoCounts, assert_flat, assert_grows, cost_harness, last_manifest_reads, local_graph, measure,
measure_insert, measure_insert_as, measure_with_staged,
};
use helpers::{MUTATION_QUERIES, commit_many, commit_many_as, init_and_load, mixed_params};
@ -43,6 +43,10 @@ use helpers::{MUTATION_QUERIES, commit_many, commit_many_as, init_and_load, mixe
// compacted graph's write cost does not grow with version history."
#[tokio::test]
async fn internal_table_scans_are_flat_in_history() {
// `cost_harness` installs the ground-truth __manifest tracker for the whole body,
// so `manifest_reads` includes the warm-coordinator probe (a constant per write
// that cancels in this depth-difference assertion).
cost_harness(async {
const ACTOR: &str = "act-cost-gate";
let dir = tempfile::tempdir().unwrap();
let mut db = local_graph(&dir).await;
@ -70,6 +74,66 @@ async fn internal_table_scans_are_flat_in_history() {
// commit_graph_reads covers BOTH _graph_commits and _graph_commit_actors (shared
// wrapper), so this also gates the actor table on the authenticated path.
assert_flat(&curve, |c| c.commit_graph_reads, 4, "_graph_commits + _graph_commit_actors scan");
})
.await;
}
/// **Served-regime twin of `internal_table_scans_are_flat_in_history` — the gate
/// that was missing.** The flat gate above calls `db.optimize()` before EVERY
/// measured write, so it only ever proves the *compacted* invariant and stays green
/// even if a served graph's per-write `__manifest` scan amplifies without bound. A
/// real served graph does NOT optimize between writes: every publish appends a
/// fragment to `__manifest`, and the publish-path scan (`read_manifest_scan`, a bare
/// `dataset.scan()` with no filter/projection) reads ALL of them, so the per-write
/// `__manifest` read count is O(fragments-since-compaction) and climbs with history.
/// That is the live amplification behind the reported single-row write latency
/// (~16s on 0.7.2; still growing post-#299) — physical fragment read cost, not
/// logical row count (output rows stay ~flat while requests grow).
///
/// **This is a TRIPWIRE, not the final gate.** It asserts the scan *grows*, i.e. it
/// pins the CURRENT served-regime cost (green today) — exactly the `assert_grows`
/// idiom its sibling `data_table_reads_split_into_flat_opener_and_growing_scan` uses,
/// and the "turns red when the fix lands" shape of the Lance surface guards. It flips
/// RED the moment the amplification is fixed (write-path probe-gated warm reuse, and
/// bringing `__manifest` into `cleanup` version-GC so F stays bounded in history).
/// **When it goes red, that is the signal to invert it to**
/// `assert_flat(&curve, |c| c.manifest_reads, <slack>, "__manifest scan (served)")` —
/// promoting it to the permanent served-regime gate. Only `manifest_reads` is
/// asserted: #299 moved lineage into `__manifest` and made the per-write commit-graph
/// update in-memory, so `commit_graph_reads` no longer grows per write on this branch.
#[tokio::test]
async fn internal_table_scans_grow_without_compaction() {
cost_harness(async {
const ACTOR: &str = "act-cost-gate-served";
let dir = tempfile::tempdir().unwrap();
let mut db = local_graph(&dir).await;
let mut curve: Vec<(u64, IoCounts)> = Vec::new();
let mut current = 0u64;
for d in [10u64, 100] {
if d > current {
commit_many_as(&mut db, (d - current) as usize, ACTOR).await;
current = d;
}
// NO `db.optimize()` here — that omission is the whole point. The flat gate
// above compacts before measuring and so never exercises this served regime.
let io = measure_insert_as(&mut db, &format!("served_{d}"), ACTOR).await;
current += 1; // the measured write advanced depth by one
eprintln!(
"depth~{d} (uncompacted): data={} __manifest={} _graph_commits+actors={}",
io.data_reads, io.manifest_reads, io.commit_graph_reads
);
curve.push((d, io));
}
// Green TODAY (the bug): the per-write `__manifest` scan is O(fragments) and grows
// by far more than the flat gate's slack of 4 across a 10→100 depth sweep. The `20`
// floor mirrors the proven-safe `assert_grows` sibling (data-table scan) and sits
// comfortably below the real growth (~+3 `__manifest` reads/depth × ~90 depth × the
// 34 publish-path scans) while unambiguously distinguishing "grows" from "flat".
assert_grows(&curve, |c| c.manifest_reads, 20, "__manifest scan (uncompacted/served)");
})
.await;
}
// The data-table OPENER history-gate (opener flat across depth) lives in
@ -142,6 +206,7 @@ async fn single_insert_data_write_is_bounded() {
/// P2 fold (was ~44 / ~54 with the four separate scans).
#[tokio::test]
async fn write_op_count_ceiling_at_shallow_depth() {
cost_harness(async {
let dir = tempfile::tempdir().unwrap();
let mut db = local_graph(&dir).await;
commit_many(&mut db, 5).await;
@ -150,15 +215,19 @@ async fn write_op_count_ceiling_at_shallow_depth() {
"depth~5: data={} __manifest={} _graph_commits={} total_reads={}",
io.data_reads, io.manifest_reads, io.commit_graph_reads, io.total_reads()
);
// Sub-ceiling on `__manifest` reads specifically: the publish path does one
// scan, not four. ~26 measured at this depth; a re-added scan would push it
// well past this. (Deterministic on local FS.)
const MANIFEST_CEILING: u64 = 34;
// Sub-ceiling on ground-truth `__manifest` reads. ~18 measured at this depth =
// ~15 publish-path scans (one fold, not four — RFC-013 P2) + ~3 from the
// warm-coordinator freshness probe, which ground truth now counts (the
// `version_probes=1` call is 3 object-store RPCs). A re-added publish scan trips
// this; `last_manifest_reads()` dumps the read log (method + path) so a breach
// names the offending objects. (Deterministic on local FS.)
const MANIFEST_CEILING: u64 = 24;
assert!(
io.manifest_reads <= MANIFEST_CEILING,
"per-write __manifest reads {} exceeded ceiling {MANIFEST_CEILING} — a publish-path \
scan was re-added (RFC-013 P2 folds them into one)",
scan was re-added (RFC-013 P2 folds them into one). Reads: {:#?}",
io.manifest_reads,
last_manifest_reads(),
);
const CEILING: u64 = 80;
assert!(
@ -166,6 +235,8 @@ async fn write_op_count_ceiling_at_shallow_depth() {
"per-write read ops {} exceeded ceiling {CEILING} — a new round-trip was added",
io.total_reads()
);
})
.await;
}
// ── (C) Fitness assert via the staged-write probes ──
@ -271,3 +342,44 @@ async fn keyed_insert_opens_table_at_most_once() {
io.data_open_count,
);
}
// ── (E) Ground-truth __manifest counting (PR2.1) — the blind-spot guard ──
/// The warm-coordinator freshness probe rides a long-lived handle, so a per-op
/// (fresh) tracker installed at measure time CANNOT see its reads — that was the
/// blind spot. `cost_harness` attaches the tracker BEFORE the coordinator opens, so
/// the probe's reads ARE counted (`manifest_reads` is ground truth, not just fresh
/// opens). Proven by measuring the same warm write both ways: ground truth strictly
/// exceeds fresh-only, by the probe's object-store RPCs. Reverting the ground-truth
/// wiring (so `manifest_reads` reverts to fresh-per-op) makes the two equal → RED.
#[tokio::test]
async fn manifest_reads_capture_warm_probe() {
// Fresh-only (no `cost_harness`): the warm coordinator handle was opened outside
// any meter, so the freshness probe's reads escape `manifest_reads`.
let fresh = {
let dir = tempfile::tempdir().unwrap();
let mut db = local_graph(&dir).await;
commit_many(&mut db, 3).await; // warm the coordinator
let io = measure_insert(&mut db, "fresh").await;
eprintln!("fresh-only warm write: __manifest={}", io.manifest_reads);
io.manifest_reads
};
// Ground truth (`cost_harness`): the same warm probe is now counted.
cost_harness(async move {
let dir = tempfile::tempdir().unwrap();
let mut db = local_graph(&dir).await;
commit_many(&mut db, 3).await;
let io = measure_insert(&mut db, "ground_truth").await;
eprintln!("ground-truth warm write: __manifest={}", io.manifest_reads);
assert!(
io.manifest_reads > fresh,
"ground-truth __manifest reads {} must exceed fresh-only {fresh} by the \
warm-coordinator probe's RPCs else the warm-handle probe is escaping the \
tracker (the blind spot this guards). Reads: {:#?}",
io.manifest_reads,
last_manifest_reads(),
);
})
.await;
}

View file

@ -1725,3 +1725,57 @@ query chain($repo: String) {
.expect("chained camelCase mutation must read the pending row, not fail at the MemTable SELECT");
assert_eq!(r.affected_nodes, 2, "both ops should touch the acme Doc (read-your-writes)");
}
/// RFC-013 PR2 #1b: the publisher folds the new `known_state` in-memory after a
/// publish instead of re-scanning `__manifest`. That fold MUST be byte-identical
/// to a fresh re-scan, or the warm coordinator silently desyncs. After a sequence
/// of writes (insert, a second insert to the same table, then a delete that
/// advances the table version), the in-memory coordinator holds the folded state;
/// a freshly reopened graph rebuilds it via a real `read_manifest_state` scan.
/// Counting `node:Person` off each resolves the table at the version each side
/// recorded — a fold that set the wrong version (or path → open failure) makes the
/// in-memory count diverge from the reopened one. Reopen is the scan side; the live
/// `db` is the fold side.
#[tokio::test]
async fn post_publish_fold_matches_fresh_reopen() {
let dir = tempfile::tempdir().unwrap();
let uri = dir.path().to_str().unwrap();
let mut db = init_and_load(&dir).await;
db.mutate(
"main",
MUTATION_QUERIES,
"insert_person",
&mixed_params(&[("$name", "fold_a")], &[("$age", 30)]),
)
.await
.unwrap();
db.mutate(
"main",
MUTATION_QUERIES,
"insert_person",
&mixed_params(&[("$name", "fold_b")], &[("$age", 31)]),
)
.await
.unwrap();
db.mutate(
"main",
MUTATION_QUERIES,
"remove_person",
&mixed_params(&[("$name", "fold_a")], &[]),
)
.await
.unwrap();
// Fold side: count resolves the snapshot from the in-memory folded known_state.
let folded = count_rows(&db, "node:Person").await;
// Scan side: a fresh open rebuilds known_state via `read_manifest_state`.
let reopened = Omnigraph::open(uri).await.unwrap();
let scanned = count_rows(&reopened, "node:Person").await;
assert_eq!(
folded, scanned,
"post-publish fold diverged from a fresh re-scan (folded {folded} vs scanned {scanned})"
);
}