diff --git a/crates/omnigraph/src/db/commit_graph.rs b/crates/omnigraph/src/db/commit_graph.rs index 572bdf5..181d1d8 100644 --- a/crates/omnigraph/src/db/commit_graph.rs +++ b/crates/omnigraph/src/db/commit_graph.rs @@ -396,7 +396,7 @@ pub(crate) fn graph_commits_uri(root_uri: &str) -> String { format!("{}/{}", root_uri.trim_end_matches('/'), GRAPH_COMMITS_DIR) } -fn graph_commit_actors_uri(root_uri: &str) -> String { +pub(crate) fn graph_commit_actors_uri(root_uri: &str) -> String { format!( "{}/{}", root_uri.trim_end_matches('/'), diff --git a/crates/omnigraph/src/db/manifest.rs b/crates/omnigraph/src/db/manifest.rs index 4c6410b..fa05b49 100644 --- a/crates/omnigraph/src/db/manifest.rs +++ b/crates/omnigraph/src/db/manifest.rs @@ -28,7 +28,8 @@ mod recovery; mod state; use graph::{init_manifest_graph, open_manifest_graph, snapshot_state_at}; -use layout::{manifest_uri, open_manifest_dataset, table_uri_for_path, type_name_hash}; +use layout::{open_manifest_dataset, table_uri_for_path, type_name_hash}; +pub(crate) use layout::manifest_uri; pub(crate) use metadata::TableVersionMetadata; #[cfg(test)] use metadata::{OMNIGRAPH_ROW_COUNT_KEY, table_version_metadata_for_state}; diff --git a/crates/omnigraph/src/db/manifest/layout.rs b/crates/omnigraph/src/db/manifest/layout.rs index f4ac09b..12894a7 100644 --- a/crates/omnigraph/src/db/manifest/layout.rs +++ b/crates/omnigraph/src/db/manifest/layout.rs @@ -15,7 +15,7 @@ pub(super) fn type_name_hash(name: &str) -> String { format!("{:016x}", h) } -pub(super) fn manifest_uri(root: &str) -> String { +pub(crate) fn manifest_uri(root: &str) -> String { format!("{}/{}", root.trim_end_matches('/'), MANIFEST_DIR) } diff --git a/crates/omnigraph/src/db/omnigraph/optimize.rs b/crates/omnigraph/src/db/omnigraph/optimize.rs index 9a0a17f..29bf2b6 100644 --- a/crates/omnigraph/src/db/omnigraph/optimize.rs +++ b/crates/omnigraph/src/db/omnigraph/optimize.rs @@ -248,10 +248,8 @@ pub async fn optimize_all_tables(db: &Omnigraph) -> 
Result> = futures::stream::iter(table_tasks.into_iter()) @@ -279,7 +277,42 @@ pub async fn optimize_all_tables(db: &Omnigraph) -> Resultmanifest drift). let version_before = ds.version().version; + + // Keep optimize non-destructive on upgraded graphs (same guarantee the + // internal-table path makes — see `clear_stale_auto_cleanup_config`). + // `compact_files` / `optimize_indices` commit with a default `CommitConfig` + // (`skip_auto_cleanup = false`) and expose no skip override, so on a graph + // created by a pre-v7 binary (auto_cleanup ON) those commits would fire + // Lance's version-GC hook and prune `__manifest`-pinned data-table versions. + // Strip the stale config first. We hold the per-table queue, so no concurrent + // writer can race this (no retry loop needed, unlike the internal-table path); + // any commit it makes is content-preserving and covered by the Optimize + // sidecar's loose `post_commit_pin` like the other Phase-B commits. + clear_stale_auto_cleanup_config(&mut ds) + .await + .map_err(|e| OmniError::Lance(e.to_string()))?; + let metrics: CompactionMetrics = if will_compact { compact_files(&mut ds, options, None) .await @@ -514,6 +565,173 @@ async fn optimize_one_table( Ok(stat) } +/// Bound on the app-level retry of an internal-table compaction against a +/// concurrent live writer (see [`is_retryable_lance_conflict`]). +const INTERNAL_COMPACTION_RETRY_BUDGET: u32 = 5; + +/// A Lance commit error that means "a concurrent writer preempted us; reload the +/// dataset and rerun." `compact_files` commits via `commit_compaction` -> +/// `apply_commit` *directly* — unlike the merge-insert path it is NOT wrapped in +/// `execute_with_retry`, so a `Rewrite`-vs-`Merge`/`Update`/`Delete` `check_txn` +/// conflict propagates raw instead of being rebased or converted to +/// `TooMuchWriteContention`. 
Lance's transaction spec prescribes that the +/// *application* reruns these, which is what `compact_internal_table` does — so a +/// maintenance compaction (a physical op) never fails a live write (a logical op), +/// invariant 7. (`TooMuchWriteContention` is included for the exhausted-retry form +/// some commit paths surface.) +fn is_retryable_lance_conflict(err: &lance::Error) -> bool { + matches!( + err, + lance::Error::RetryableCommitConflict { .. } + | lance::Error::CommitConflict { .. } + | lance::Error::TooMuchWriteContention { .. } + ) +} + +/// Remove any stored `lance.auto_cleanup.*` config from a table so compaction +/// stays **non-destructive by construction**. Used by both the internal-table +/// path ([`compact_internal_table`]) and the data-table path +/// ([`optimize_one_table`]). +/// +/// `compact_files` / `optimize_indices` commit with a default `CommitConfig` +/// (`skip_auto_cleanup = false`) and `CompactionOptions` exposes no override, so on +/// a dataset whose stored config has `lance.auto_cleanup.interval` set, the +/// compaction/reindex commit would fire Lance's auto-cleanup hook (version GC) — +/// deletion of old versions, including ones `__manifest` pins for snapshots / +/// time-travel (data tables) or that hold lineage/time-travel state (internal +/// tables). New graphs create tables with `auto_cleanup: None` (`manifest/graph.rs`, +/// `commit_graph.rs`, and the data-table create path) so there is nothing to clear; +/// only pre-`auto_cleanup`-fix *upgraded* graphs carry the config. OmniGraph owns +/// version cleanup explicitly (`cleanup`), so Lance's hook is unwanted regardless — +/// clearing it both makes `optimize` non-destructive and aligns the table with the +/// new-graph posture. The key-deleting `update_config` commit itself does not GC: the +/// resulting manifest no longer has the `interval` key, so the post-commit hook is a +/// no-op. Returns whether any config was cleared (it advances Lance HEAD iff so).
+/// Recovery coverage differs by caller: the data-table path runs this inside the +/// Optimize sidecar window; the internal-table path needs none (it commits at HEAD +/// and is read at HEAD — the strip is a content-preserving config commit, so a crash +/// leaves the table readable and content-identical, see [`compact_internal_table`]). +async fn clear_stale_auto_cleanup_config( + ds: &mut lance::Dataset, +) -> std::result::Result { + let keys: Vec = ds + .config() + .keys() + .filter(|k| k.starts_with("lance.auto_cleanup.")) + .cloned() + .collect(); + if keys.is_empty() { + return Ok(false); + } + // Merge-update with `None` values to delete the keys — the non-deprecated + // replacement for `delete_config_keys` (awaiting the builder merges rather + // than replacing the whole config map). + let entries: Vec<(&str, Option<&str>)> = keys.iter().map(|k| (k.as_str(), None)).collect(); + ds.update_config(entries).await?; + Ok(true) +} + +/// Compact one INTERNAL system table (`__manifest` / `_graph_commits` / +/// `_graph_commit_actors`) in place. +/// +/// Unlike catalog data tables, the internal tables are not tracked in the +/// `__manifest` (they ARE the manifest / the lineage DAG): readers open them at +/// their latest Lance HEAD, so compaction just advances that HEAD and the next +/// reader transparently observes the compacted version. That makes this path much +/// simpler than [`optimize_one_table`] — no manifest publish (nothing to publish +/// to), and no recovery sidecar. The sidecar-free claim does NOT rest on +/// single-commit atomicity: `compact_files` can emit a `ReserveFragments` commit +/// before the final `Rewrite` (and the config strip is a separate commit before +/// both), so this advances HEAD over one or more commits. 
It needs no sidecar +/// because every one of those commits is content-preserving and the table is read +/// at HEAD — a crash at any point leaves the table readable and content-identical, +/// and the next `optimize` re-plans. Internal tables carry no Lance index (only +/// `object_id`'s unenforced-PK schema metadata), so no `optimize_indices`. +/// +/// Concurrency: no application lock, but `compact_files` does NOT auto-retry a +/// semantic conflict — its `Operation::Rewrite` commits through `apply_commit` +/// directly (not the merge-insert `execute_with_retry` path), so a `Rewrite` +/// vs concurrent `Update`/`Merge`/`Delete` `check_txn` conflict propagates raw. +/// We own the retry here (see [`is_retryable_lance_conflict`]): on a retryable +/// conflict, reopen at the new HEAD and rerun. A follow-up coordinator `refresh` +/// makes the warm internal-table handles observe the compacted HEAD +/// deterministically (the version probe would also self-heal on the next read). +async fn compact_internal_table( + db: &Omnigraph, + table_key: &str, + uri: String, +) -> Result { + // App-level retry against concurrent live writers. compact_files does NOT + // auto-retry a Rewrite-vs-live-write conflict (see is_retryable_lance_conflict), + // so optimize would otherwise fail spuriously on a live graph. On a retryable + // conflict we re-open at the new HEAD and rerun — the canonical Lance-consumer + // pattern. Each attempt opens fresh because the conflict means the version moved. + for attempt in 0..INTERNAL_COMPACTION_RETRY_BUDGET { + let handle = db + .storage() + .open_dataset_head_for_write(table_key, &uri, None) + .await?; + let mut ds = handle.into_dataset(); + + // Keep optimize non-destructive by construction (see clear_stale_auto_cleanup_config). + // Returns whether it committed a config-strip (which advances Lance HEAD). 
+ let cleared_config = match clear_stale_auto_cleanup_config(&mut ds).await { + Ok(cleared) => cleared, + Err(e) => { + if attempt + 1 < INTERNAL_COMPACTION_RETRY_BUDGET && is_retryable_lance_conflict(&e) + { + continue; + } + return Err(OmniError::Lance(e.to_string())); + } + }; + + let options = CompactionOptions::default(); + let plan = plan_compaction(&ds, &options) + .await + .map_err(|e| OmniError::Lance(e.to_string()))?; + if plan.num_tasks() == 0 { + // No compaction work, but a config-strip still advanced HEAD — refresh + // the warm coordinator handles so they observe it deterministically + // (same cache-coherence step the successful-compaction path takes + // below; otherwise they stay pinned until the next version probe). + if cleared_config { + db.coordinator.write().await.refresh().await?; + } + return Ok(TableOptimizeStats::compacted( + table_key.to_string(), + &CompactionMetrics::default(), + false, + )); + } + + match compact_files(&mut ds, options, None).await { + Ok(metrics) => { + // Cache coherence: re-open the warm coordinator's internal-table + // handles at the compacted HEAD (they live in `db.coordinator`, not + // the data-table `runtime_cache`). + db.coordinator.write().await.refresh().await?; + return Ok(TableOptimizeStats::compacted( + table_key.to_string(), + &metrics, + true, + )); + } + Err(e) + if attempt + 1 < INTERNAL_COMPACTION_RETRY_BUDGET + && is_retryable_lance_conflict(&e) => + { + continue; + } + Err(e) => return Err(OmniError::Lance(e.to_string())), + } + } + Err(OmniError::manifest_conflict(format!( + "internal-table compaction of {table_key} exhausted {INTERNAL_COMPACTION_RETRY_BUDGET} \ + retries against concurrent writers" + ))) +} + /// Run Lance `cleanup_old_versions` on every node + edge table on `main`, /// using [`CleanupPolicyOptions`]. The latest manifest is always preserved /// regardless (Lance invariant). 
@@ -912,6 +1130,26 @@ mod tests { use crate::failpoints::ScopedFailPoint; use crate::loader::{LoadMode, load_jsonl}; + /// The internal-table compaction retry classifier: a concurrent live writer + /// preempting our `Rewrite` is retryable (Lance prescribes app-rerun, and + /// compact_files does not auto-retry it); a non-conflict error is not (must not + /// be masked by a blind retry). + #[test] + fn retryable_lance_conflicts_are_classified() { + assert!(is_retryable_lance_conflict( + &lance::Error::retryable_commit_conflict_source( + 1, + Box::new(std::io::Error::other("preempted by concurrent write")), + ) + )); + assert!(is_retryable_lance_conflict( + &lance::Error::too_much_write_contention("contended") + )); + assert!(!is_retryable_lance_conflict(&lance::Error::invalid_input( + "not a conflict" + ))); + } + fn node_table_uri(root: &str, type_name: &str) -> String { let mut hash: u64 = 0xcbf2_9ce4_8422_2325; for &b in type_name.as_bytes() { diff --git a/crates/omnigraph/tests/helpers/cost.rs b/crates/omnigraph/tests/helpers/cost.rs index 4be9ee6..2114f23 100644 --- a/crates/omnigraph/tests/helpers/cost.rs +++ b/crates/omnigraph/tests/helpers/cost.rs @@ -334,6 +334,23 @@ pub async fn measure_insert(db: &mut Omnigraph, tag: &str) -> IoCounts { io } +/// Like [`measure_insert`] but carries an actor, so the write appends to and reads +/// `_graph_commit_actors.lance` — the authenticated (server/CLI) write path. The +/// commit-graph IO wrapper covers both `_graph_commits` and `_graph_commit_actors`, +/// so `IoCounts::commit_graph_reads` includes the actor-table scan on this path. 
+pub async fn measure_insert_as(db: &mut Omnigraph, tag: &str, actor: &str) -> IoCounts { + let (res, io) = measure(db.mutate_as( + "main", + MUTATION_QUERIES, + "insert_person", + &mixed_params(&[("$name", tag)], &[("$age", 30)]), + Some(actor), + )) + .await; + res.unwrap(); + io +} + // ── Backend fixtures — one knob, store-agnostic body ── /// Local tempdir graph (default; deterministic, every-PR). diff --git a/crates/omnigraph/tests/helpers/mod.rs b/crates/omnigraph/tests/helpers/mod.rs index 131f91b..d89227f 100644 --- a/crates/omnigraph/tests/helpers/mod.rs +++ b/crates/omnigraph/tests/helpers/mod.rs @@ -182,6 +182,22 @@ pub async fn commit_many(db: &mut Omnigraph, n: usize) { } } +/// Like [`commit_many`] but every commit carries an actor, so it grows +/// `_graph_commit_actors.lance` too — the authenticated (server/CLI) write path. +pub async fn commit_many_as(db: &mut Omnigraph, n: usize, actor: &str) { + for i in 0..n { + db.mutate_as( + "main", + MUTATION_QUERIES, + "insert_person", + &mixed_params(&[("$name", &format!("commit_many_as_{i}"))], &[("$age", 30)]), + Some(actor), + ) + .await + .unwrap(); + } +} + pub async fn snapshot_main(db: &Omnigraph) -> Result { db.snapshot_of(ReadTarget::branch("main")).await } diff --git a/crates/omnigraph/tests/maintenance.rs b/crates/omnigraph/tests/maintenance.rs index 78e31fa..ca9026d 100644 --- a/crates/omnigraph/tests/maintenance.rs +++ b/crates/omnigraph/tests/maintenance.rs @@ -94,13 +94,23 @@ async fn optimize_on_empty_graph_returns_stats_per_table_with_no_changes() { let stats = db.optimize().await.unwrap(); - // Schema declares 2 nodes + 2 edges = 4 tables. Compaction should run on - // each but find nothing to merge. - assert_eq!(stats.len(), 4); + // Schema declares 2 nodes + 2 edges = 4 data tables, plus the 3 internal + // system tables (`__manifest`, `_graph_commits`, `_graph_commit_actors`) optimize + // also compacts (RFC-013 step 2) = 7. 
Compaction should run on each but find + // nothing to merge. + assert_eq!(stats.len(), 7); for s in &stats { assert_eq!(s.fragments_removed, 0, "{} should not remove", s.table_key); assert_eq!(s.fragments_added, 0, "{} should not add", s.table_key); } + // The internal tables are present and reported as no-ops on an empty graph. + for key in ["__manifest", "_graph_commits", "_graph_commit_actors"] { + let s = stats + .iter() + .find(|s| s.table_key == key) + .unwrap_or_else(|| panic!("optimize stats missing internal table {key}")); + assert!(!s.committed, "{key} should be a no-op on an empty graph"); + } } #[tokio::test] @@ -133,6 +143,224 @@ async fn optimize_after_load_then_again_is_idempotent() { } } +/// RFC-013 step 2: `optimize` compacts the internal system tables +/// (`__manifest`, `_graph_commits`), which accumulate one fragment per commit. +/// After compaction they shed fragments, write no recovery sidecar (each commit +/// is content-preserving and read at HEAD — no HEAD-before-publish gap), and the +/// graph stays coherent for subsequent reads + strict writes. +#[tokio::test] +async fn optimize_compacts_internal_tables() { + let dir = tempfile::tempdir().unwrap(); + let mut db = init_and_load(&dir).await; + + // Build version-history depth so the internal tables accumulate fragments. + for i in 0..20 { + mutate_main( + &mut db, + MUTATION_QUERIES, + "insert_person", + &mixed_params(&[("$name", &format!("p{i}"))], &[("$age", 30)]), + ) + .await + .unwrap(); + } + + let stats = db.optimize().await.unwrap(); + + for key in ["__manifest", "_graph_commits"] { + let s = stats + .iter() + .find(|s| s.table_key == key) + .unwrap_or_else(|| panic!("optimize stats missing internal table {key}")); + assert!(s.committed, "{key} should compact after 20 commits"); + assert!( + s.fragments_removed > 0, + "{key} should shed fragments, removed {}", + s.fragments_removed + ); + } + + // Internal compaction leaks no recovery sidecar.
+ let recovery_dir = dir.path().join("__recovery"); + if recovery_dir.exists() { + let leftover: Vec<_> = std::fs::read_dir(&recovery_dir) + .unwrap() + .filter_map(|e| e.ok()) + .map(|e| e.file_name()) + .collect(); + assert!( + leftover.is_empty(), + "optimize leaked recovery sidecars: {leftover:?}" + ); + } + + // Coherent after internal compaction: reads + a strict write still work. + assert!(count_rows(&db, "node:Person").await > 0); + mutate_main( + &mut db, + MUTATION_QUERIES, + "insert_person", + &mixed_params(&[("$name", "after_compact")], &[("$age", 40)]), + ) + .await + .unwrap(); +} + +/// `optimize` must not fail on a graph that has no `_graph_commits.lance` — a valid +/// state the coordinator opens as `commit_graph = None` (graphs predating the commit +/// graph). Without the existence guard, `Dataset::open` on the absent table errors +/// and fails the whole optimize. Regression for the missing-existence-guard. +/// +/// Uses an EMPTY graph deliberately: a graph with data would publish during +/// optimize, and a publish records a graph commit that recreates `_graph_commits` +/// before the guard runs — masking the bug. With no data, nothing recreates it, so +/// the table stays absent through the guard. +#[tokio::test] +async fn optimize_tolerates_absent_graph_commits_table() { + let dir = tempfile::tempdir().unwrap(); + let uri = dir.path().to_str().unwrap(); + Omnigraph::init(uri, TEST_SCHEMA).await.unwrap(); + + // Simulate a graph with no commit-graph dataset. + std::fs::remove_dir_all(dir.path().join("_graph_commits.lance")).unwrap(); + + // Coordinator tolerates the absence; optimize must succeed (the guard skips the + // absent table rather than letting `Dataset::open` error) and omit its stat. 
+ let db = Omnigraph::open(uri).await.unwrap(); + let stats = db.optimize().await.unwrap(); + assert!( + stats.iter().any(|s| s.table_key == "__manifest"), + "__manifest must still be compacted" + ); + assert!( + !stats.iter().any(|s| s.table_key == "_graph_commits"), + "absent _graph_commits must be skipped, not opened (would error)" + ); +} + +/// `optimize` must stay NON-DESTRUCTIVE on a pre-`auto_cleanup`-fix upgraded graph: +/// `compact_files` would otherwise fire the dataset's stored `lance.auto_cleanup.*` +/// hook (version GC) during the compaction commit. Internal-table compaction clears +/// that stale config first, so no versions are deleted. Without the clear, the +/// aggressive policy below GCs old versions and the count drops. +#[tokio::test] +async fn optimize_clears_stale_auto_cleanup_and_preserves_versions() { + let dir = tempfile::tempdir().unwrap(); + let mut db = init_and_load(&dir).await; + for i in 0..5 { + mutate_main( + &mut db, + MUTATION_QUERIES, + "insert_person", + &mixed_params(&[("$name", &format!("v{i}"))], &[("$age", 30)]), + ) + .await + .unwrap(); + } + let manifest_uri = format!("{}/__manifest", dir.path().to_str().unwrap()); + + // Simulate an upgraded graph: an aggressive stored auto_cleanup config that, if + // it fired during compaction, would GC old versions. + { + let mut ds = Dataset::open(&manifest_uri).await.unwrap(); + ds.update_config([ + ("lance.auto_cleanup.interval", Some("1")), + ("lance.auto_cleanup.older_than", Some("0s")), + ]) + .await + .unwrap(); + } + let versions_before = Dataset::open(&manifest_uri) + .await + .unwrap() + .versions() + .await + .unwrap() + .len(); + + db.optimize().await.unwrap(); + + let ds = Dataset::open(&manifest_uri).await.unwrap(); + // (a) the stale auto_cleanup config was cleared (non-destructive by construction). 
+ assert!( + !ds.config().keys().any(|k| k.starts_with("lance.auto_cleanup.")), + "optimize must clear stale auto_cleanup config; config = {:?}", + ds.config() + ); + // (b) no version GC: every pre-optimize version survives (compaction + the + // config-clear each add versions, so the count only grows). + let versions_after = ds.versions().await.unwrap().len(); + assert!( + versions_after >= versions_before, + "optimize must not GC __manifest versions: before={versions_before} after={versions_after}" + ); +} + +/// The same non-destructive guarantee on a DATA (node/edge) table, not just the +/// internal tables. `optimize_one_table` runs `compact_files` / `optimize_indices` +/// with a default `CommitConfig` (`skip_auto_cleanup = false`); on an upgraded +/// graph whose Person table still carries the pre-v7 `lance.auto_cleanup.*` config, +/// those commits would fire Lance's version-GC hook and prune `__manifest`-pinned +/// data-table versions. The path must strip that config first. Without the strip, +/// the aggressive policy below GCs old versions and the config survives the run. +#[tokio::test] +async fn optimize_clears_stale_auto_cleanup_on_data_tables_too() { + let dir = tempfile::tempdir().unwrap(); + let root = dir.path().to_str().unwrap().trim_end_matches('/').to_string(); + let mut db = init_and_load(&dir).await; + add_person_fragments(&mut db).await; // multiple fragments → will_compact + + // Simulate an upgraded graph: set an aggressive stored auto_cleanup config on + // the Person table. This is an out-of-band Lance commit (an `UpdateConfig` that + // advances HEAD past the manifest), so realign the manifest with a forced repair + // first — otherwise optimize skips the table as uncovered drift and never + // reaches the scrub. (Forced because UpdateConfig is not verified maintenance.) 
+ let (_, _, person_full) = person_manifest_and_head(&db, &root).await; + { + let mut ds = Dataset::open(&person_full).await.unwrap(); + ds.update_config([ + ("lance.auto_cleanup.interval", Some("1")), + ("lance.auto_cleanup.older_than", Some("0s")), + ]) + .await + .unwrap(); + } + db.repair(RepairOptions { + confirm: true, + force: true, + }) + .await + .unwrap(); + + let versions_before = Dataset::open(&person_full) + .await + .unwrap() + .versions() + .await + .unwrap() + .len(); + let rows_before = count_rows(&db, "node:Person").await; + + db.optimize().await.unwrap(); + + let ds = Dataset::open(&person_full).await.unwrap(); + // (a) the stale auto_cleanup config was cleared (non-destructive by construction). + assert!( + !ds.config().keys().any(|k| k.starts_with("lance.auto_cleanup.")), + "optimize must clear stale auto_cleanup config on data tables; config = {:?}", + ds.config() + ); + // (b) no version GC: every pre-optimize version survives (compaction + the + // config-clear each add versions, so the count only grows). + let versions_after = ds.versions().await.unwrap().len(); + assert!( + versions_after >= versions_before, + "optimize must not GC Person versions: before={versions_before} after={versions_after}" + ); + // (c) data is intact — the run rewrote fragments, it did not drop rows. + assert_eq!(count_rows(&db, "node:Person").await, rows_before); +} + // PR3 (Workstream B): an existing scalar index does not cover fragments // appended after it was built (build_indices is existence-gated), so those // rows are scanned unindexed. 
`optimize` must fold them back in via Lance's diff --git a/crates/omnigraph/tests/write_cost.rs b/crates/omnigraph/tests/write_cost.rs index 5f753d7..c7e8528 100644 --- a/crates/omnigraph/tests/write_cost.rs +++ b/crates/omnigraph/tests/write_cost.rs @@ -24,19 +24,26 @@ mod helpers; use helpers::cost::{ - IoCounts, assert_flat, assert_grows, local_graph, measure_insert, measure_with_staged, + IoCounts, assert_flat, assert_grows, local_graph, measure_insert, measure_insert_as, + measure_with_staged, }; -use helpers::{MUTATION_QUERIES, commit_many, mixed_params}; +use helpers::{MUTATION_QUERIES, commit_many, commit_many_as, mixed_params}; -// ── (A) The internal-table LOCK — RED today, the acceptance test for step 2 ── +// ── (A) The internal-table LOCK — the acceptance test for step 2 (compaction) ── // -// `__manifest` / `_graph_commits` scans must be O(1) in commit-history depth. -// RED today (O(fragments), uncompacted). Un-ignore when step 2 (internal-table -// compaction) lands — it must go green flat. (The data-table term is the S3 -// gate's, `write_cost_s3.rs`; local-FS hides it.) +// `__manifest` / `_graph_commits` / `_graph_commit_actors` scans on a write must be +// O(1) in commit-history depth **on a compacted graph**. Without internal-table +// compaction these scans are O(fragments) and grow forever; step 2 brings all three +// internal tables into `db.optimize()`, so after compaction the per-write scan is +// flat. The test runs the **authenticated (actorful) write path** — every commit +// carries an actor, so it grows `_graph_commit_actors.lance` too (the production +// server/CLI path); the commit-graph IO wrapper covers both that and `_graph_commits`, +// so `commit_graph_reads` includes the actor-table scan. It compacts at each depth +// checkpoint before measuring — pinning the production invariant "a periodically- +// compacted graph's write cost does not grow with version history." 
#[tokio::test] -#[ignore = "RED until step 2 (internal-table compaction): __manifest/_graph_commits scans are O(fragments) today — RFC-013 §0/§2.2. Un-ignore there as the red→green acceptance test."] async fn internal_table_scans_are_flat_in_history() { + const ACTOR: &str = "act-cost-gate"; let dir = tempfile::tempdir().unwrap(); let mut db = local_graph(&dir).await; @@ -44,20 +51,25 @@ async fn internal_table_scans_are_flat_in_history() { let mut current = 0u64; for d in [10u64, 100] { if d > current { - commit_many(&mut db, (d - current) as usize).await; + commit_many_as(&mut db, (d - current) as usize, ACTOR).await; current = d; } - let io = measure_insert(&mut db, &format!("lock_{d}")).await; + // Step 2: compaction folds all three internal tables' O(depth) fragments back + // to a small constant, so the following write's scan of them is flat. + db.optimize().await.unwrap(); + let io = measure_insert_as(&mut db, &format!("lock_{d}"), ACTOR).await; current += 1; // the measured write advanced depth by one eprintln!( - "depth~{d}: data={} __manifest={} _graph_commits={}", + "depth~{d}: data={} __manifest={} _graph_commits+actors={}", io.data_reads, io.manifest_reads, io.commit_graph_reads ); curve.push((d, io)); } assert_flat(&curve, |c| c.manifest_reads, 4, "__manifest scan"); - assert_flat(&curve, |c| c.commit_graph_reads, 4, "_graph_commits scan"); + // commit_graph_reads covers BOTH _graph_commits and _graph_commit_actors (shared + // wrapper), so this also gates the actor table on the authenticated path. + assert_flat(&curve, |c| c.commit_graph_reads, 4, "_graph_commits + _graph_commit_actors scan"); } // The data-table OPENER history-gate (opener flat across depth) lives in diff --git a/docs/dev/invariants.md b/docs/dev/invariants.md index eb6821a..3195bd0 100644 --- a/docs/dev/invariants.md +++ b/docs/dev/invariants.md @@ -285,11 +285,14 @@ them explicit. 
because Lance branch names can be deleted/recreated at the same version number; the manifest e_tag is carried into synthetic snapshot ids when available, and a detected same-branch manifest refresh clears read caches as the fallback for - e_tag-less table locations/topology. Remaining: the internal metadata tables - (`__manifest`, `_graph_commits`) are still not compacted, so the probe and - refresh cost still grows with fragment count on a long-lived graph (the - `optimize`-covers-internal-tables follow-up); the commit graph is not yet - reconcilable from the manifest; and the traversal id-map is still rebuilt. + e_tag-less table locations/topology. Remaining: `optimize` now compacts the + internal metadata tables (`__manifest`, `_graph_commits`, `_graph_commit_actors`) too (RFC-013 step 2), + so a *periodically-optimized* graph keeps the probe/refresh/per-write scan flat + in history; but they are not yet brought into `cleanup` (version GC), so the + `_versions/` chain still grows until an explicit cleanup (the cleanup half is + deferred — it needs the Q8 cleanup-resurrection watermark first). The commit + graph is not yet reconcilable from the manifest; and the traversal id-map is + still rebuilt. - **Commit-graph parent under concurrency:** `record_graph_commit` now refreshes the commit-graph head from storage before appending, so a same-branch write after an external commit no longer forks the commit DAG by parenting off a diff --git a/docs/dev/rfc-013-write-path-latency.md b/docs/dev/rfc-013-write-path-latency.md index fa4abf3..37e6a8a 100644 --- a/docs/dev/rfc-013-write-path-latency.md +++ b/docs/dev/rfc-013-write-path-latency.md @@ -846,23 +846,60 @@ to flatten the curve. internal-table LOCK (step 2's red→green acceptance). *Still owed:* the prod `storage.ops` span metric (§5.3) and the bucket-gated `write_cost_s3.rs` opener LOCK (step 3a's red→green, S3-only per the §9-3a measurement note). -2.
**Bound history — bring the INTERNAL tables into optimize/cleanup (a code - change, not just scheduling).** Today `optimize`/`cleanup` iterate **node/edge - keys only** (`optimize.rs:895-904`) — confirmed: the prototype's `cleanup --keep 3` - pruned "7 tables" = the node/edge data tables; `__manifest`/`_graph_commits` were - untouched **[M]**. So the residual +5/depth internal slope (§0b) is **not** fixed - by today's tooling — step 2 is a real `all_table_keys` change to add the internal - tables, then schedule compaction+cleanup (pass `--yes`; cleanup aborts on remote - otherwise). The pruning mechanism is proven on a data table (1035→63, 16× **[M]**); - the internal tables need the same inclusion. **Proven [M]:** compacting the - internal tables collapsed their scans `__manifest` 285→32, `_graph_commits` - 177→11; with step 3 a depth-87 edge drops **~1720 → 198 ops** (§2.4). (Separately, - node/edge cleanup **caps** the dominant data-table term as an interim *before* - step 3 — after step 3 that term is flat regardless.) **HARD PREREQUISITE:** the - Q8 boundary watermark must land **with** this step — Lance's version CAS is - confirmed vulnerable to cleanup-resurrection (§12 Q8, a silent lost write on - R2/S3), so scheduling cleanup without the watermark trades a latency bug for a - correctness bug. (`gap-read-path-rederivation` write twin.) +2. **Bound history — bring the INTERNAL tables into optimize/cleanup.** Split into + a compaction half (the latency win, safe) and a cleanup half (version GC, needs + the Q8 watermark). Validated (Lance docs + source): compaction *preserves* + versions and is the only term needed to flatten the per-write metadata scan; + cleanup is the separate version-deleting op that opens the Q8 hole. + - **2a. Internal-table compaction. 
✅ LANDED.** `optimize` now compacts all + three internal tables — `__manifest`, `_graph_commits`, **and + `_graph_commit_actors`** (the actor table grows one fragment per commit on the + authenticated write path, so it carries the same O(depth) scan as the other + two and is compacted from one source-of-truth list with per-table existence + guards). `compact_internal_table` is a separate simpler path than + `optimize_one_table`: no manifest publish, no recovery sidecar. The sidecar-free + property does **not** rest on single-commit atomicity (`compact_files` can emit a + `ReserveFragments` commit before the `Rewrite`, and the auto-cleanup strip is a + further commit) — it holds because each of those commits is content-preserving + and the table is read at HEAD, so a crash leaves it readable and content-identical + and the next `optimize` re-plans. **Non-destructive by construction:** compaction + preserves versions, and before compacting it strips any stale `lance.auto_cleanup.*` + config off the table, so a graph created by an older binary (on-by-default GC + hook) cannot have the commit-time hook silently prune `__manifest`-pinned + versions during an `optimize` (current binaries store no such config; the + strip is the upgrade-path safety net). **The same strip now also runs on the + data-table path** (`optimize_one_table`), inside the Optimize sidecar window — + so `optimize` is non-destructive on node/edge tables too, not just the internal + ones (the data-table path was a pre-existing gap, since `compact_files`/ + `optimize_indices` there also commit with the auto-cleanup hook enabled). 
**Concurrency:** + no app lock on the internal path — and `compact_files` does *not* auto-retry a + semantic conflict against a concurrent live writer (Lance prescribes app-rerun for + `Rewrite` vs `Update`/`Merge`), so `compact_internal_table` runs a *bounded* + retry loop that reopens fresh and replans on a retryable Lance conflict (the + canonical Lance-consumer pattern); transient contention does not fail the live + publisher or the operator's `optimize`, but sustained contention past the budget + surfaces a loud conflict error (bounded + observable, not an infinite loop). The + data-table path instead holds the per-table write queue, so it never contends. A + coordinator `refresh` after the compaction restores cache coherence. The + `internal_table_scans_are_flat_in_history` LOCK is now green on the + **authenticated** write path: on a compacted graph a write's + `__manifest`/`_graph_commits`+`_graph_commit_actors` scan is flat in history + (measured `__manifest` 4→2, commit-graph+actors 10→2 across depth 10→100). + Compacts all three tables even though Phase 7 (`iss-991`) will later fold + `_graph_commits` into `__manifest` (one-call throwaway; full interim win until + then). **2a is also the hard prerequisite for Phase 7** (its `graph_head` CAS + contention is only acceptable once `__manifest` compaction bounds the + publisher's `load_publish_state` scan). + - **2b. Internal-table cleanup + Q8 watermark — DEFERRED** (debated; not bundled + with 2a). Cleanup is the version-deleting op that hits cleanup-resurrection + (§12 Q8: Lance's version CAS has no monotonic guard), so it must land **with** + a durable monotonic watermark (a Lance boundary tag — durable across cleanup, + `cleanup.rs` `is_tagged`). 
Deferred because it touches the read/open path + (a tag-floor clamp on every coordinator open), is the MTT-redundant part (MTT + may replace `__manifest`), and only buys the secondary version-count/space term + — whereas 2a delivers the dominant per-write scan win with zero resurrection + risk. Land it when the version-count cost bites or the Lance MTT timeline + clarifies. (`gap-read-path-rederivation` write twin.) 3. **The opener fix — a shippable lead + the structural follow-on.** - **3a. Opener bypass (standalone PR, THE dominant fix — [M] proven). ✅ LANDED.** `TableStore::open_dataset_head_for_write` now delegates to the direct diff --git a/docs/dev/testing.md b/docs/dev/testing.md index 941cec6..ac5d4f0 100644 --- a/docs/dev/testing.md +++ b/docs/dev/testing.md @@ -26,7 +26,7 @@ The engine's `tests/` is the principal coverage surface; most graph-shaped behav | `forbidden_apis.rs` | Defense-in-depth source-walk guard: engine code (`exec/`, `db/omnigraph/`, `loader/`, `changes/`) must not reach around the sealed storage trait to Lance inline-commit APIs, nor open datasets directly (`Dataset::open` / `DatasetBuilder::from_uri`/`from_namespace`) — reads route through `Snapshot::open` and the held-handle cache; `// forbidden-api-allow: ` sentinel exempts reviewed lines | | `lance_surface_guards.rs` | Pins the Lance API surfaces omnigraph depends on (named runtime + compile-only guards; see [lance.md](lance.md)) — the first smoke check on any Lance version bump; e.g. 
`compact_files_still_fails_on_blob_columns` turns red when the upstream blob-compaction fix lands | | `warm_read_cost.rs` | Cost-budget tests for the warm read path (query-latency work), measured at the object-store boundary with Lance `IOTracker` (the LanceDB IO-counted pattern): a warm same-branch read does 0 manifest opens, 0 commit-graph opens, 1 version probe, validates the schema once (Fix 1 / finding A / Fix 2 at commit-history depth); stale same-branch reads perform exactly 2 probes and refresh manifest-only; recreated non-main branches with the same Lance version refresh by incarnation; recreated branch-owned table handles are distinguished by table e_tag or refresh-time cache clearing; recreated traversal topology is protected by synthetic snapshot-id incarnation or refresh-time cache clearing; a warm *repeat* read does 0 table opens via the held-handle cache and a write re-opens only the changed table at its new version/e_tag (Fix 3/6A). See "Cost-budget tests" below | -| `write_cost.rs` | Cost-budget tests for the WRITE path (RFC-013), the latency twin of `warm_read_cost.rs` on the **shared `helpers::cost` harness** (`measure`/`IoCounts`/`assert_flat`/`local_graph`). Runs on **local FS**; gates the **internal-table** term (`__manifest`/`_graph_commits` scans flat in commit-history depth — the RED `internal_table_scans_are_flat_in_history` LOCK, `#[ignore]`'d until internal-table compaction lands) plus green every-PR guards (single-insert `data_writes` bounded, a per-write read-op ceiling that fails the moment a round-trip is added, and a `measure_with_staged` fitness assert that a keyed insert routes through `stage_merge_insert` once with no `stage_append`/vector-index build). 
The **data-table opener** term is S3-only — see `write_cost_s3.rs` and the backend-split note in "Cost-budget tests" below | +| `write_cost.rs` | Cost-budget tests for the WRITE path (RFC-013), the latency twin of `warm_read_cost.rs` on the **shared `helpers::cost` harness** (`measure`/`IoCounts`/`assert_flat`/`local_graph`). Runs on **local FS**; gates the **internal-table** term (`__manifest`/`_graph_commits` scans flat in commit-history depth — `internal_table_scans_are_flat_in_history`, now **green every-PR** since RFC-013 step 2 brought the internal tables into `optimize`; the test compacts at each depth before measuring) plus green every-PR guards (single-insert `data_writes` bounded, a per-write read-op ceiling that fails the moment a round-trip is added, and a `measure_with_staged` fitness assert that a keyed insert routes through `stage_merge_insert` once with no `stage_append`/vector-index build). The **data-table opener** term is S3-only — see `write_cost_s3.rs` and the backend-split note in "Cost-budget tests" below | | `helpers/cost.rs` | The shared cost-budget harness (not a test): `IoCounts`/`StagedCounts` (counts by table class), `measure`/`measure_with_staged` (the one place the `with_query_io_probes` + `MergeWriteProbes` task-local + `IOTracker` wiring lives), `assert_flat(curve, select, slack, what)`, and store-agnostic `local_graph`/`s3_graph` fixtures. 
`warm_read_cost.rs`, `write_cost.rs`, and `write_cost_s3.rs` all consume it so a cost test body is written once and reads in one vocabulary | | `lifecycle.rs` | Graph lifecycle, schema state | | `point_in_time.rs` | Snapshots, time travel (`snapshot_at_version`, `entity_at`) | diff --git a/docs/user/operations/maintenance.md b/docs/user/operations/maintenance.md index e2a88eb..d8df950 100644 --- a/docs/user/operations/maintenance.md +++ b/docs/user/operations/maintenance.md @@ -6,6 +6,8 @@ - Compacts every node + edge table on `main`, then reindexes them, then **publishes the resulting version to the `__manifest`** so the manifest's recorded version tracks the compacted-and-reindexed state. Reads pin the manifest version, so without this publish the work would be invisible to readers *and* would break the version precondition of the next schema apply / strict update/delete ("stale view … refresh and retry"). The publish advances the graph version (a system-attributed commit) only for tables that actually changed. - Rewrites small fragments into fewer large ones; old fragments remain reachable via older versions until `cleanup` runs. +- **Also compacts the internal system tables** `__manifest`, `_graph_commits`, and `_graph_commit_actors` (RFC-013 step 2), which accumulate one fragment per commit (the actor table only on the authenticated write path, where every commit carries an actor) and otherwise make every write's metadata scan grow with history. These take a simpler path than data tables: they are not `__manifest`-tracked (readers open them at their latest version), so compaction just advances their version in place — **no manifest publish and no recovery sidecar**. 
(The path is sidecar-free not because compaction is a single commit — `compact_files` can emit a `ReserveFragments` commit before the `Rewrite`, and the auto-cleanup strip below is a further commit — but because every one of those commits is content-preserving and the table is read at its latest version, so a crash at any point leaves it readable and content-identical and the next `optimize` re-plans.) They appear in the returned stats under `table_key` `"__manifest"` / `"_graph_commits"` / `"_graph_commit_actors"` (the latter two only when present). They are **not yet covered by `cleanup`**, so their version chain still grows until the cleanup half lands (it requires a cleanup-resurrection safeguard first); run `optimize` on a cadence to keep per-write metadata scans flat. +- **`optimize` is non-destructive by construction — it never garbage-collects versions, on any table (data or internal).** Compaction rewrites fragments and advances the version; old versions stay reachable until you run `cleanup`. This holds even for a graph created by an older binary that stored an on-by-default Lance `auto_cleanup` hook: `compact_files` / `optimize_indices` commit with the hook enabled and expose no skip override, so `optimize` strips the stale `lance.auto_cleanup.*` config from **every** table before compacting it; Lance's commit-time GC hook therefore cannot fire and silently prune `__manifest`-pinned versions. (Graphs created by current binaries store no such config; the strip is the upgrade-path safety net.) The internal-table path additionally tolerates a concurrent live writer: it runs a **bounded** reopen-and-retry loop (reopen the table fresh, re-plan, retry), so transient contention does not fail the operator's `optimize` or the live write — but sustained contention past the retry budget surfaces a loud conflict error rather than looping forever (bounded and observable, not a silent give-up).
The data-table path holds the per-table write queue while it compacts, so it does not contend with mutations on that table in the first place. - **Reindex (index coverage maintenance).** A scalar/FTS/vector index only covers the fragments it was built over. Rows appended after the index was built (e.g. by `load --mode merge`, whose commit does not rebuild an already-existing index) are scanned unindexed, and compaction itself rewrites fragments out of an index's coverage. `optimize` runs Lance's incremental `optimize_indices` after compaction to fold those fragments back in (a delta merge, not a full retrain), restoring full coverage so equality/range/traversal predicates stay index-accelerated. This is why a table with **no compaction work but stale index coverage still commits** a new version under `optimize`. Run `optimize` on a cadence at least as frequent as your freshness window so recently-loaded rows do not linger in the unindexed flat-scan tail. - **Create declared-but-missing indexes (the index reconciler).** `@index`/`@key` declares intent; `schema apply` records it but builds nothing, and `load`/`mutate` defer a column that cannot be built yet (a `Vector` column with no trainable vectors). `optimize` materializes any such declared-but-unbuilt index over the compacted layout — so it is the convergence path for an `@index` added after data exists, or a vector index whose embeddings arrived via a later `embed`. A column still not buildable (no vectors yet) is reported on the table's stat as `pending_indexes` (visible in `--json`), not treated as a failure; the next `optimize` retries. So `optimize` is the single operator-facing index reconciler: it compacts, restores coverage, **and** builds declared-but-missing indexes. - Each table's compact→reindex→publish serializes with concurrent mutations on the same table. 
A crash mid-operation is recovered automatically on the next open (both compaction and reindex are content-preserving, so roll-forward is always safe).