diff --git a/crates/omnigraph/Cargo.toml b/crates/omnigraph/Cargo.toml index 5038fd1..8801c42 100644 --- a/crates/omnigraph/Cargo.toml +++ b/crates/omnigraph/Cargo.toml @@ -55,6 +55,8 @@ arc-swap = { workspace = true } omnigraph-compiler = { path = "../omnigraph-compiler", version = "0.7.2" } tokio = { workspace = true } lance-namespace-impls = { workspace = true } -lance-io = "7.0.0" +# test-util gates IoStats.requests + assert_io_eq! (failure diagnostics only); dev-dep, +# so production builds (which exclude dev-deps) never see it. +lance-io = { version = "7.0.0", features = ["test-util"] } serial_test = "3" proptest = "1" diff --git a/crates/omnigraph/src/db/manifest.rs b/crates/omnigraph/src/db/manifest.rs index da22136..806c4be 100644 --- a/crates/omnigraph/src/db/manifest.rs +++ b/crates/omnigraph/src/db/manifest.rs @@ -579,13 +579,16 @@ impl ManifestCoordinator { let PublishOutcome { dataset, parent_commit_id, + known_state, } = self .publisher .publish(changes, expected_table_versions, lineage) .await?; + // RFC-013 PR2 #1b: the publisher folded the new visible state in-memory + // (byte-identical to a re-scan via the shared `assemble_manifest_state`), + // so adopt it directly instead of an O(fragments) `read_manifest_state`. self.dataset = dataset; - - self.known_state = read_manifest_state(&self.dataset).await?; + self.known_state = known_state; Ok(CommitOutcome { version: self.version(), parent_commit_id, diff --git a/crates/omnigraph/src/db/manifest/publisher.rs b/crates/omnigraph/src/db/manifest/publisher.rs index 382b51a..ccd3c5f 100644 --- a/crates/omnigraph/src/db/manifest/publisher.rs +++ b/crates/omnigraph/src/db/manifest/publisher.rs @@ -32,11 +32,12 @@ use crate::error::{OmniError, Result}; #[cfg(test)] use super::SubTableUpdate; use super::layout::{open_manifest_dataset, tombstone_object_id, version_object_id}; -use super::metadata::parse_namespace_version_request; +use super::metadata::{TableVersionMetadata, parse_namespace_version_request}; use super::migrations::migrate_internal_schema; use super::state::{ - GraphLineageRow, GraphLineageRowPart, graph_lineage_row_parts, head_lineage_row, - manifest_rows_batch, manifest_schema, read_publish_scan, + GraphLineageRow, GraphLineageRowPart, ManifestState, assemble_manifest_state, + graph_lineage_row_parts, head_lineage_row, manifest_rows_batch, manifest_schema, + read_manifest_state, read_publish_scan, }; use super::{ ManifestChange, OBJECT_TYPE_TABLE, OBJECT_TYPE_TABLE_TOMBSTONE, OBJECT_TYPE_TABLE_VERSION, @@ -82,6 +83,11 @@ pub(super) struct PublishOutcome { /// in-memory commit cache without a re-read. `None` when no lineage was /// recorded, or when the commit is the genesis (no parent). pub parent_commit_id: Option, + /// The new visible per-table state, folded in-memory from the pre-publish + /// state ∪ the committed rows (RFC-013 PR2 #1b). Returned so the caller skips + /// the O(fragments) post-publish `read_manifest_state` re-scan. Byte-identical + /// to that re-scan: built through the same `assemble_manifest_state` reduction. + pub known_state: ManifestState, } #[async_trait] @@ -451,6 +457,101 @@ impl GraphNamespacePublisher { latest } + /// Build the inputs for [`assemble_manifest_state`] from the pre-publish state + /// unioned with the pending rows about to be committed — the in-memory basis + /// for the post-publish `known_state` fold (RFC-013 PR2 #1b), so the caller + /// skips the O(fragments) re-scan. Mirrors `read_manifest_scan`'s row handling + /// exactly so the result is byte-identical: `table_path` resolves through + /// `table_locations` = `registered_tables` UNION the pending `OBJECT_TYPE_TABLE` + /// rows (a freshly-registered table is not yet in `registered_tables`); + /// `version_metadata` parses the SAME JSON string a re-scan would read. Pending + /// `OBJECT_TYPE_TABLE` rows feed only `table_locations`; lineage rows + /// (`graph_commit`/`graph_head`) are not manifest-state entries. + fn fold_inputs( + existing_versions: &HashMap<(String, u64), SubTableEntry>, + existing_tombstones: &HashMap<(String, u64), ()>, + rows: &[PendingVersionRow], + registered_tables: &HashMap, + ) -> Result<(Vec, Vec<(String, u64)>)> { + let mut table_locations: HashMap = registered_tables.clone(); + for row in rows { + if row.object_type == OBJECT_TYPE_TABLE { + if let Some(location) = &row.location { + table_locations.insert(row.table_key.clone(), location.clone()); + } + } + } + + // Key version entries by `(table_key, table_version)` so a pending row at + // the SAME version REPLACES the pre-publish entry — modelling merge-insert + // `UpdateAll` on the shared, deterministic `version_object_id(table_key, + // version)`. Load-bearing for the owner-branch handoff + // (`is_owner_branch_handoff`): a handoff updates a `table_version` row in + // place at the same version with a new `table_branch`, so `__manifest` ends + // with ONE row carrying the new branch and a re-scan reflects it; appending + // the pending row instead (and letting `assemble_manifest_state` keep the + // first equal-version entry) would leave `known_state` on the stale fork. + let mut version_map: HashMap<(String, u64), SubTableEntry> = existing_versions.clone(); + let mut tombstones: Vec<(String, u64)> = existing_tombstones + .keys() + .map(|(key, version)| (key.clone(), *version)) + .collect(); + + for row in rows { + match row.object_type.as_str() { + OBJECT_TYPE_TABLE_VERSION => { + let table_version = row.table_version.ok_or_else(|| { + OmniError::manifest_internal(format!( + "post-publish fold: table_version row missing version for {}", + row.table_key + )) + })?; + let table_path = + table_locations.get(&row.table_key).cloned().ok_or_else(|| { + OmniError::manifest_internal(format!( + "post-publish fold: missing table row for {}", + row.table_key + )) + })?; + let metadata_json = row.metadata.as_deref().ok_or_else(|| { + OmniError::manifest_internal(format!( + "post-publish fold: table_version row missing metadata for {}", + row.table_key + )) + })?; + version_map.insert( + (row.table_key.clone(), table_version), + SubTableEntry { + table_key: row.table_key.clone(), + table_path, + table_version, + table_branch: row.table_branch.clone(), + row_count: row.row_count.ok_or_else(|| { + OmniError::manifest_internal(format!( + "post-publish fold: table_version row missing row_count for {}", + row.table_key + )) + })?, + version_metadata: TableVersionMetadata::from_json_str(metadata_json)?, + }, + ); + } + OBJECT_TYPE_TABLE_TOMBSTONE => { + let tombstone_version = row.table_version.ok_or_else(|| { + OmniError::manifest_internal(format!( + "post-publish fold: tombstone row missing version for {}", + row.table_key + )) + })?; + tombstones.push((row.table_key.clone(), tombstone_version)); + } + _ => {} + } + } + + Ok((version_map.into_values().collect(), tombstones)) + } + /// Compare each caller-supplied expectation against the manifest's current /// latest visible version per table. The first mismatch is returned as a /// typed `ExpectedVersionMismatch` (`actual = 0` if the table isn't in the @@ -578,9 +679,15 @@ impl ManifestBatchPublisher for GraphNamespacePublisher { lineage: Option<&LineageIntent>, ) -> Result { if changes.is_empty() && expected_table_versions.is_empty() && lineage.is_none() { + // Defensive no-op (never reached from `commit_changes_with_lineage`, + // which short-circuits the all-empty case): state is unchanged, so a + // re-scan here is acceptable. + let dataset = self.dataset().await?; + let known_state = read_manifest_state(&dataset).await?; return Ok(PublishOutcome { - dataset: self.dataset().await?, + dataset, parent_commit_id: None, + known_state, }); } @@ -645,18 +752,39 @@ impl ManifestBatchPublisher for GraphNamespacePublisher { if rows.is_empty() { // Expected-version-only publish with no changes and no lineage: - // the precondition held, nothing to write. + // the precondition held, nothing to write. Fold the unchanged state + // from the loaded maps — no re-scan (RFC-013 PR2 #1b). + let known_state = assemble_manifest_state( + dataset.version().version, + existing_versions.values().cloned().collect(), + existing_tombstones + .keys() + .map(|(key, version)| (key.clone(), *version)), + ); return Ok(PublishOutcome { dataset, parent_commit_id, + known_state, }); } + // Build the post-publish fold inputs from the pre-publish state ∪ the + // rows we are about to commit, BEFORE `rows` is moved into merge_rows + // (RFC-013 PR2 #1b). Recomputed per attempt from freshly-loaded state. + let (fold_entries, fold_tombstones) = + Self::fold_inputs(&existing_versions, &existing_tombstones, &rows, &known_tables)?; + match self.merge_rows(dataset, rows).await { Ok(new_dataset) => { + let known_state = assemble_manifest_state( + new_dataset.version().version, + fold_entries, + fold_tombstones, + ); return Ok(PublishOutcome { dataset: new_dataset, parent_commit_id, + known_state, }); } Err(err) => { diff --git a/crates/omnigraph/src/db/manifest/state.rs b/crates/omnigraph/src/db/manifest/state.rs index 4fbbde3..13d80ba 100644 --- a/crates/omnigraph/src/db/manifest/state.rs +++ b/crates/omnigraph/src/db/manifest/state.rs @@ -131,9 +131,30 @@ pub(super) async fn read_manifest_state(dataset: &Dataset) -> Result::new(); + Ok(assemble_manifest_state( + version, + scan.version_entries, + scan.tombstones + .into_iter() + .map(|t| (t.table_key, t.tombstone_version)), + )) +} - for entry in scan.version_entries { +/// Reduce raw manifest rows to the visible per-table state: keep the latest +/// `table_version` per `table_key`, drop any whose latest version is sealed by a +/// tombstone (`tombstone_version >= table_version`), then sort by `table_key` for +/// deterministic output. Shared by the scan path (`read_manifest_state`) and the +/// in-memory post-publish fold in the publisher (RFC-013 PR2 #1b), so the two +/// CANNOT diverge in the dedup/filter/sort — the byte-identity the fold relies on. +/// Tombstones are passed as `(table_key, tombstone_version)` tuples so callers +/// outside this module need not name the private `TableTombstoneEntry`. +pub(super) fn assemble_manifest_state( + version: u64, + version_entries: Vec, + tombstones: impl IntoIterator, +) -> ManifestState { + let mut latest_versions = HashMap::::new(); + for entry in version_entries { match latest_versions.get(&entry.table_key) { Some(existing) if existing.table_version >= entry.table_version => {} _ => { @@ -142,12 +163,12 @@ pub(super) async fn read_manifest_state(dataset: &Dataset) -> Result::new(); - for tombstone in scan.tombstones { - match tombstones.get(&tombstone.table_key) { - Some(existing) if *existing >= tombstone.tombstone_version => {} + let mut tombstone_map = HashMap::::new(); + for (table_key, tombstone_version) in tombstones { + match tombstone_map.get(&table_key) { + Some(existing) if *existing >= tombstone_version => {} _ => { - tombstones.insert(tombstone.table_key, tombstone.tombstone_version); + tombstone_map.insert(table_key, tombstone_version); } } } @@ -155,15 +176,14 @@ pub(super) async fn read_manifest_state(dataset: &Dataset) -> Result = latest_versions .into_values() .filter(|entry| { - tombstones + tombstone_map .get(&entry.table_key) .map(|tombstone_version| *tombstone_version < entry.table_version) .unwrap_or(true) }) .collect(); entries.sort_by(|a, b| a.table_key.cmp(&b.table_key)); - - Ok(ManifestState { version, entries }) + ManifestState { version, entries } } // After RFC-013 P2 folded the publish path off this accessor (it now projects @@ -245,8 +265,29 @@ fn decode_graph_commit_row( } async fn read_manifest_scan(dataset: &Dataset, collect_lineage: bool) -> Result { - let batches: Vec = dataset - .scan() + // Project only the columns the assembly below reads (RFC-013 PR2 #1c). The + // table-state hot path never touches `object_id` (lineage decode only) or + // `base_objects` (reserved/unused — never read on any path), so reading them + // is wasted bytes on every `__manifest` scan — write publish AND every + // branch-op open. Mirrors Lance's own directory-catalog `__manifest` reads, + // which project to the needed columns rather than scanning all of them. + let mut projection: Vec<&str> = vec![ + "object_type", + "location", + "metadata", + "table_key", + "table_version", + "table_branch", + "row_count", + ]; + if collect_lineage { + projection.push("object_id"); + } + let mut scanner = dataset.scan(); + scanner + .project(&projection) + .map_err(|e| OmniError::Lance(e.to_string()))?; + let batches: Vec = scanner .try_into_stream() .await .map_err(|e| OmniError::Lance(e.to_string()))? diff --git a/crates/omnigraph/src/db/manifest/tests.rs b/crates/omnigraph/src/db/manifest/tests.rs index 31a77fe..6584705 100644 --- a/crates/omnigraph/src/db/manifest/tests.rs +++ b/crates/omnigraph/src/db/manifest/tests.rs @@ -909,6 +909,143 @@ async fn test_batch_create_table_versions_allows_owner_branch_handoff_at_same_ve assert_eq!(experiment_entry.table_branch.as_deref(), Some("experiment")); } +/// Regression (PR #307 review — Cursor Bugbot High + Codex P2): the post-publish +/// fold (`#1b`) must reflect an owner-branch handoff. A handoff UPDATEs a +/// `table_version` row IN PLACE at the SAME Lance version with a new +/// `table_branch` — merge-insert `UpdateAll` on the deterministic +/// `version_object_id(table_key, version)`, so `__manifest` ends with one row +/// carrying the new branch. The buggy fold appended the pending row after +/// `existing_versions`, and `assemble_manifest_state` keeps the FIRST entry at +/// equal `table_version`, so the WARM coordinator retained the stale +/// `table_branch` ("feature") while a fresh `read_manifest_state` reopen reflected +/// the handoff ("experiment"). Unlike the namespace-publisher handoff test above, +/// this commits through the coordinator's `commit` path to exercise the fold, then +/// reads the warm `snapshot()` WITHOUT reopening. +#[tokio::test] +async fn test_post_publish_fold_reflects_owner_branch_handoff() { + let dir = tempfile::tempdir().unwrap(); + let uri = dir.path().to_str().unwrap(); + let catalog = build_test_catalog(); + + let mut main_mc = ManifestCoordinator::init(uri, &catalog).await.unwrap(); + main_mc.create_branch("feature").await.unwrap(); + + // Fork Person onto `feature` at version Vf (owner = feature). + let snap = main_mc.snapshot(); + let person_entry = snap.entry("node:Person").unwrap().clone(); + let mut person_ds = Dataset::open(&format!("{}/{}", uri, person_entry.table_path)) + .await + .unwrap(); + person_ds + .create_branch("feature", person_entry.table_version, None) + .await + .unwrap(); + let mut feature_ds = person_ds.checkout_branch("feature").await.unwrap(); + let person_schema = Arc::new(feature_ds.schema().into()); + let person_batch = RecordBatch::try_new( + Arc::clone(&person_schema), + vec![ + Arc::new(StringArray::from(vec!["person-1"])), + Arc::new(StringArray::from(vec!["Alice"])), + Arc::new(Int32Array::from(vec![Some(30)])), + ], + ) + .unwrap(); + let reader = RecordBatchIterator::new(vec![Ok(person_batch)], person_schema); + feature_ds.append(reader, None).await.unwrap(); + let feature_version = feature_ds.version().version; + let feature_metadata = table_version_metadata_for_state( + uri, + &person_entry.table_path, + Some("feature"), + feature_version, + ) + .await + .unwrap(); + branch_manifest_namespace(uri, Some("feature")) + .create_table_version(feature_metadata.to_create_table_version_request( + "node:Person", + feature_version, + 1, + Some("feature"), + )) + .await + .unwrap(); + + // Create `experiment` from feature and fork Person at the SAME version Vf. + let mut feature_mc = ManifestCoordinator::open_at_branch(uri, "feature") + .await + .unwrap(); + feature_mc.create_branch("experiment").await.unwrap(); + feature_ds + .create_branch("experiment", feature_version, None) + .await + .unwrap(); + let experiment_metadata = table_version_metadata_for_state( + uri, + &person_entry.table_path, + Some("experiment"), + feature_version, + ) + .await + .unwrap(); + + // Publish the handoff through a WARM coordinator's `commit` (exercises the + // post-publish fold), NOT GraphNamespacePublisher (which reopens fresh). + let mut experiment_mc = ManifestCoordinator::open_at_branch(uri, "experiment") + .await + .unwrap(); + // Pre-publish: experiment inherits feature's ownership of Person@Vf. + assert_eq!( + experiment_mc + .snapshot() + .entry("node:Person") + .unwrap() + .table_branch + .as_deref(), + Some("feature"), + ); + experiment_mc + .commit(&[SubTableUpdate { + table_key: "node:Person".to_string(), + table_version: feature_version, + table_branch: Some("experiment".to_string()), + row_count: 1, + version_metadata: experiment_metadata, + }]) + .await + .unwrap(); + + // Warm side: the folded known_state the commit adopted. + let folded_branch = experiment_mc + .snapshot() + .entry("node:Person") + .unwrap() + .table_branch + .clone(); + // Oracle: a fresh reopen rebuilds known_state via `read_manifest_state`. + let reopened = ManifestCoordinator::open_at_branch(uri, "experiment") + .await + .unwrap(); + let scanned_branch = reopened + .snapshot() + .entry("node:Person") + .unwrap() + .table_branch + .clone(); + + assert_eq!( + scanned_branch.as_deref(), + Some("experiment"), + "fresh reopen should reflect the owner-branch handoff", + ); + assert_eq!( + folded_branch, scanned_branch, + "warm coordinator's folded known_state diverged from a fresh re-scan after an \ + owner-branch handoff (folded {folded_branch:?} vs scanned {scanned_branch:?})", + ); +} + #[tokio::test] async fn test_staged_namespace_lists_native_table_versions_before_publish() { let dir = tempfile::tempdir().unwrap(); diff --git a/crates/omnigraph/src/db/omnigraph.rs b/crates/omnigraph/src/db/omnigraph.rs index 4a770f0..823b157 100644 --- a/crates/omnigraph/src/db/omnigraph.rs +++ b/crates/omnigraph/src/db/omnigraph.rs @@ -866,6 +866,33 @@ impl Omnigraph { Ok(manifest.snapshot()) } + /// Probe-gated OCC re-capture snapshot (RFC-013 PR2 #1a). The commit-time OCC + /// re-read MUST be as fresh as a cold re-scan (see `commit_all`), but a cold + /// `__manifest` full scan per write is O(fragments) and a dominant write-path + /// cost. When the warm coordinator is already bound to `branch` AND a cheap + /// incarnation probe (one object-store op, no row scan) proves it is current, + /// the warm snapshot IS byte-identical to a fresh re-read, so reuse it with + /// zero scans. On ANY mismatch — a concurrent in- or cross-process advance, or + /// a different bound branch — fall through to the cold + /// `fresh_snapshot_for_branch_unchecked` read, preserving the "must be fresh" + /// contract and cross-process drift detection. Mirrors the read path's + /// `resolve_target_inner` probe-and-reuse idiom; same-branch sequential writes + /// keep `coordinator` current (each commit refreshes its `known_state`), so the + /// common case reuses and skips the scan. The publish CAS + /// (`expected_table_versions`) remains the final arbiter. + pub(crate) async fn occ_snapshot_for_branch(&self, branch: Option<&str>) -> Result { + { + let coord = self.coordinator.read().await; + if branch == coord.current_branch() { + let held = coord.manifest_incarnation(); + if coord.probe_latest_incarnation().await?.matches(&held) { + return Ok(coord.snapshot()); + } + } + } + self.fresh_snapshot_for_branch_unchecked(branch).await + } + pub(crate) async fn version(&self) -> u64 { self.coordinator.read().await.version() } diff --git a/crates/omnigraph/src/exec/staging.rs b/crates/omnigraph/src/exec/staging.rs index 7760c95..ddfaa06 100644 --- a/crates/omnigraph/src/exec/staging.rs +++ b/crates/omnigraph/src/exec/staging.rs @@ -602,15 +602,17 @@ impl StagedMutation { // genuine cross-process drift detection from this read for // free. // - // This MUST be a FRESH per-branch manifest read (never the warm - // cache) for the OCC re-capture below — but with a `WriteTxn` the - // schema contract was already validated at capture, so use the - // `_unchecked` variant, which drops the redundant - // `ensure_schema_state_valid` AND the commit-graph load the OCC read - // never consults (a fresh manifest read yields the same `Snapshot`). - // Without a txn this is byte-identical to the prior checked call. + // This MUST be as fresh as a FRESH per-branch manifest read for the OCC + // re-capture below. With a `WriteTxn` the schema contract was already + // validated at capture, so the txn arm uses `occ_snapshot_for_branch` + // (RFC-013 PR2 #1a): a cheap incarnation probe reuses the warm + // coordinator when it is already current (probe-match ⟺ warm == fresh) + // and falls through to the cold `_unchecked` read on any mismatch — so + // freshness AND cross-process drift detection are preserved while the + // common same-branch sequential write skips the O(fragments) scan. + // Without a txn, keep the checked cold read (legacy path). let snapshot = match txn { - Some(_) => db.fresh_snapshot_for_branch_unchecked(branch).await?, + Some(_) => db.occ_snapshot_for_branch(branch).await?, None => db.fresh_snapshot_for_branch(branch).await?, }; for entry in staged.iter_mut() { diff --git a/crates/omnigraph/tests/helpers/cost.rs b/crates/omnigraph/tests/helpers/cost.rs index 9c82229..331bfb3 100644 --- a/crates/omnigraph/tests/helpers/cost.rs +++ b/crates/omnigraph/tests/helpers/cost.rs @@ -227,8 +227,59 @@ impl ObjectStore for PrefixCountingStore { } } -/// The tracker handles backing one measurement; read once into [`IoCounts`]. -struct ProbeHandles { +// ── Ground-truth `__manifest` meter (lance's per-request tracking idiom) ── +// +// Lance counts IO on a warm/cached dataset by attaching one `IOTracker` to the open +// handle (`Dataset::with_object_store_wrappers`, shared session) and reading +// `incremental_stats()` per request (`rust/lance/src/dataset/tests/dataset_io.rs`). +// We do the same for `__manifest`: `cost_harness` installs ONE persistent tracker for +// a whole test body, so the graph opens UNDER it and every coordinator handle — the +// init handle and each post-publish/refresh reassignment (`db/manifest.rs` keeps +// `self.dataset = …`) — carries the same tracker. `manifest_reads` is then ground +// truth (warm probe + cold scans), handle-age-irrelevant, instead of only the reads +// on handles a single measured op happened to open. Data/commit-graph/probe/open +// counters stay fresh per op (their warm-handle exposure is out of scope here). + +/// Persistent per-test meter: owns the ground-truth `__manifest` tracker reused +/// across every `measure` in a `cost_harness` body. +#[derive(Clone, Default)] +pub struct GraphIoMeter { + manifest: IOTracker, + /// The most recent measured op's `__manifest` request log (method + path), + /// stashed for `assert_io_eq!`-style failure diagnostics. Populated in + /// ground-truth mode only (the standalone fallback has no ambient meter). + last_manifest_log: Arc>>, +} + +tokio::task_local! { + static COST_METER: GraphIoMeter; +} + +/// Run `body` with a persistent ground-truth `__manifest` tracker installed for its +/// whole lifetime. The graph MUST be opened inside `body` (e.g. via `local_graph`) +/// so its coordinator's `__manifest` handle is wrapped from birth. `measure` calls +/// inside reuse that tracker, so `manifest_reads` counts every `__manifest` read +/// regardless of which handle performed it (the warm probe included). Outside +/// `cost_harness`, `measure` falls back to a fresh per-op tracker — today's +/// fresh-open-only behavior, used by `write_cost_s3.rs`. +pub async fn cost_harness(body: F) -> F::Output { + let meter = GraphIoMeter::default(); + let probes = QueryIoProbes { + manifest_wrapper: Some(Arc::new(meter.manifest.clone()) as Arc), + ..Default::default() + }; + // Box the body so the (large) per-test future lives on the heap. Wrapping a whole + // test body in another async layer otherwise overflows the test thread's stack — + // these cost tests already raise `recursion_limit` for the same reason. + COST_METER + .scope(meter, with_query_io_probes(probes, Box::pin(body))) + .await +} + +/// The tracker handles backing one measurement; read once into [`IoCounts`]. Data, +/// commit-graph, probe, and open counters are fresh per op; the `__manifest` tracker +/// is the ambient ground-truth one when inside `cost_harness`, else fresh. +struct OpProbes { manifest: IOTracker, commit_graph: IOTracker, table: PrefixCounter, @@ -237,10 +288,18 @@ struct ProbeHandles { internal_open_count: Arc, } -impl ProbeHandles { +impl OpProbes { fn install() -> (QueryIoProbes, Self) { - let h = ProbeHandles { - manifest: IOTracker::default(), + // Reuse the ambient ground-truth `__manifest` tracker so reads on the warm + // coordinator handle (the freshness probe) land in it; fall back to a fresh + // tracker when standalone. Reset it (get-and-reset) so this op's delta + // excludes reads from init / `commit_many` between measures. + let manifest = COST_METER + .try_with(|m| m.manifest.clone()) + .unwrap_or_default(); + let _ = manifest.incremental_stats(); + let h = OpProbes { + manifest, commit_graph: IOTracker::default(), table: PrefixCounter::default(), probe_count: Arc::new(AtomicU64::new(0)), @@ -262,13 +321,26 @@ impl ProbeHandles { fn counts(&self) -> IoCounts { let t = self.table.snapshot(); + // `incremental_stats()` (get-and-reset) yields this op's reads: in + // ground-truth mode the tracker spans the whole test and was reset in + // `install`; standalone it is fresh so the delta is the whole count. + let manifest = self.manifest.incremental_stats(); + // Stash the manifest read log (method + path) on the ambient meter for + // `assert_io_eq!`-style failure diagnostics; no-op when standalone. + let _ = COST_METER.try_with(|meter| { + *meter.last_manifest_log.lock().unwrap() = manifest + .requests + .iter() + .map(|r| format!("{} {}", r.method, r.path)) + .collect(); + }); IoCounts { data_reads: t.reads, data_writes: t.writes, data_opener_reads: t.opener_reads, data_scan_reads: t.scan_reads, - manifest_reads: self.manifest.stats().read_iops, - commit_graph_reads: self.commit_graph.stats().read_iops, + manifest_reads: manifest.read_iops, + commit_graph_reads: self.commit_graph.incremental_stats().read_iops, version_probes: self.probe_count.load(Ordering::Relaxed), data_open_count: self.data_open_count.load(Ordering::Relaxed), internal_open_count: self.internal_open_count.load(Ordering::Relaxed), @@ -276,10 +348,19 @@ impl ProbeHandles { } } +/// The most recent measured op's `__manifest` reads (`method path`) for failure +/// diagnostics — the `assert_io_eq!` read-log, scoped to `__manifest`. Empty +/// outside `cost_harness` (the standalone fallback records no ambient log). +pub fn last_manifest_reads() -> Vec { + COST_METER + .try_with(|m| m.last_manifest_log.lock().unwrap().clone()) + .unwrap_or_default() +} + /// Run `op` under object-store IO counting; return its output + the counts. /// The only place the `QueryIoProbes` task-local + tracker wiring lives. pub async fn measure(op: F) -> (F::Output, IoCounts) { - let (probes, handles) = ProbeHandles::install(); + let (probes, handles) = OpProbes::install(); let out = with_query_io_probes(probes, op).await; (out, handles.counts()) } @@ -287,7 +368,7 @@ pub async fn measure(op: F) -> (F::Output, IoCounts) { /// Like [`measure`], but also capture which staged-write primitives ran /// (composes the two task-locals cleanly). pub async fn measure_with_staged(op: F) -> (F::Output, IoCounts, StagedCounts) { - let (probes, handles) = ProbeHandles::install(); + let (probes, handles) = OpProbes::install(); let merge = MergeWriteProbes::default(); let out = with_merge_write_probes(merge.clone(), with_query_io_probes(probes, op)).await; let staged = StagedCounts { diff --git a/crates/omnigraph/tests/warm_read_cost.rs b/crates/omnigraph/tests/warm_read_cost.rs index b3f5446..8f6e74e 100644 --- a/crates/omnigraph/tests/warm_read_cost.rs +++ b/crates/omnigraph/tests/warm_read_cost.rs @@ -11,7 +11,7 @@ use arrow_array::{Array, StringArray}; use omnigraph::db::{Omnigraph, ReadTarget}; use omnigraph_compiler::result::QueryResult; -use helpers::cost::measure; +use helpers::cost::{cost_harness, measure}; use helpers::{ MUTATION_QUERIES, TEST_QUERIES, commit_many, count_rows, init_and_load, mixed_params, mutate_branch, mutate_main, params, @@ -35,12 +35,17 @@ fn first_column_strings(result: &QueryResult) -> Vec { out } -/// A warm same-branch read must not re-open or scan `__manifest`, and must not -/// open the commit graph, even at commit-history depth. The only manifest IO is -/// the version probe (counted by invocation). Fails before Fix 1, where the read -/// path re-opens a fresh coordinator and scans both internal tables. +/// A warm same-branch read must do ZERO `__manifest` object-store reads and must +/// not open the commit graph, even at commit-history depth. Wrapped in +/// `cost_harness`, so `manifest_reads` is ground truth: the warm-coordinator +/// freshness probe rides the long-lived handle (which now carries the tracker) and +/// is served from Lance's cached manifest at 0 store reads, so this `== 0` also +/// catches any future warm-handle scan a per-op tracker would miss. Fails before +/// Fix 1, where the read path re-opens a fresh coordinator and scans both internal +/// tables. #[tokio::test] async fn warm_same_branch_read_does_no_resolution_opens() { + cost_harness(async { let dir = tempfile::tempdir().unwrap(); let mut db = init_and_load(&dir).await; // Deep history: warm-read resolution cost must be flat in commit count. @@ -73,6 +78,8 @@ async fn warm_same_branch_read_does_no_resolution_opens() { io.version_probes, 1, "warm same-branch read performs exactly one version probe" ); + }) + .await; } /// A multi-table query (a traversal touching Person, WorksAt, and Company) scans @@ -82,6 +89,7 @@ async fn warm_same_branch_read_does_no_resolution_opens() { /// `describe_table_version`), which is the "2 tables = 2×" multi-table tax. #[tokio::test] async fn multi_table_query_does_no_manifest_scans() { + cost_harness(async { let dir = tempfile::tempdir().unwrap(); let db = init_and_load(&dir).await; @@ -98,6 +106,8 @@ async fn multi_table_query_does_no_manifest_scans() { io.manifest_reads, 0, "a multi-table read must not scan __manifest once per touched table" ); + }) + .await; } /// A warm reader must observe a commit made through another handle (invariant 6, @@ -222,6 +232,7 @@ async fn schema_source_drift_is_caught_on_read() { /// that regressed when the open used `with_branch` against the base. #[tokio::test] async fn warm_branch_read_does_no_manifest_scans() { + cost_harness(async { let dir = tempfile::tempdir().unwrap(); let db = init_and_load(&dir).await; db.branch_create("feature").await.unwrap(); @@ -258,6 +269,8 @@ async fn warm_branch_read_does_no_manifest_scans() { io.version_probes, 1, "warm branch read performs exactly one version probe" ); + }) + .await; } /// A non-main branch can be deleted and recreated at the same Lance version @@ -643,6 +656,7 @@ async fn stale_read_refreshes_manifest_only() { /// cache). Fails before Fix 3, where every read re-opens the table. #[tokio::test] async fn repeat_warm_read_reuses_table_handles() { + cost_harness(async { let dir = tempfile::tempdir().unwrap(); let mut db = init_and_load(&dir).await; // Deep history: the win must hold regardless of commit count. @@ -685,6 +699,8 @@ async fn repeat_warm_read_reuses_table_handles() { warm.version_probes, 1, "warm repeat read: exactly one version probe" ); + }) + .await; } /// A write advances the table's version, so the next read misses the diff --git a/crates/omnigraph/tests/write_cost.rs b/crates/omnigraph/tests/write_cost.rs index 6cbf763..422be41 100644 --- a/crates/omnigraph/tests/write_cost.rs +++ b/crates/omnigraph/tests/write_cost.rs @@ -24,8 +24,8 @@ mod helpers; use helpers::cost::{ - IoCounts, assert_flat, assert_grows, local_graph, measure, measure_insert, measure_insert_as, - measure_with_staged, + IoCounts, assert_flat, assert_grows, cost_harness, last_manifest_reads, local_graph, measure, + measure_insert, measure_insert_as, measure_with_staged, }; use helpers::{MUTATION_QUERIES, commit_many, commit_many_as, init_and_load, mixed_params}; @@ -43,6 +43,10 @@ use helpers::{MUTATION_QUERIES, commit_many, commit_many_as, init_and_load, mixe // compacted graph's write cost does not grow with version history." #[tokio::test] async fn internal_table_scans_are_flat_in_history() { + // `cost_harness` installs the ground-truth __manifest tracker for the whole body, + // so `manifest_reads` includes the warm-coordinator probe (a constant per write + // that cancels in this depth-difference assertion). + cost_harness(async { const ACTOR: &str = "act-cost-gate"; let dir = tempfile::tempdir().unwrap(); let mut db = local_graph(&dir).await; @@ -70,6 +74,66 @@ async fn internal_table_scans_are_flat_in_history() { // commit_graph_reads covers BOTH _graph_commits and _graph_commit_actors (shared // wrapper), so this also gates the actor table on the authenticated path. assert_flat(&curve, |c| c.commit_graph_reads, 4, "_graph_commits + _graph_commit_actors scan"); + }) + .await; +} + +/// **Served-regime twin of `internal_table_scans_are_flat_in_history` — the gate +/// that was missing.** The flat gate above calls `db.optimize()` before EVERY +/// measured write, so it only ever proves the *compacted* invariant and stays green +/// even if a served graph's per-write `__manifest` scan amplifies without bound. A +/// real served graph does NOT optimize between writes: every publish appends a +/// fragment to `__manifest`, and the publish-path scan (`read_manifest_scan`, a bare +/// `dataset.scan()` with no filter/projection) reads ALL of them, so the per-write +/// `__manifest` read count is O(fragments-since-compaction) and climbs with history. +/// That is the live amplification behind the reported single-row write latency +/// (~16s on 0.7.2; still growing post-#299) — physical fragment read cost, not +/// logical row count (output rows stay ~flat while requests grow). +/// +/// **This is a TRIPWIRE, not the final gate.** It asserts the scan *grows*, i.e. it +/// pins the CURRENT served-regime cost (green today) — exactly the `assert_grows` +/// idiom its sibling `data_table_reads_split_into_flat_opener_and_growing_scan` uses, +/// and the "turns red when the fix lands" shape of the Lance surface guards. It flips +/// RED the moment the amplification is fixed (write-path probe-gated warm reuse, and +/// bringing `__manifest` into `cleanup` version-GC so F stays bounded in history). +/// **When it goes red, that is the signal to invert it to** +/// `assert_flat(&curve, |c| c.manifest_reads, , "__manifest scan (served)")` — +/// promoting it to the permanent served-regime gate. Only `manifest_reads` is +/// asserted: #299 moved lineage into `__manifest` and made the per-write commit-graph +/// update in-memory, so `commit_graph_reads` no longer grows per write on this branch. +#[tokio::test] +async fn internal_table_scans_grow_without_compaction() { + cost_harness(async { + const ACTOR: &str = "act-cost-gate-served"; + let dir = tempfile::tempdir().unwrap(); + let mut db = local_graph(&dir).await; + + let mut curve: Vec<(u64, IoCounts)> = Vec::new(); + let mut current = 0u64; + for d in [10u64, 100] { + if d > current { + commit_many_as(&mut db, (d - current) as usize, ACTOR).await; + current = d; + } + // NO `db.optimize()` here — that omission is the whole point. The flat gate + // above compacts before measuring and so never exercises this served regime. + let io = measure_insert_as(&mut db, &format!("served_{d}"), ACTOR).await; + current += 1; // the measured write advanced depth by one + eprintln!( + "depth~{d} (uncompacted): data={} __manifest={} _graph_commits+actors={}", + io.data_reads, io.manifest_reads, io.commit_graph_reads + ); + curve.push((d, io)); + } + + // Green TODAY (the bug): the per-write `__manifest` scan is O(fragments) and grows + // by far more than the flat gate's slack of 4 across a 10→100 depth sweep. The `20` + // floor mirrors the proven-safe `assert_grows` sibling (data-table scan) and sits + // comfortably below the real growth (~+3 `__manifest` reads/depth × ~90 depth × the + // 3–4 publish-path scans) while unambiguously distinguishing "grows" from "flat". + assert_grows(&curve, |c| c.manifest_reads, 20, "__manifest scan (uncompacted/served)"); + }) + .await; } // The data-table OPENER history-gate (opener flat across depth) lives in @@ -142,6 +206,7 @@ async fn single_insert_data_write_is_bounded() { /// P2 fold (was ~44 / ~54 with the four separate scans). #[tokio::test] async fn write_op_count_ceiling_at_shallow_depth() { + cost_harness(async { let dir = tempfile::tempdir().unwrap(); let mut db = local_graph(&dir).await; commit_many(&mut db, 5).await; @@ -150,15 +215,19 @@ async fn write_op_count_ceiling_at_shallow_depth() { "depth~5: data={} __manifest={} _graph_commits={} total_reads={}", io.data_reads, io.manifest_reads, io.commit_graph_reads, io.total_reads() ); - // Sub-ceiling on `__manifest` reads specifically: the publish path does one - // scan, not four. ~26 measured at this depth; a re-added scan would push it - // well past this. (Deterministic on local FS.) - const MANIFEST_CEILING: u64 = 34; + // Sub-ceiling on ground-truth `__manifest` reads. ~18 measured at this depth = + // ~15 publish-path scans (one fold, not four — RFC-013 P2) + ~3 from the + // warm-coordinator freshness probe, which ground truth now counts (the + // `version_probes=1` call is 3 object-store RPCs). A re-added publish scan trips + // this; `last_manifest_reads()` dumps the read log (method + path) so a breach + // names the offending objects. (Deterministic on local FS.) + const MANIFEST_CEILING: u64 = 24; assert!( io.manifest_reads <= MANIFEST_CEILING, "per-write __manifest reads {} exceeded ceiling {MANIFEST_CEILING} — a publish-path \ - scan was re-added (RFC-013 P2 folds them into one)", + scan was re-added (RFC-013 P2 folds them into one). Reads: {:#?}", io.manifest_reads, + last_manifest_reads(), ); const CEILING: u64 = 80; assert!( @@ -166,6 +235,8 @@ async fn write_op_count_ceiling_at_shallow_depth() { "per-write read ops {} exceeded ceiling {CEILING} — a new round-trip was added", io.total_reads() ); + }) + .await; } // ── (C) Fitness assert via the staged-write probes ── @@ -271,3 +342,44 @@ async fn keyed_insert_opens_table_at_most_once() { io.data_open_count, ); } + +// ── (E) Ground-truth __manifest counting (PR2.1) — the blind-spot guard ── + +/// The warm-coordinator freshness probe rides a long-lived handle, so a per-op +/// (fresh) tracker installed at measure time CANNOT see its reads — that was the +/// blind spot. `cost_harness` attaches the tracker BEFORE the coordinator opens, so +/// the probe's reads ARE counted (`manifest_reads` is ground truth, not just fresh +/// opens). Proven by measuring the same warm write both ways: ground truth strictly +/// exceeds fresh-only, by the probe's object-store RPCs. Reverting the ground-truth +/// wiring (so `manifest_reads` reverts to fresh-per-op) makes the two equal → RED. +#[tokio::test] +async fn manifest_reads_capture_warm_probe() { + // Fresh-only (no `cost_harness`): the warm coordinator handle was opened outside + // any meter, so the freshness probe's reads escape `manifest_reads`. + let fresh = { + let dir = tempfile::tempdir().unwrap(); + let mut db = local_graph(&dir).await; + commit_many(&mut db, 3).await; // warm the coordinator + let io = measure_insert(&mut db, "fresh").await; + eprintln!("fresh-only warm write: __manifest={}", io.manifest_reads); + io.manifest_reads + }; + + // Ground truth (`cost_harness`): the same warm probe is now counted. + cost_harness(async move { + let dir = tempfile::tempdir().unwrap(); + let mut db = local_graph(&dir).await; + commit_many(&mut db, 3).await; + let io = measure_insert(&mut db, "ground_truth").await; + eprintln!("ground-truth warm write: __manifest={}", io.manifest_reads); + assert!( + io.manifest_reads > fresh, + "ground-truth __manifest reads {} must exceed fresh-only {fresh} by the \ + warm-coordinator probe's RPCs — else the warm-handle probe is escaping the \ + tracker (the blind spot this guards). Reads: {:#?}", + io.manifest_reads, + last_manifest_reads(), + ); + }) + .await; +} diff --git a/crates/omnigraph/tests/writes.rs b/crates/omnigraph/tests/writes.rs index b57d8fd..8c7b161 100644 --- a/crates/omnigraph/tests/writes.rs +++ b/crates/omnigraph/tests/writes.rs @@ -1725,3 +1725,57 @@ query chain($repo: String) { .expect("chained camelCase mutation must read the pending row, not fail at the MemTable SELECT"); assert_eq!(r.affected_nodes, 2, "both ops should touch the acme Doc (read-your-writes)"); } + +/// RFC-013 PR2 #1b: the publisher folds the new `known_state` in-memory after a +/// publish instead of re-scanning `__manifest`. That fold MUST be byte-identical +/// to a fresh re-scan, or the warm coordinator silently desyncs. After a sequence +/// of writes (insert, a second insert to the same table, then a delete that +/// advances the table version), the in-memory coordinator holds the folded state; +/// a freshly reopened graph rebuilds it via a real `read_manifest_state` scan. +/// Counting `node:Person` off each resolves the table at the version each side +/// recorded — a fold that set the wrong version (or path → open failure) makes the +/// in-memory count diverge from the reopened one. Reopen is the scan side; the live +/// `db` is the fold side. +#[tokio::test] +async fn post_publish_fold_matches_fresh_reopen() { + let dir = tempfile::tempdir().unwrap(); + let uri = dir.path().to_str().unwrap(); + let mut db = init_and_load(&dir).await; + + db.mutate( + "main", + MUTATION_QUERIES, + "insert_person", + &mixed_params(&[("$name", "fold_a")], &[("$age", 30)]), + ) + .await + .unwrap(); + db.mutate( + "main", + MUTATION_QUERIES, + "insert_person", + &mixed_params(&[("$name", "fold_b")], &[("$age", 31)]), + ) + .await + .unwrap(); + db.mutate( + "main", + MUTATION_QUERIES, + "remove_person", + &mixed_params(&[("$name", "fold_a")], &[]), + ) + .await + .unwrap(); + + // Fold side: count resolves the snapshot from the in-memory folded known_state. + let folded = count_rows(&db, "node:Person").await; + + // Scan side: a fresh open rebuilds known_state via `read_manifest_state`. + let reopened = Omnigraph::open(uri).await.unwrap(); + let scanned = count_rows(&reopened, "node:Person").await; + + assert_eq!( + folded, scanned, + "post-publish fold diverged from a fresh re-scan (folded {folded} vs scanned {scanned})" + ); +} diff --git a/docs/dev/handoff-rfc-013-write-path.md b/docs/dev/handoff-rfc-013-write-path.md index 3706012..a281b60 100644 --- a/docs/dev/handoff-rfc-013-write-path.md +++ b/docs/dev/handoff-rfc-013-write-path.md @@ -6,6 +6,180 @@ this doc is the *current-state map + the decisions/validation from the latest wo **Audience:** the engineer/agent who picks up RFC-013 next. +> **Two threads, one RFC.** RFC-013 has been worked in two overlapping lines by different +> cycles, with different sub-numbering. Don't let the numbering confuse you: +> - **Thread A — per-write call-count / RTT collapse + concurrency correctness** (steps 3b / +> 4 / 5, Design A / `PublishPlan`, #297 / #254 / #296 / #298). This is **§1–§10 below** (the +> original body of this doc). Read it for the concurrency model and the convergent fix. +> - **Thread B — `__manifest` scan amplification + the unbounded `_versions/` chain** (the +> investigation framed as **PR1 / PR2 / PR3 / PR4**). This is **§A below (read it first)** — +> it is the most recent cycle (2026-06-26) and where the live branch sits. +> +> They meet at the internal-table maintenance area (Thread A's "step 2a/2b" == Thread B's +> "compaction is done / PR1 = bound the chain"). §A maps the two framings. + +--- + +## §A. Current state — the `__manifest` scan + version-chain thread (2026-06-26) — READ FIRST + +The latest cycle attacked **write latency that grows with graph age on object storage** and a +**hard open failure on a large `_versions/` chain**. The working plan (survives context resets, +**not in the repo**) is `~/.claude/plans/in-the-mean-time-humble-reef.md` — pull it for the full +PR1/PR2/PR3/PR4 decomposition, the assumptions-validated-against-code list, and the critical-files +map. This section is the durable summary. + +### A.1 The problem (root cause) + +Two interacting terms, both centered on the internal `__manifest` Lance dataset (the cross-table +catalog; one dataset, 217 tables on a real graph): + +- **Term 1 — repeated full `__manifest` scans per write.** Each read of `__manifest` was a bare + `dataset.scan()` (no filter/projection/index), cost **O(fragments F)**. A publish did **4** such + scans (OCC re-capture + `load_publish_state` + the `use_index(false)` merge-insert join + + post-publish read-back). `F` grows **+1 per write**, so per-write cost climbs with history. +- **Term 2 — unbounded `_versions/` chain.** `cleanup` version-GC **excludes** the internal tables + (`all_table_keys` is node/edge-only, `db/omnigraph/optimize.rs`), so `__manifest/_versions/` + grows without bound (768 objects measured). Lance **lists** that prefix on every open; once large, + a store like RustFS times out serving the list page → branch ops take minutes or **fail outright**. + Unfixable via the public CLI today (`cleanup --keep` can't target `__manifest`). + +Validated three ways: a multi-agent workflow (readers + adversarial refutation), Lance 7.0.0 source, +and live branch-op probes on a RustFS mirror. **Key Lance fact (closed the open-path investigation):** +on standard S3-class stores (R2/RustFS, not S3 Express) Lance sets `list_is_lexically_ordered = true`, +so it **never** uses `latest_version_hint.json` — it always lists `_versions/`. So the version hint is +**not** a lever; only **bounding the chain (PR1)** fixes the open path. (Corollary: on production R2 the +open is one cheap list page regardless of chain length, so the chain barely affects R2 open latency — +the RustFS *failure* is a RustFS list-page limit; the R2 ~16s write latency is Term-1 fragment +amplification = PR2's target.) + +### A.2 What is LANDED on `main` + +- **Step 2a** — `optimize` compacts the internal tables too (`__manifest` / `_graph_commits` / + `_graph_commit_actors`), so a *periodically-compacted* graph keeps Term-1 flat. (Cleanup/version-GC + of them is the still-open PR1.) +- **Phase 7 / #299** (`1c5cb874`) — graph lineage lives in `__manifest` (`graph_commit` + + `graph_head:` rows in the same publish merge-insert; `_graph_commits` is now a projection; + v3→v4 internal-schema migration; schema-version floor). This removed the per-write commit-graph + scan and closed the manifest→commit-graph atomicity + commit-graph-parent-under-concurrency gaps. + **This is the base everything below builds on.** + +### A.3 What is IN FLIGHT — branch `ragnorc/read-lance-table-docs` = **PR #307** (OPEN, base `main`) + +Ten commits ahead of `origin/main`. PR #307 includes the scan-halving work, the fold +correctness fix, and PR2.1's ground-truth cost harness. + +1. **PR2 — halve per-write `__manifest` scans** (`4ac3cde4` + tripwire `52a7e0cd`). Three moves: + - **#1a probe-gate the OCC re-capture** (`occ_snapshot_for_branch`, mirrors the read path's + `resolve_target_inner`): replace the cold re-scan with a cheap incarnation **probe**; reuse the + warm coordinator on match, cold-scan only on mismatch. + - **#1b in-memory post-publish fold** (`fold_inputs` in `publisher.rs`; `PublishOutcome.known_state`): + build the new `ManifestState` from `existing_versions ∪ pending rows` instead of re-scanning. + - **#1c projection** on `read_manifest_scan` (drop `base_objects` always, `object_id` off the + table-state path). + - Net: per-write `__manifest` scans **4 → 2**; the two inherent publisher scans + (`load_publish_state` + the `use_index(false)` merge-join) remain O(F). +2. **Fold correctness fix** (`5537cd95` test, `245cb26d` fix) — a reviewer (Cursor Bugbot High + + Codex P2) caught that `#1b`'s fold dropped a **same-version owner-branch handoff**: a `table_version` + row UPDATEd in place at the same Lance version with a new `table_branch` (merge-insert `UpdateAll` + on the deterministic `version_object_id`) was appended after `existing_versions`, and + `assemble_manifest_state` kept the stale first entry, so the warm coordinator held the wrong fork + until refresh. Fix: key the fold's version entries by `(table_key, table_version)` so a pending row + **replaces** the existing one (mirroring `UpdateAll`). Test-first repro in + `db/manifest/tests.rs::test_post_publish_fold_reflects_owner_branch_handoff` (red→green). +3. **PR2.1 — ground-truth cost harness** (`fd73f01b`, `59d9ff39`, `3cd2b2c1`, `383022e8`, `9f1e5b6e`). + Rebuilt `tests/helpers/cost.rs` on lance's IO-counted idiom (`incremental_stats()` deltas; one + `IOTracker` per class). Added `cost_harness(body)` / `GraphIoMeter`: it installs one `__manifest` + tracker **before the coordinator opens**, so the tracker rides every handle (init + each + post-publish reassignment at `db/manifest.rs:590`). `manifest_reads` is now **ground truth** + (handle-age-irrelevant), closing the blind spot where a per-op tracker installed at measure time + could not see reads on the long-lived warm handle. `last_manifest_reads()` dumps the read log for + `assert_io_eq!`-style failure diagnostics. Outside `cost_harness`, `measure` falls back to + fresh-per-op, so `write_cost_s3.rs` is untouched. (Kept the bespoke `PrefixCounter` for the + opener/scan split — lance does the same with `throttle_store`/`failing_store`, and the + request-log alternative would couple to unstable debug method-strings.) + +### A.4 The accurate measurement (PR2.1's payoff — what it told us) + +The old (fresh-only) harness **undercounted writes**: `#1a`'s probe rides the warm handle, and its +reads escaped the per-op tracker (they showed only as `version_probes=1`). Ground truth counts them +and reveals **a write's freshness probe does ~3 `__manifest` object-store RPCs** (a *read*'s probe is +a 0-IO cache hit). So, apples-to-apples (both ground truth), per-write `__manifest` ops: + +| | depth 10 | depth 100 | slope | +|---|---|---|---| +| Pre-PR2 (4 cold scans) | 50 | 410 | +4/write | +| Post-PR2 (ground truth) | 28 | 208 | +2/write | + +- PR2 roughly **halved** the per-write manifest work **and its growth slope** (+4 → +2/write). +- The **compacted/maintained floor is ~5 RPCs/write, flat in history** — the 3-RPC probe now dominates + it (it is O(1), not O(F)). So `#1a` made the OCC re-capture O(1), it did not make it free. +- For latency: a periodically-compacted graph has bounded, history-independent per-write manifest + cost; an unmaintained graph still grows at half the rate (PR1 flattens the residual). The probe and + RFC #7264 are the levers for the compacted floor. (The harness measures op *count*, the latency proxy + on object stores; the ~16s R2 figure is the open-path chain = PR1, separate.) +- Regression guard: `write_cost.rs::manifest_reads_capture_warm_probe` (fresh=11 vs ground-truth=14) + goes red if the ground-truth wiring reverts. + +### A.5 What is LEFT (priority order) — Thread B + +1. **PR1a — manual `__manifest`-only cleanup** *(available now, no new invariant; HIGHEST priority — + it is the only thing that fixes the hard open **failures**)*. Add `all_table_keys_internal()` + + `cleanup_internal_tables()` reusing the generic `cleanup_all_tables` loop (`optimize.rs`); refuse on + a pending recovery sidecar. Safe **only on a quiesced graph** (no concurrent writers → no Q8 + resurrection race). Shrinks `_versions/` (768 → keep-N). This is RFC **step 2b's available half**. + Pair with a **V2-naming surface guard** (protects the one-page open fast-path). +2. **PR3 (the available half) — branch-op de-amplification.** Branch **merge** candidate-scoping + (avoid 3 full cross-branch snapshots + union-all-keys upfront, `exec/merge.rs`); **parallelize** the + branch-delete loop (`ensure_branch_delete_safe` snapshots every other branch — O(branches)). Each + per-branch scan is already cheaper post-PR2 (#1c projection). +3. **Design-gated / deferred:** + - **PR1b — the Q8 durable boundary watermark** for SAFE automated/cadenced GC under live writers + (Lance version create is a bare `PutMode::Create` with no monotonic guard → a stalled writer can + resurrect a GC'd version = silent lost write on R2/S3). Invariant-level, partially MTT-redundant. + **This is the same design point as Thread A's "step 2b / Q8 watermark" in §8.** Design deliberately + or wait for RFC #7264. + - **PR3 branch-delete O(1)** — needs a cross-branch dependency index (the `table_branch` dependency + is genuinely cross-branch with no index today). + - **PR4 / RFC #7264** — Lance native branch-aware `BatchCreateTableVersions`; manifest read → O(1), + per-write fragment append gone; retires most of PR1/PR2. Upstream-blocked. +4. **Low-leverage:** retire the vestigial `_graph_commits`/`_graph_commit_actors` datasets (zero rows + post-#299, only branch-ref carriers); a bitmap index on `__manifest` (no builder exists; `use_index(false)` + means it can't serve the CAS join anyway — a `graph_head:` point-lookup is the better variant). + +### A.6 Critical files (Thread B) + +- `db/manifest/state.rs` — `read_manifest_state` / `read_manifest_scan` / `assemble_manifest_state` (the + shared reduction both the fold and the scan feed). +- `db/manifest/publisher.rs` — `fold_inputs` / `PublishOutcome` / `is_owner_branch_handoff` (publisher.rs:267, + the same-version handoff the fold must honor) / the merge-insert CAS. +- `db/manifest.rs` — `commit_changes_with_lineage` (adopts the fold; `self.dataset = dataset` reassignment + at :590, the reason the cost tracker must be installed before open) + the probes. +- `db/omnigraph.rs` — `occ_snapshot_for_branch` (#1a), `resolved_branch_target`, `ensure_branch_delete_safe` (PR3). +- `exec/staging.rs` `commit_all`; `exec/merge.rs` (PR3); `db/omnigraph/optimize.rs` (`all_table_keys`, + `cleanup_all_tables` — PR1). +- `tests/helpers/cost.rs` (the harness), `tests/write_cost.rs` / `warm_read_cost.rs` / `write_cost_s3.rs`, + `tests/writes.rs` / `consistency.rs` / `composite_flow.rs` (must stay green). + +### A.7 Gotchas (Thread B, learned this cycle) + +- **A per-op object-store wrapper cannot see a long-lived handle's reads.** That was the measurement + blind spot. The fix is to install the tracker before the handle opens (`cost_harness`), not at measure + time. A write's warm-handle probe is **3 RPCs** that hid behind `version_probes=1`. +- **`cost_harness` must wrap the WHOLE test body** (the graph must open inside it), and the body future + must be **`Box::pin`-ed** — wrapping a whole test body in another async layer overflows the test + thread's stack (these cost tests already raise `recursion_limit`). +- **The fold must mirror merge-insert identity.** `version_object_id(table_key, version)` is + deterministic, so a same-version handoff is an in-place `UpdateAll`; the in-memory fold must key by + `(table_key, version)` and replace, or the warm coordinator desyncs from a fresh re-scan. The + byte-identity guard is `writes.rs::post_publish_fold_matches_fresh_reopen`. +- **`lance-io` `test-util`** is enabled in dev-deps (gives `IoStats.requests` + `assert_io_eq!`, + diagnostics only); production builds exclude dev-deps so they never see it. + +### A.8 Immediate next action + +The natural next PR is **PR1a** (no design gate, fixes the RustFS open failures). Run and confirm +the relevant test gate before starting or stacking that follow-up. + --- ## 0. TL;DR — where we are and what's next @@ -22,6 +196,9 @@ for the canonical list. Current reality: - **Step 2a** — internal-table compaction: `optimize` now compacts `__manifest` / `_graph_commits` / `_graph_commit_actors` (#291). Plus the RFC latency-model correction (#292). +- **Step 4 / Phase 7** — graph lineage moved into `__manifest` (#299 `1c5cb874`): + `graph_commit` + mutable `graph_head:` in the publish merge-insert, + `_graph_commits` now a projection. **The base for the live branch (§A).** - **Optimize-vs-write race** — optimize survives a cross-process write race on the same table (#297, **LANDED** — origin/main `6d4606a8`; see §6 for why it's not redundant with Design A). Step 3b stacks on top of this. @@ -36,9 +213,11 @@ for the canonical list. Current reality: (same op-class family as #297, logical side). **Step 3b is DONE** (capture-once `WriteTxn`, schema-once + open-collapse; see §4) on -`rfc-013-step-3b-writetxn-v2`. **Next: Phase 7 (step 4), then the big one — Design A / -`PublishPlan` unification (step 5)** — see §5, the convergent fix for the bug *class* this -area keeps generating, which also absorbs 3b's deferred session-aware write opens. +`rfc-013-step-3b-writetxn-v2`. **Phase 7 (step 4) has since LANDED on `main` (#299 `1c5cb874`)** +— lineage now lives in `__manifest` (see §A.2). **Next for Thread A: the big one — Design A / +`PublishPlan` unification (step 5)** — see §5, the convergent fix for the bug *class* this area +keeps generating, which also absorbs 3b's deferred session-aware write opens. **Next for Thread B +(the live branch, §A): PR1a** (manual `__manifest` cleanup — fixes the RustFS open failures). --- @@ -397,11 +576,12 @@ for #298** (which built none of those constructs) but are **load-bearing constra (§1d.1). Restore the live-HEAD cardinality scan, add the deterministic regression test, fix the wrong doc comment. Small, gate-safe, un-regresses an integrity check (invariant 9). The residual concurrent TOCTOU is the §7.1 gap (step 4) — un-widen here, don't over-reach. -- **Step 4 / Phase 7** (`iss-991`): lineage into `__manifest` (publish `graph_commit` + - mutable `graph_head:` in the same merge-insert; `_graph_commits` becomes a - projection). Removes the per-write `commit_graph.refresh`; closes the manifest→commit-graph - atomicity + commit-graph-parent-under-concurrency gaps. **Hard prereq: step 2 (done).** - Carries the §7.1 *concurrent* write-skew fix (needs the `graph_head` contention row) — +- **Step 4 / Phase 7** (`iss-991`): **LANDED on `main` as #299 (`1c5cb874`).** Lineage now lives + in `__manifest` (`graph_commit` + mutable `graph_head:` in the same merge-insert; + `_graph_commits` is a projection). Removed the per-write `commit_graph.refresh`; closed the + manifest→commit-graph atomicity + commit-graph-parent-under-concurrency gaps. *(Historical note, + kept for the §7.1 framing it carried:)* it + carries the §7.1 *concurrent* write-skew fix (needs the `graph_head` contention row) — **frame §7.1 as "unify the entire write-validation read-set" (endpoint + cardinality + cross-version uniqueness), not merely "add `graph_head`"** (§1d.1): the bespoke `edge_cardinality_read_handle` and the mutation-vs-loader freshness fork dissolve into one diff --git a/docs/dev/testing.md b/docs/dev/testing.md index 1b480f4..2038804 100644 --- a/docs/dev/testing.md +++ b/docs/dev/testing.md @@ -27,7 +27,7 @@ The engine's `tests/` is the principal coverage surface; most graph-shaped behav | `lance_surface_guards.rs` | Pins the Lance API surfaces omnigraph depends on (named runtime + compile-only guards; see [lance.md](lance.md)) — the first smoke check on any Lance version bump; e.g. `compact_files_still_fails_on_blob_columns` turns red when the upstream blob-compaction fix lands | | `warm_read_cost.rs` | Cost-budget tests for the warm read path (query-latency work), measured at the object-store boundary with Lance `IOTracker` (the LanceDB IO-counted pattern): a warm same-branch read does 0 manifest opens, 0 commit-graph opens, 1 version probe, validates the schema once (Fix 1 / finding A / Fix 2 at commit-history depth); stale same-branch reads perform exactly 2 probes and refresh manifest-only; recreated non-main branches with the same Lance version refresh by incarnation; recreated branch-owned table handles are distinguished by table e_tag or refresh-time cache clearing; recreated traversal topology is protected by synthetic snapshot-id incarnation or refresh-time cache clearing; a warm *repeat* read does 0 table opens via the held-handle cache and a write re-opens only the changed table at its new version/e_tag (Fix 3/6A). See "Cost-budget tests" below | | `write_cost.rs` | Cost-budget tests for the WRITE path (RFC-013), the latency twin of `warm_read_cost.rs` on the **shared `helpers::cost` harness** (`measure`/`IoCounts`/`assert_flat`/`local_graph`). Runs on **local FS**; gates the **internal-table** term (`__manifest`/`_graph_commits` scans flat in commit-history depth — `internal_table_scans_are_flat_in_history`, now **green every-PR** since RFC-013 step 2 brought the internal tables into `optimize`; the test compacts at each depth before measuring) plus green every-PR guards (single-insert `data_writes` bounded, a per-write read-op ceiling that fails the moment a round-trip is added, and a `measure_with_staged` fitness assert that a keyed insert routes through `stage_merge_insert` once with no `stage_append`/vector-index build). The **data-table opener** term is S3-only — see `write_cost_s3.rs` and the backend-split note in "Cost-budget tests" below | -| `helpers/cost.rs` | The shared cost-budget harness (not a test): `IoCounts`/`StagedCounts` (counts by table class), `measure`/`measure_with_staged` (the one place the `with_query_io_probes` + `MergeWriteProbes` task-local + `IOTracker` wiring lives), `assert_flat(curve, select, slack, what)`, and store-agnostic `local_graph`/`s3_graph` fixtures. `warm_read_cost.rs`, `write_cost.rs`, and `write_cost_s3.rs` all consume it so a cost test body is written once and reads in one vocabulary | +| `helpers/cost.rs` | The shared cost-budget harness (not a test): `IoCounts`/`StagedCounts` (counts by table class), `measure`/`measure_with_staged` (the one place the `with_query_io_probes` + `MergeWriteProbes` task-local + `IOTracker` wiring lives; reads per-op deltas via lance's `incremental_stats()`, the upstream per-request idiom from `rust/lance/src/dataset/tests/dataset_io.rs`), `cost_harness`/`GraphIoMeter` (installs ONE `__manifest` `IOTracker` for a whole test body so the graph opens **under** it and `manifest_reads` is **ground truth** — every read regardless of handle age, the warm-coordinator freshness probe included — closing the blind spot where a per-op tracker installed at measure time cannot see a long-lived handle's reads; outside `cost_harness`, `measure` falls back to fresh per-op tracking, so `write_cost_s3.rs` is unaffected), `last_manifest_reads()` (the manifest read log for `assert_io_eq!`-style failure diagnostics), `assert_flat(curve, select, slack, what)`, and store-agnostic `local_graph`/`s3_graph` fixtures. `warm_read_cost.rs`, `write_cost.rs`, and `write_cost_s3.rs` all consume it so a cost test body is written once and reads in one vocabulary | | `lifecycle.rs` | Graph lifecycle, schema state | | `point_in_time.rs` | Snapshots, time travel (`snapshot_at_version`, `entity_at`) | | `changes.rs` | `diff_between` / `diff_commits` | @@ -140,6 +140,7 @@ Correctness bugs fail loudly in tests; cost-scaling bugs pass every test and deg - **Assert a cost budget, not just a result.** For a read/open path, assert the number of `Dataset::open` calls (or object-store ops) a warm query performs, and that it does not grow with commit count. The reference is LanceDB's IO-counted tests, which assert a cached read costs 0-1 IO and carry a named regression test against "a list call on every subsequent query." - **Test at history depth.** Build a fixture with many *commits* (not many rows) and assert warm-read cost is flat across depths. A shallow fixture cannot catch an O(commits) cost. - **Use the shared harness, and gate each term on the backend where it manifests.** `helpers::cost` (`measure`/`IoCounts`/`assert_flat`/`local_graph`/`s3_graph`) is the one place the `IOTracker`/task-local plumbing lives — consume it, don't duplicate it. The write path has *two distinct* depth terms that split cleanly across backends, and conflating them is a real trap (the local data-table read count grows with depth too, but for a different reason — the merge-insert/RI scan reading O(depth) *fragments*, reduced by compaction, not by the opener): (1) the **internal-table** scan term (`__manifest`/`_graph_commits` fragment scans) reproduces on **any** backend including local FS, so `write_cost.rs` gates it on local every-PR; (2) the **data-table opener** term (latest-version resolution) is a per-object-store-RPC phenomenon — local-FS resolves latest with one cheap `read_dir` regardless of the opener used, so the namespace-vs-direct difference is **invisible on local** and only shows on a real object store (per-version GETs), gated by the bucket-gated `write_cost_s3.rs`. Same harness, different fixture; each term asserted where it actually appears. +- **Count on the handle that does the reads, not just the one a measured op opens.** Lance's IO-counted tests attach the `IOTracker` to the (warm, cached) dataset and read `incremental_stats()` per request — the tracker MUST be on the handle performing the reads, or warm-handle reads escape. A per-op tracker installed at measure time cannot see reads on a long-lived handle opened earlier (the warm coordinator's `__manifest` handle, reused across writes), so such reads were silently undercounted. Wrap a depth-swept body in `cost_harness` so the manifest tracker is installed before the graph opens and `manifest_reads` is **ground truth** (handle-age-irrelevant). The `version_probes` counter is the freshness-probe *call* count; ground truth additionally reveals that a write's probe does ~3 object-store RPCs (a read's probe is a 0-IO cache hit). `manifest_reads_capture_warm_probe` is the guard that this stays true. - This is the testing companion to invariant 15 in [docs/dev/invariants.md](invariants.md) (hot-path cost is bounded by work, not history). When in doubt, re-read [docs/dev/invariants.md](invariants.md) — quality gates apply to every change.