From 85a35629570d125fa3ec47e1b7e47f6195476da9 Mon Sep 17 00:00:00 2001 From: Ragnor Comerford Date: Sat, 20 Jun 2026 17:28:52 +0200 Subject: [PATCH] feat(engine): compact the internal __manifest/_graph_commits tables in optimize `optimize` iterated node/edge catalog tables only, so the two internal system tables (`__manifest`, `_graph_commits`) accumulated one fragment per commit and were never compacted -- making every write's metadata scan O(fragments), which grows forever on a long-lived graph (RFC-013 step 2). `optimize_all_tables` now also compacts both internal tables via a new `compact_internal_table`. They are not catalog-tracked (readers open them at their latest Lance HEAD), so it is a much simpler path than `optimize_one_table`: compact in place, no manifest publish (nothing to publish to), no recovery sidecar (a single atomic Lance commit -- no HEAD-before-publish gap), and no optimize_indices (they carry no Lance index, only object_id's unenforced-PK metadata). No application lock: Lance's compact_files auto-retries its Rewrite against any concurrent writer (the canonical LanceDB pattern; Rewrite vs Append is compatible, vs Update a retryable same-fragment conflict Lance rebases), and a coordinator refresh afterwards makes the warm handle observe the compacted HEAD. Compacts both tables even though Phase 7 (iss-991) will later fold _graph_commits into __manifest -- a one-call throwaway for the full interim win; __manifest compaction is also the prerequisite for Phase 7's graph_head contention. Cleanup (version GC) of the internal tables is deliberately NOT included here: it needs the Q8 cleanup-resurrection watermark first (deferred). maintenance.rs: optimize now returns 6 stats (4 data + 2 internal); adds optimize_compacts_internal_tables (sheds fragments, leaks no recovery sidecar, graph coherent for reads + strict writes after). --- crates/omnigraph/src/db/manifest.rs | 3 +- crates/omnigraph/src/db/manifest/layout.rs | 2 +- crates/omnigraph/src/db/omnigraph/optimize.rs | 79 ++++++++++++++++++- crates/omnigraph/tests/maintenance.rs | 78 +++++++++++++++++- 4 files changed, 156 insertions(+), 6 deletions(-) diff --git a/crates/omnigraph/src/db/manifest.rs b/crates/omnigraph/src/db/manifest.rs index 4c6410b..fa05b49 100644 --- a/crates/omnigraph/src/db/manifest.rs +++ b/crates/omnigraph/src/db/manifest.rs @@ -28,7 +28,8 @@ mod recovery; mod state; use graph::{init_manifest_graph, open_manifest_graph, snapshot_state_at}; -use layout::{manifest_uri, open_manifest_dataset, table_uri_for_path, type_name_hash}; +use layout::{open_manifest_dataset, table_uri_for_path, type_name_hash}; +pub(crate) use layout::manifest_uri; pub(crate) use metadata::TableVersionMetadata; #[cfg(test)] use metadata::{OMNIGRAPH_ROW_COUNT_KEY, table_version_metadata_for_state}; diff --git a/crates/omnigraph/src/db/manifest/layout.rs b/crates/omnigraph/src/db/manifest/layout.rs index f4ac09b..12894a7 100644 --- a/crates/omnigraph/src/db/manifest/layout.rs +++ b/crates/omnigraph/src/db/manifest/layout.rs @@ -15,7 +15,7 @@ pub(super) fn type_name_hash(name: &str) -> String { format!("{:016x}", h) } -pub(super) fn manifest_uri(root: &str) -> String { +pub(crate) fn manifest_uri(root: &str) -> String { format!("{}/{}", root.trim_end_matches('/'), MANIFEST_DIR) } diff --git a/crates/omnigraph/src/db/omnigraph/optimize.rs b/crates/omnigraph/src/db/omnigraph/optimize.rs index 9a0a17f..b1f67cf 100644 --- a/crates/omnigraph/src/db/omnigraph/optimize.rs +++ b/crates/omnigraph/src/db/omnigraph/optimize.rs @@ -279,7 +279,27 @@ pub async fn optimize_all_tables(db: &Omnigraph) -> Result Result { + let handle = db + .storage() + .open_dataset_head_for_write(table_key, &uri, None) + .await?; + let mut ds = handle.into_dataset(); + + let options = CompactionOptions::default(); + let plan = plan_compaction(&ds, &options) + .await + .map_err(|e| OmniError::Lance(e.to_string()))?; + if plan.num_tasks() == 0 { + return Ok(TableOptimizeStats::compacted( + table_key.to_string(), + &CompactionMetrics::default(), + false, + )); + } + + let metrics = compact_files(&mut ds, options, None) + .await + .map_err(|e| OmniError::Lance(e.to_string()))?; + + // Cache coherence: re-open the warm coordinator's `__manifest`/`_graph_commits` + // handle at the compacted HEAD (they live in `db.coordinator`, not the + // data-table `runtime_cache`). + db.coordinator.write().await.refresh().await?; + + Ok(TableOptimizeStats::compacted( + table_key.to_string(), + &metrics, + true, + )) +} + /// Run Lance `cleanup_old_versions` on every node + edge table on `main`, /// using [`CleanupPolicyOptions`]. The latest manifest is always preserved /// regardless (Lance invariant). diff --git a/crates/omnigraph/tests/maintenance.rs b/crates/omnigraph/tests/maintenance.rs index 78e31fa..410179c 100644 --- a/crates/omnigraph/tests/maintenance.rs +++ b/crates/omnigraph/tests/maintenance.rs @@ -94,13 +94,22 @@ async fn optimize_on_empty_graph_returns_stats_per_table_with_no_changes() { let stats = db.optimize().await.unwrap(); - // Schema declares 2 nodes + 2 edges = 4 tables. Compaction should run on - // each but find nothing to merge. - assert_eq!(stats.len(), 4); + // Schema declares 2 nodes + 2 edges = 4 data tables, plus the 2 internal + // system tables (`__manifest`, `_graph_commits`) optimize also compacts + // (RFC-013 step 2) = 6. Compaction should run on each but find nothing to merge. + assert_eq!(stats.len(), 6); for s in &stats { assert_eq!(s.fragments_removed, 0, "{} should not remove", s.table_key); assert_eq!(s.fragments_added, 0, "{} should not add", s.table_key); } + // The internal tables are present and reported as no-ops on an empty graph. + for key in ["__manifest", "_graph_commits"] { + let s = stats + .iter() + .find(|s| s.table_key == key) + .unwrap_or_else(|| panic!("optimize stats missing internal table {key}")); + assert!(!s.committed, "{key} should be a no-op on an empty graph"); + } } #[tokio::test] @@ -133,6 +142,69 @@ async fn optimize_after_load_then_again_is_idempotent() { } } +/// RFC-013 step 2: `optimize` compacts the internal system tables +/// (`__manifest`, `_graph_commits`), which accumulate one fragment per commit. +/// After compaction they shed fragments, write no recovery sidecar (a single +/// atomic Lance commit — no HEAD-before-publish gap), and the graph stays +/// coherent for subsequent reads + strict writes. +#[tokio::test] +async fn optimize_compacts_internal_tables() { + let dir = tempfile::tempdir().unwrap(); + let mut db = init_and_load(&dir).await; + + // Build version-history depth so the internal tables accumulate fragments. + for i in 0..20 { + mutate_main( + &mut db, + MUTATION_QUERIES, + "insert_person", + &mixed_params(&[("$name", &format!("p{i}"))], &[("$age", 30)]), + ) + .await + .unwrap(); + } + + let stats = db.optimize().await.unwrap(); + + for key in ["__manifest", "_graph_commits"] { + let s = stats + .iter() + .find(|s| s.table_key == key) + .unwrap_or_else(|| panic!("optimize stats missing internal table {key}")); + assert!(s.committed, "{key} should compact after 20 commits"); + assert!( + s.fragments_removed > 0, + "{key} should shed fragments, removed {}", + s.fragments_removed + ); + } + + // Internal compaction leaks no recovery sidecar. + let recovery_dir = dir.path().join("__recovery"); + if recovery_dir.exists() { + let leftover: Vec<_> = std::fs::read_dir(&recovery_dir) + .unwrap() + .filter_map(|e| e.ok()) + .map(|e| e.file_name()) + .collect(); + assert!( + leftover.is_empty(), + "optimize leaked recovery sidecars: {leftover:?}" + ); + } + + // Coherent after internal compaction: reads + a strict write still work. + assert!(count_rows(&db, "node:Person").await > 0); + mutate_main( + &mut db, + MUTATION_QUERIES, + "insert_person", + &mixed_params(&[("$name", "after_compact")], &[("$age", 40)]), + ) + .await + .unwrap(); +} + // PR3 (Workstream B): an existing scalar index does not cover fragments // appended after it was built (build_indices is existence-gated), so those // rows are scanned unindexed. `optimize` must fold them back in via Lance's