From d0e39e677e3ba77d8a74f5a52f40244aa2d25787 Mon Sep 17 00:00:00 2001 From: Ragnor Comerford Date: Tue, 9 Jun 2026 14:42:54 +0200 Subject: [PATCH] fix(maintenance): route uncovered drift through repair (#156) * docs(invariants): note the non-atomic manifest->commit-graph publish gap Every graph publish commits __manifest then appends _graph_commits as two separate writes; a crash between them leaves the manifest ahead of the commit DAG. Live reads + durability are unaffected (reads resolve via the manifest) and recovery does not repair it; impact is bounded to commit history / time-travel by commit id / merge-base completeness. Pre-existing across all publishes, not the optimize reconcile specifically. Documented as a Known Gap; the fix is a commit-graph reconcilable from the manifest, not a recovery sidecar. * fix(maintenance): route uncovered drift through repair * fix(maintenance): harden repair review feedback --- AGENTS.md | 9 +- crates/omnigraph-cli/src/main.rs | 104 ++++++ crates/omnigraph-cli/tests/cli.rs | 97 +++++ crates/omnigraph/src/db/mod.rs | 5 +- crates/omnigraph/src/db/omnigraph.rs | 21 ++ crates/omnigraph/src/db/omnigraph/optimize.rs | 79 +++- crates/omnigraph/src/db/omnigraph/repair.rs | 332 +++++++++++++++++ crates/omnigraph/src/exec/mutation.rs | 3 +- crates/omnigraph/src/exec/staging.rs | 55 ++- .../omnigraph/tests/lance_surface_guards.rs | 33 +- crates/omnigraph/tests/maintenance.rs | 345 +++++++++++++++++- crates/omnigraph/tests/writes.rs | 77 ++-- docs/dev/invariants.md | 14 + docs/dev/testing.md | 4 +- docs/user/cli-reference.md | 6 +- docs/user/maintenance.md | 17 +- 16 files changed, 1108 insertions(+), 93 deletions(-) create mode 100644 crates/omnigraph/src/db/omnigraph/repair.rs diff --git a/AGENTS.md b/AGENTS.md index 3f5b711..69272f8 100644 --- a/AGENTS.md +++ b/AGENTS.md @@ -214,8 +214,12 @@ omnigraph schema apply --schema ./next.pg s3://my-bucket/graph.omni --json # Merge review branch back omnigraph branch merge review/2026-04-25 --into main s3://my-bucket/graph.omni -# Compact + GC (preview, then confirm) +# Compact, preview any uncovered drift, then repair/GC after review omnigraph optimize s3://my-bucket/graph.omni +omnigraph repair s3://my-bucket/graph.omni +omnigraph repair --confirm s3://my-bucket/graph.omni +# For suspicious/unverifiable drift only after deliberate review: +# omnigraph repair --force --confirm s3://my-bucket/graph.omni omnigraph cleanup --keep 10 --older-than 7d s3://my-bucket/graph.omni omnigraph cleanup --keep 10 --older-than 7d --confirm s3://my-bucket/graph.omni @@ -237,7 +241,8 @@ omnigraph policy explain --actor act-alice --action change --branch main | Per-dataset versioning + time travel | ✅ | `snapshot_at_version`, `entity_at`, snapshot-pinned reads across many tables | | Per-dataset branches | ✅ | **Graph-level** branches (atomic across all sub-tables), lazy fork, system branch filtering | | Atomic single-dataset commits | ✅ | **Multi-table publish via three layers**, NOT a single Lance primitive: (1) per-table Lance `commit_staged` for the data write, (2) `__manifest` row-level CAS via `ManifestBatchPublisher` for cross-table ordering, (3) the open-time recovery sweep for the residual gap between (1) and (2). All three layers ship; the five migrated writers (`MutationStaging::finalize`, `schema_apply`, `branch_merge`, `ensure_indices`, `optimize_all_tables`) write a `__recovery/{ulid}.json` sidecar before Phase B and delete it after Phase C. The next `Omnigraph::open` (gated on `OpenMode::ReadWrite`) runs the sweep in `db/manifest/recovery.rs`: classify, decide all-or-nothing per sidecar, roll forward via single `ManifestBatchPublisher::publish` or roll back via `Dataset::restore` followed by a manifest publish of the restored version (so both directions converge to `manifest == HEAD` — no residual drift), and record an audit row in `_graph_commit_recoveries.lance` (queryable via `omnigraph commit list --filter actor=omnigraph:recovery`). Continuous in-process recovery (no restart needed between Phase B failure and recovery) is the goal of a future background reconciler. Engine writes route through a sealed `TableStorage` trait exposing `stage_*` + `commit_staged` as the canonical staged-write surface; documented inline-commit residuals (`delete_where`, `create_vector_index`, plus legacy `append_batch` / `merge_insert_batches` / `overwrite_batch` / `create_*_index`) remain on the trait until upstream Lance ships a public two-phase API ([#6658](https://github.com/lance-format/lance/issues/6658), [#6666](https://github.com/lance-format/lance/issues/6666)) and the migration of every call site completes. | -| Compaction (`compact_files`) | ✅ | `omnigraph optimize` orchestrates over all node/edge tables, bounded concurrency; **publishes each compacted table's new version to `__manifest`** (so the manifest tracks the Lance HEAD — required for reads to observe compaction and for schema apply / strict writes to pass their HEAD-vs-manifest precondition), under the per-`(table, main)` write queue with `SidecarKind::Optimize` recovery coverage; **refuses on an unrecovered graph** (errors if a `__recovery` sidecar is pending — recovery may roll back a partial write, so optimize requires `manifest == HEAD` going in); **skips blob-bearing tables** (reported via `TableOptimizeStats.skipped`, not silent), gated on `LANCE_SUPPORTS_BLOB_COMPACTION` until the upstream blob-v2 compaction-decode bug is fixed (see [docs/dev/invariants.md](docs/dev/invariants.md) Known Gaps) | +| Compaction (`compact_files`) | ✅ | `omnigraph optimize` orchestrates over all node/edge tables, bounded concurrency; **publishes each compacted table's new version to `__manifest`** (so the manifest tracks the Lance HEAD — required for reads to observe compaction and for schema apply / strict writes to pass their HEAD-vs-manifest precondition), under the per-`(table, main)` write queue with `SidecarKind::Optimize` recovery coverage; **refuses on an unrecovered graph** (errors if a `__recovery` sidecar is pending); **skips uncovered HEAD > manifest drift** with `DriftNeedsRepair` instead of interpreting it; **skips blob-bearing tables** (reported via `TableOptimizeStats.skipped`, not silent), gated on `LANCE_SUPPORTS_BLOB_COMPACTION` until the upstream blob-v2 compaction-decode bug is fixed (see [docs/dev/invariants.md](docs/dev/invariants.md) Known Gaps) | +| Repair uncovered drift | — | `omnigraph repair` explicitly classifies uncovered table `HEAD > manifest` drift: verified maintenance drift (`ReserveFragments`/`Rewrite`) can be published with `--confirm`; suspicious or unverifiable drift requires `--force --confirm`. Sidecar-covered crash residuals still recover automatically on open. | | Cleanup (`cleanup_old_versions`) | ✅ | `omnigraph cleanup` with `--keep` / `--older-than` policy | | BTREE / inverted (FTS) / vector indexes | ✅ | `ensure_indices` builds them on every relevant column; idempotent; lazy across branches | | `merge_insert` upsert | ✅ | `LoadMode::Merge`, mutation `update`/`insert`/`delete` lowering | diff --git a/crates/omnigraph-cli/src/main.rs b/crates/omnigraph-cli/src/main.rs index 29b55c4..fec75f1 100644 --- a/crates/omnigraph-cli/src/main.rs +++ b/crates/omnigraph-cli/src/main.rs @@ -283,6 +283,25 @@ enum Command { #[arg(long)] json: bool, }, + /// Classify and explicitly repair manifest/head drift + Repair { + /// Graph URI + uri: Option, + #[arg(long)] + target: Option, + #[arg(long)] + config: Option, + /// Publish verified maintenance drift. Without this flag, repair only + /// previews what it would do. + #[arg(long)] + confirm: bool, + /// Also publish suspicious or unverifiable drift. Requires + /// `--confirm`; use only after operator review. + #[arg(long, requires = "confirm")] + force: bool, + #[arg(long)] + json: bool, + }, /// Remove old Lance versions from every table of the graph (destructive) Cleanup { /// Graph URI @@ -3012,6 +3031,8 @@ async fn main() -> Result<()> { "fragments_added": s.fragments_added, "committed": s.committed, "skipped": s.skipped.map(|r| r.as_str()), + "manifest_version": s.manifest_version, + "lance_head_version": s.lance_head_version, })).collect::>(), }); print_json(&value)?; @@ -3031,6 +3052,89 @@ async fn main() -> Result<()> { } } } + Command::Repair { + uri, + target, + config, + confirm, + force, + json, + } => { + let config = load_cli_config(config.as_ref())?; + let uri = resolve_uri(&config, uri, target.as_deref())?; + let db = Omnigraph::open(&uri).await?; + let stats = db + .repair(omnigraph::db::RepairOptions { confirm, force }) + .await?; + let refused_count = stats + .tables + .iter() + .filter(|s| matches!(s.action, omnigraph::db::RepairAction::Refused)) + .count(); + if json { + let value = serde_json::json!({ + "uri": uri, + "confirm": confirm, + "force": force, + "manifest_version": stats.manifest_version, + "tables": stats.tables.iter().map(|s| serde_json::json!({ + "table_key": s.table_key, + "manifest_version": s.manifest_version, + "lance_head_version": s.lance_head_version, + "classification": s.classification.as_str(), + "action": s.action.as_str(), + "operations": s.operations, + "error": s.error, + })).collect::>(), + }); + print_json(&value)?; + } else { + let mode = if confirm { "confirm" } else { "preview" }; + println!( + "repair {} — {} mode, {} tables", + uri, + mode, + stats.tables.len() + ); + for s in &stats.tables { + let drift = if s.manifest_version == s.lance_head_version { + format!("{}", s.manifest_version) + } else { + format!("{} → {}", s.manifest_version, s.lance_head_version) + }; + let ops = if s.operations.is_empty() { + String::new() + } else { + format!(" [{}]", s.operations.join(", ")) + }; + let err = s + .error + .as_ref() + .map(|err| format!(" ({err})")) + .unwrap_or_default(); + println!( + " {:<40} {:<12} {:<22} {}{}{}", + s.table_key, + s.action.as_str(), + s.classification.as_str(), + drift, + ops, + err + ); + } + if !confirm { + println!("rerun with --confirm to publish verified maintenance drift"); + } + } + if refused_count > 0 { + bail!( + "repair refused {} suspicious or unverifiable table(s); review the preview \ + output and rerun with --force --confirm only if publishing that drift is \ + intentional", + refused_count + ); + } + } Command::Cleanup { uri, target, diff --git a/crates/omnigraph-cli/tests/cli.rs b/crates/omnigraph-cli/tests/cli.rs index 9682d9a..26a1a65 100644 --- a/crates/omnigraph-cli/tests/cli.rs +++ b/crates/omnigraph-cli/tests/cli.rs @@ -1,5 +1,6 @@ use std::fs; +use lance::Dataset; use lance::index::DatasetIndexExt; use omnigraph::db::{Omnigraph, ReadTarget}; use serde_json::Value; @@ -60,6 +61,25 @@ fn manifest_dataset_version(graph: &std::path::Path) -> u64 { }) } +fn forge_person_delete_drift(graph: &std::path::Path) -> (u64, u64) { + tokio::runtime::Runtime::new().unwrap().block_on(async { + let uri = graph.to_string_lossy(); + let db = Omnigraph::open(uri.as_ref()).await.unwrap(); + let snap = db + .snapshot_of(ReadTarget::branch("main")) + .await + .unwrap(); + let entry = snap.entry("node:Person").unwrap(); + let full_path = format!("{}/{}", uri.trim_end_matches('/'), entry.table_path); + let mut ds = Dataset::open(&full_path).await.unwrap(); + let deleted = ds.delete("name = 'Alice'").await.unwrap(); + assert_eq!(deleted.num_deleted_rows, 1); + let head = deleted.new_dataset.version().version; + assert!(head > entry.table_version); + (entry.table_version, head) + }) +} + fn write_policy_config_fixture(root: &std::path::Path) -> (std::path::PathBuf, std::path::PathBuf) { let config = root.join("omnigraph.yaml"); let policy = root.join("policy.yaml"); @@ -235,6 +255,83 @@ fn init_creates_graph_successfully_on_missing_local_directory() { assert!(temp.path().join("omnigraph.yaml").exists()); } +#[test] +fn repair_json_reports_noop_on_clean_graph() { + let temp = tempdir().unwrap(); + let graph = graph_path(temp.path()); + init_graph(&graph); + load_fixture(&graph); + + let output = output_success(cli().arg("repair").arg("--json").arg(&graph)); + let payload: Value = serde_json::from_slice(&output.stdout).unwrap(); + + assert_eq!(payload["confirm"], false); + assert_eq!(payload["force"], false); + assert_eq!(payload["manifest_version"], Value::Null); + let tables = payload["tables"].as_array().unwrap(); + assert_eq!(tables.len(), 4); + assert!(tables.iter().all(|table| { + table["classification"] == "no_drift" && table["action"] == "no_op" + })); +} + +#[test] +fn repair_confirm_json_refuses_suspicious_drift_with_nonzero_exit_then_force_succeeds() { + let temp = tempdir().unwrap(); + let graph = graph_path(temp.path()); + init_graph(&graph); + load_fixture(&graph); + let graph_manifest_before = manifest_dataset_version(&graph); + let (table_manifest_before, table_head_before) = forge_person_delete_drift(&graph); + + let refused = output_failure( + cli() + .arg("repair") + .arg("--confirm") + .arg("--json") + .arg(&graph), + ); + let refused_payload: Value = serde_json::from_slice(&refused.stdout).unwrap(); + assert_eq!(refused_payload["manifest_version"], Value::Null); + let person = refused_payload["tables"] + .as_array() + .unwrap() + .iter() + .find(|table| table["table_key"] == "node:Person") + .unwrap(); + assert_eq!(person["classification"], "suspicious"); + assert_eq!(person["action"], "refused"); + assert!( + String::from_utf8_lossy(&refused.stderr).contains("repair refused"), + "stderr should explain the non-zero exit; got: {}", + String::from_utf8_lossy(&refused.stderr) + ); + assert_eq!(manifest_dataset_version(&graph), graph_manifest_before); + + let forced = output_success( + cli() + .arg("repair") + .arg("--force") + .arg("--confirm") + .arg("--json") + .arg(&graph), + ); + let forced_payload: Value = serde_json::from_slice(&forced.stdout).unwrap(); + let forced_manifest = forced_payload["manifest_version"].as_u64().unwrap(); + assert!(forced_manifest > graph_manifest_before); + let person = forced_payload["tables"] + .as_array() + .unwrap() + .iter() + .find(|table| table["table_key"] == "node:Person") + .unwrap(); + assert_eq!(person["classification"], "suspicious"); + assert_eq!(person["action"], "forced"); + assert_eq!(person["manifest_version"], table_manifest_before); + assert_eq!(person["lance_head_version"], table_head_before); + assert_eq!(manifest_dataset_version(&graph), forced_manifest); +} + #[test] fn schema_plan_json_reports_supported_additive_change() { let temp = tempdir().unwrap(); diff --git a/crates/omnigraph/src/db/mod.rs b/crates/omnigraph/src/db/mod.rs index 13e1c74..000602a 100644 --- a/crates/omnigraph/src/db/mod.rs +++ b/crates/omnigraph/src/db/mod.rs @@ -11,8 +11,9 @@ pub use graph_coordinator::{GraphCoordinator, ReadTarget, ResolvedTarget, Snapsh pub use manifest::{Snapshot, SubTableEntry, SubTableUpdate}; pub(crate) use omnigraph::ensure_public_branch_ref; pub use omnigraph::{ - CleanupPolicyOptions, InitOptions, MergeOutcome, Omnigraph, OpenMode, SchemaApplyOptions, - SchemaApplyResult, SkipReason, TableCleanupStats, TableOptimizeStats, + CleanupPolicyOptions, InitOptions, MergeOutcome, Omnigraph, OpenMode, RepairAction, + RepairClassification, RepairOptions, RepairStats, SchemaApplyOptions, SchemaApplyResult, + SkipReason, TableCleanupStats, TableOptimizeStats, TableRepairStats, }; pub(crate) const SCHEMA_APPLY_LOCK_BRANCH: &str = "__schema_apply_lock__"; diff --git a/crates/omnigraph/src/db/omnigraph.rs b/crates/omnigraph/src/db/omnigraph.rs index ba2b70e..5bcc973 100644 --- a/crates/omnigraph/src/db/omnigraph.rs +++ b/crates/omnigraph/src/db/omnigraph.rs @@ -30,10 +30,14 @@ use crate::table_store::TableStore; mod export; mod optimize; +mod repair; mod schema_apply; mod table_ops; pub use optimize::{CleanupPolicyOptions, SkipReason, TableCleanupStats, TableOptimizeStats}; +pub use repair::{ + RepairAction, RepairClassification, RepairOptions, RepairStats, TableRepairStats, +}; pub use schema_apply::SchemaApplyOptions; use super::commit_graph::GraphCommit; @@ -682,6 +686,16 @@ impl Omnigraph { .map(|resolved| resolved.snapshot) } + pub(crate) async fn fresh_snapshot_for_branch(&self, branch: Option<&str>) -> Result { + self.ensure_schema_state_valid().await?; + let requested = ReadTarget::Branch(branch.unwrap_or("main").to_string()); + let coord = self.coordinator.read().await; + coord + .resolve_target(&requested) + .await + .map(|resolved| resolved.snapshot) + } + pub(crate) async fn version(&self) -> u64 { self.coordinator.read().await.version() } @@ -999,6 +1013,13 @@ impl Omnigraph { optimize::optimize_all_tables(self).await } + /// Classify and explicitly repair uncovered manifest/head drift. See + /// [`repair`] for the distinction between safe maintenance drift and + /// suspicious/unverifiable drift. + pub async fn repair(&self, options: repair::RepairOptions) -> Result { + repair::repair_all_tables(self, options).await + } + /// Remove Lance manifests (and the fragments they uniquely own) per the /// given [`optimize::CleanupPolicyOptions`]. Destructive to version /// history. See [`optimize`] for details. diff --git a/crates/omnigraph/src/db/omnigraph/optimize.rs b/crates/omnigraph/src/db/omnigraph/optimize.rs index ee39323..3c37b66 100644 --- a/crates/omnigraph/src/db/omnigraph/optimize.rs +++ b/crates/omnigraph/src/db/omnigraph/optimize.rs @@ -75,8 +75,7 @@ pub struct CleanupPolicyOptions { } /// Why `optimize` did not compact a table. Typed so callers branch on the -/// reason rather than sniffing a string. One variant today, gated by -/// [`LANCE_SUPPORTS_BLOB_COMPACTION`]. +/// reason rather than sniffing a string. #[derive(Debug, Clone, Copy, PartialEq, Eq)] #[non_exhaustive] pub enum SkipReason { @@ -84,6 +83,12 @@ pub enum SkipReason { /// `BlobHandling::AllBinary`, which mis-decodes blob-v2 columns; see /// [`LANCE_SUPPORTS_BLOB_COMPACTION`] and `docs/dev/lance.md`. BlobColumnsUnsupportedByLance, + /// The Lance dataset HEAD is ahead of the version recorded in + /// `__manifest`, and no recovery sidecar covers that movement. `optimize` + /// cannot infer whether the drift is benign maintenance or an external + /// semantic write, so it leaves the table untouched and points operators at + /// explicit `repair`. + DriftNeedsRepair, } impl SkipReason { @@ -92,6 +97,7 @@ impl SkipReason { pub fn as_str(&self) -> &'static str { match self { SkipReason::BlobColumnsUnsupportedByLance => "blob_columns_unsupported_by_lance", + SkipReason::DriftNeedsRepair => "drift_needs_repair", } } } @@ -103,6 +109,7 @@ impl std::fmt::Display for SkipReason { SkipReason::BlobColumnsUnsupportedByLance => { "blob columns — Lance compaction unsupported" } + SkipReason::DriftNeedsRepair => "manifest/head drift — run omnigraph repair", }; f.write_str(msg) } @@ -125,6 +132,12 @@ pub struct TableOptimizeStats { /// `Some(reason)` if this table was deliberately not compacted. When set, /// `fragments_removed == 0`, `fragments_added == 0`, and `!committed`. pub skipped: Option, + /// Manifest table version observed by optimize for drift skips. `None` for + /// normal compaction/no-op/blob skips. + pub manifest_version: Option, + /// Lance HEAD version observed by optimize for drift skips. `None` for + /// normal compaction/no-op/blob skips. + pub lance_head_version: Option, } impl TableOptimizeStats { @@ -136,6 +149,8 @@ impl TableOptimizeStats { fragments_added: metrics.fragments_added, committed, skipped: None, + manifest_version: None, + lance_head_version: None, } } @@ -147,6 +162,25 @@ impl TableOptimizeStats { fragments_added: 0, committed: false, skipped: Some(reason), + manifest_version: None, + lance_head_version: None, + } + } + + /// Stat for a table skipped because the manifest and Lance HEAD disagree. + fn skipped_for_drift( + table_key: String, + manifest_version: u64, + lance_head_version: u64, + ) -> Self { + Self { + table_key, + fragments_removed: 0, + fragments_added: 0, + committed: false, + skipped: Some(SkipReason::DriftNeedsRepair), + manifest_version: Some(manifest_version), + lance_head_version: Some(lance_head_version), } } } @@ -185,8 +219,7 @@ pub async fn optimize_all_tables(db: &Omnigraph) -> Result Result { // Lance `compact_files` mis-decodes blob-v2 columns under the forced // `BlobHandling::AllBinary` read (see LANCE_SUPPORTS_BLOB_COMPACTION). Skip - // blob-bearing tables and report it rather than aborting the whole sweep. + // blob-bearing tables before acquiring the write queue; `repair` is the + // operator tool for full manifest/head drift classification. if has_blob && !LANCE_SUPPORTS_BLOB_COMPACTION { tracing::warn!( target: "omnigraph::optimize", @@ -291,20 +325,41 @@ async fn optimize_one_table( // CAS baseline: the table's current manifest version, read under the queue // (in-memory coordinator snapshot, no storage I/O — stable for this section). let expected_version = db - .snapshot() - .await + .fresh_snapshot_for_branch(None) + .await? .entry(&table_key) .map(|e| e.table_version) .ok_or_else(|| OmniError::manifest(format!("no manifest entry for {}", table_key)))?; + let lance_head_version = ds.version().version; + if lance_head_version < expected_version { + return Err(OmniError::manifest_internal(format!( + "table '{}' Lance HEAD version {} is behind manifest version {}", + table_key, lance_head_version, expected_version + ))); + } + if lance_head_version > expected_version { + tracing::warn!( + target: "omnigraph::optimize", + table = %table_key, + manifest_version = expected_version, + lance_head_version, + "skipping compaction: Lance HEAD is ahead of the manifest; run `omnigraph repair` \ + to classify and publish covered maintenance drift explicitly", + ); + return Ok(TableOptimizeStats::skipped_for_drift( + table_key, + expected_version, + lance_head_version, + )); + } + // Precise "will it compact?" check — `plan_compaction` also accounts for // deletion materialization (which can rewrite even a single fragment). A // steady-state already-compacted table yields an empty plan and is never // pinned in a sidecar (a zero-commit pin would classify NoMovement on - // recovery and force an all-or-nothing rollback). There is no drift to - // reconcile here: optimize runs only on a recovered graph (the pending- - // sidecar guard above), and recovery roll-back now publishes, so - // `HEAD == manifest` holds going in. + // recovery and force an all-or-nothing rollback). Uncovered pre-existing + // drift is skipped above and must go through explicit repair. let options = CompactionOptions::default(); let plan = plan_compaction(&ds, &options) .await @@ -641,7 +696,7 @@ fn orphan_branches(present: Vec, keep: &std::collections::HashSet Vec { +pub(super) fn all_table_keys(catalog: &omnigraph_compiler::catalog::Catalog) -> Vec { let mut keys: Vec = catalog .node_types .keys() diff --git a/crates/omnigraph/src/db/omnigraph/repair.rs b/crates/omnigraph/src/db/omnigraph/repair.rs new file mode 100644 index 0000000..aaef2ba --- /dev/null +++ b/crates/omnigraph/src/db/omnigraph/repair.rs @@ -0,0 +1,332 @@ +//! Explicit repair for uncovered manifest/head drift. +//! +//! Recovery sidecars handle deterministic crash residuals automatically. This +//! module is for the different case: a table's Lance HEAD is ahead of the +//! version recorded in `__manifest` and there is no sidecar encoding writer +//! intent. `repair` classifies that uncovered drift from Lance transactions and +//! only auto-publishes maintenance-only drift when the operator confirms. + +use std::collections::HashMap; + +use lance::Dataset; +use lance::dataset::transaction::Operation; + +use super::*; + +/// Options for [`Omnigraph::repair`]. +#[derive(Debug, Clone, Copy, Default)] +pub struct RepairOptions { + /// Preview by default. With `confirm`, verified maintenance drift is + /// published to `__manifest`. + pub confirm: bool, + /// Also publish suspicious/unverifiable drift. Requires `confirm`. + pub force: bool, +} + +/// Classification of a table's manifest/head state. +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +#[non_exhaustive] +pub enum RepairClassification { + /// Lance HEAD equals the manifest pin. + NoDrift, + /// Every uncovered Lance transaction is maintenance-only (`Rewrite` or + /// `ReserveFragments`), so publishing the HEAD is content-preserving. + VerifiedMaintenance, + /// At least one uncovered transaction is semantic (`Append`, `Delete`, + /// `Update`, etc.). + Suspicious, + /// A needed transaction could not be read, so the drift cannot be judged. + Unverifiable, +} + +impl RepairClassification { + /// Stable machine-readable token for serialized output. + pub fn as_str(&self) -> &'static str { + match self { + Self::NoDrift => "no_drift", + Self::VerifiedMaintenance => "verified_maintenance", + Self::Suspicious => "suspicious", + Self::Unverifiable => "unverifiable", + } + } +} + +impl std::fmt::Display for RepairClassification { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.write_str(self.as_str()) + } +} + +/// What repair did for a table. +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +#[non_exhaustive] +pub enum RepairAction { + /// Nothing to do. + NoOp, + /// Drift was reported but not published because this was a preview. + Preview, + /// Verified maintenance drift was published to `__manifest`. + Healed, + /// Suspicious/unverifiable drift was published because `force` was set. + Forced, + /// Drift was left untouched because it was not safe to publish without + /// `force`. + Refused, +} + +impl RepairAction { + /// Stable machine-readable token for serialized output. + pub fn as_str(&self) -> &'static str { + match self { + Self::NoOp => "no_op", + Self::Preview => "preview", + Self::Healed => "healed", + Self::Forced => "forced", + Self::Refused => "refused", + } + } +} + +impl std::fmt::Display for RepairAction { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.write_str(self.as_str()) + } +} + +/// Per-table repair outcome. +#[derive(Debug, Clone)] +#[non_exhaustive] +pub struct TableRepairStats { + pub table_key: String, + pub manifest_version: u64, + pub lance_head_version: u64, + pub classification: RepairClassification, + pub action: RepairAction, + pub operations: Vec, + pub error: Option, +} + +/// Whole-graph repair outcome. +#[derive(Debug, Clone)] +#[non_exhaustive] +pub struct RepairStats { + pub tables: Vec, + /// New graph manifest version if repair published any table pins. + pub manifest_version: Option, +} + +struct ClassificationResult { + classification: RepairClassification, + operations: Vec, + error: Option, +} + +pub async fn repair_all_tables(db: &Omnigraph, options: RepairOptions) -> Result { + if options.force && !options.confirm { + return Err(OmniError::manifest("repair --force requires --confirm")); + } + + db.ensure_schema_state_valid().await?; + db.ensure_schema_apply_idle("repair").await?; + ensure_no_pending_recovery_sidecars(db, "repair").await?; + + let snapshot = db.fresh_snapshot_for_branch(None).await?; + let table_tasks: Vec<(String, String)> = { + let catalog = db.catalog(); + let mut tasks = Vec::new(); + for table_key in optimize::all_table_keys(&catalog) { + let Some(entry) = snapshot.entry(&table_key) else { + continue; + }; + let full_path = format!("{}/{}", db.root_uri, entry.table_path); + tasks.push((table_key, full_path)); + } + tasks + }; + + if table_tasks.is_empty() { + return Ok(RepairStats { + tables: Vec::new(), + manifest_version: None, + }); + } + + let queue_keys: Vec<(String, Option)> = table_tasks + .iter() + .map(|(table_key, _)| (table_key.clone(), None)) + .collect(); + let _guards = db.write_queue().acquire_many(&queue_keys).await; + ensure_no_pending_recovery_sidecars(db, "repair").await?; + + let snapshot = db.fresh_snapshot_for_branch(None).await?; + let mut tables = Vec::with_capacity(table_tasks.len()); + let mut updates = Vec::new(); + let mut expected = HashMap::new(); + let mut any_forced = false; + + for (table_key, full_path) in table_tasks { + let ds = db + .table_store + .open_dataset_head_for_write(&table_key, &full_path, None) + .await?; + let manifest_version = snapshot + .entry(&table_key) + .map(|e| e.table_version) + .ok_or_else(|| OmniError::manifest(format!("no manifest entry for {}", table_key)))?; + let lance_head_version = ds.version().version; + + if lance_head_version < manifest_version { + return Err(OmniError::manifest_internal(format!( + "table '{}' Lance HEAD version {} is behind manifest version {}", + table_key, lance_head_version, manifest_version + ))); + } + + if lance_head_version == manifest_version { + tables.push(TableRepairStats { + table_key, + manifest_version, + lance_head_version, + classification: RepairClassification::NoDrift, + action: RepairAction::NoOp, + operations: Vec::new(), + error: None, + }); + continue; + } + + let classification = classify_drift(&ds, manifest_version, lance_head_version).await; + let action = match ( + options.confirm, + options.force, + classification.classification, + ) { + (false, _, _) => RepairAction::Preview, + (true, _, RepairClassification::VerifiedMaintenance) => RepairAction::Healed, + (true, true, RepairClassification::Suspicious | RepairClassification::Unverifiable) => { + any_forced = true; + RepairAction::Forced + } + (true, _, RepairClassification::Suspicious | RepairClassification::Unverifiable) => { + RepairAction::Refused + } + (true, _, RepairClassification::NoDrift) => RepairAction::NoOp, + }; + + if matches!(action, RepairAction::Healed | RepairAction::Forced) { + let state = db.table_store.table_state(&full_path, &ds).await?; + updates.push(crate::db::SubTableUpdate { + table_key: table_key.clone(), + table_version: state.version, + table_branch: None, + row_count: state.row_count, + version_metadata: state.version_metadata, + }); + expected.insert(table_key.clone(), manifest_version); + } + + tables.push(TableRepairStats { + table_key, + manifest_version, + lance_head_version, + classification: classification.classification, + action, + operations: classification.operations, + error: classification.error, + }); + } + + let manifest_version = if updates.is_empty() { + None + } else { + let actor = if any_forced { + Some("omnigraph:repair:force") + } else { + Some("omnigraph:repair") + }; + let PublishedSnapshot { + manifest_version, + _snapshot_id: _, + } = db + .coordinator + .write() + .await + .commit_updates_with_actor_with_expected(&updates, &expected, actor) + .await?; + db.runtime_cache.invalidate_all().await; + if updates + .iter() + .any(|update| update.table_key.starts_with("edge:")) + { + db.invalidate_graph_index().await; + } + Some(manifest_version) + }; + + Ok(RepairStats { + tables, + manifest_version, + }) +} + +async fn ensure_no_pending_recovery_sidecars(db: &Omnigraph, operation: &str) -> Result<()> { + if !crate::db::manifest::list_sidecars(db.root_uri(), db.storage_adapter()) + .await? + .is_empty() + { + return Err(OmniError::manifest_conflict(format!( + "{operation} requires a clean recovery state; reopen the graph to run the \ + recovery sweep before repairing" + ))); + } + Ok(()) +} + +async fn classify_drift( + ds: &Dataset, + manifest_version: u64, + lance_head_version: u64, +) -> ClassificationResult { + let mut operations = Vec::new(); + let mut saw_suspicious = false; + let mut error = None; + + for version in manifest_version.saturating_add(1)..=lance_head_version { + match ds.read_transaction_by_version(version).await { + Ok(Some(transaction)) => { + let operation = transaction.operation; + operations.push(operation.name().to_string()); + if !matches!( + operation, + Operation::Rewrite { .. } | Operation::ReserveFragments { .. } + ) { + saw_suspicious = true; + } + } + Ok(None) => { + error = Some(format!("missing Lance transaction for version {version}")); + break; + } + Err(err) => { + error = Some(format!( + "failed to read Lance transaction for version {version}: {err}" + )); + break; + } + } + } + + let classification = if error.is_some() { + RepairClassification::Unverifiable + } else if saw_suspicious { + RepairClassification::Suspicious + } else { + RepairClassification::VerifiedMaintenance + }; + + ClassificationResult { + classification, + operations, + error, + } +} diff --git a/crates/omnigraph/src/exec/mutation.rs b/crates/omnigraph/src/exec/mutation.rs index 02b2a21..985889a 100644 --- a/crates/omnigraph/src/exec/mutation.rs +++ b/crates/omnigraph/src/exec/mutation.rs @@ -569,7 +569,8 @@ use super::staging::{MutationStaging, PendingMode}; /// via `open_for_mutation_on_branch`, which compares Lance HEAD against /// the manifest's pinned version — that fence is the engine's /// publisher-style OCC catching cross-writer drift before we make any -/// changes. +/// changes. For delete-only queries, this strict open is also the uncovered +/// drift guard that runs before `delete_where` can inline-commit. /// /// On subsequent touches *within the same query*, behavior depends on /// whether the table has already been inline-committed by a delete op: diff --git a/crates/omnigraph/src/exec/staging.rs b/crates/omnigraph/src/exec/staging.rs index 0d26fd3..264ab59 100644 --- a/crates/omnigraph/src/exec/staging.rs +++ b/crates/omnigraph/src/exec/staging.rs @@ -495,25 +495,21 @@ impl StagedMutation { // until `ensure_path` learns how to bump expected_version on // op-kind upgrade. // - // Why per-branch (and not the bound-branch `db.snapshot()`): - // when the caller mutates a branch other than the engine's - // bound branch (e.g., feature-branch ingest from a server - // handle bound to main), `db.snapshot()` returns the bound - // branch's view of each table — which is the wrong pin for - // the publisher's CAS on a different branch. Using - // `snapshot_for_branch(branch)` resolves the per-branch - // entries correctly. The cost is one fresh manifest read per - // mutation; PR 1b's regression came from this same read, but - // that read is now strictly necessary for cross-branch - // correctness. Single-table same-branch mutations could still - // skip this read (queue exclusivity makes the publisher CAS a - // no-op), but the conditional adds complexity for marginal - // gain — left as a follow-up perf optimization. + // Why a fresh per-branch snapshot (and not the bound-branch + // `db.snapshot()` / `snapshot_for_branch()` fast path): a stale + // engine handle may be bound to the same branch it is writing. For + // non-strict Insert/Merge, that stale local view is allowed to rebase + // to the live manifest pin under the queue; only uncovered Lance + // HEAD>manifest drift is refused. For writes targeting a branch other + // than the engine's bound branch (e.g., feature-branch ingest from a + // server handle bound to main), the same helper also resolves the + // correct branch pin. The cost is one fresh manifest read per mutation + // plus one Lance HEAD open per staged table for the drift guard below. // // Multi-coordinator deployments (§VI.27 aspirational) get // genuine cross-process drift detection from this read for // free. - let snapshot = db.snapshot_for_branch(branch).await?; + let snapshot = db.fresh_snapshot_for_branch(branch).await?; for entry in staged.iter_mut() { let current = snapshot .entry(&entry.table_key) @@ -541,6 +537,35 @@ impl StagedMutation { )); } + // Separate manifest-visible concurrency from uncovered Lance drift. + // Non-strict inserts/merges are allowed to rebase from their staged + // read version to the fresh manifest pin above, but only if the + // live Lance HEAD still equals that manifest pin. If an external + // raw Lance write or a pre-fix maintenance path moved HEAD without + // publishing `__manifest`, this write must not silently fold it. + let head = db + .table_store() + .open_dataset_head_for_write( + &entry.table_key, + &entry.path.full_path, + entry.path.table_branch.as_deref(), + ) + .await? + .version() + .version; + if head < current { + return Err(OmniError::manifest_internal(format!( + "table '{}' Lance HEAD version {} is behind manifest version {}", + entry.table_key, head, current + ))); + } + if head > current { + return Err(OmniError::manifest_conflict(format!( + "table '{}' has Lance HEAD version {} ahead of manifest version {}; run `omnigraph repair` before writing", + entry.table_key, head, current + ))); + } + entry.expected_version = current; expected_versions.insert(entry.table_key.clone(), current); } diff --git a/crates/omnigraph/tests/lance_surface_guards.rs b/crates/omnigraph/tests/lance_surface_guards.rs index 1d60c08..65efc4e 100644 --- a/crates/omnigraph/tests/lance_surface_guards.rs +++ b/crates/omnigraph/tests/lance_surface_guards.rs @@ -30,6 +30,7 @@ use arrow_schema::{DataType, Field, Schema}; use lance::Dataset; use lance::dataset::builder::DatasetBuilder; use lance::dataset::optimize::{CompactionOptions, compact_files}; +use lance::dataset::transaction::Operation; use lance::dataset::write::delete::DeleteResult; use lance::dataset::{MergeInsertBuilder, WhenMatched, WhenNotMatched, WriteMode, WriteParams}; use lance_file::version::LanceFileVersion; @@ -222,6 +223,33 @@ async fn _compile_compact_files_signature() -> lance::Result<()> { Ok(()) } +// --- Guard 7b: transaction history exposes repair's classification surface - +// +// `db/omnigraph/repair.rs` reads Lance transactions between manifest and HEAD +// and treats only `ReserveFragments` + `Rewrite` as safe maintenance drift. +// Compile-only. + +#[allow( + dead_code, + unreachable_code, + unused_variables, + unused_mut, + clippy::diverging_sub_expression +)] +async fn _compile_transaction_history_for_repair_signature() -> lance::Result<()> { + let ds: Dataset = unimplemented!(); + let tx = ds.read_transaction_by_version(1u64).await?; + if let Some(tx) = tx { + let operation = tx.operation; + let _name: &str = operation.name(); + match operation { + Operation::Rewrite { .. } | Operation::ReserveFragments { .. } => {} + _ => {} + } + } + Ok(()) +} + // --- Guard 8: Dataset::delete returns DeleteResult { new_dataset, num_deleted_rows } --- // // `table_store.rs::delete_where` consumes both fields. When MR-A migrates @@ -329,7 +357,10 @@ async fn compact_files_still_fails_on_blob_columns() { ])); RecordBatch::try_new( schema, - vec![Arc::new(StringArray::from(ids)) as _, Arc::new(content) as _], + vec![ + Arc::new(StringArray::from(ids)) as _, + Arc::new(content) as _, + ], ) .unwrap() } diff --git a/crates/omnigraph/tests/maintenance.rs b/crates/omnigraph/tests/maintenance.rs index 2a5a659..13c9de7 100644 --- a/crates/omnigraph/tests/maintenance.rs +++ b/crates/omnigraph/tests/maintenance.rs @@ -8,7 +8,11 @@ mod helpers; use std::time::Duration; use lance::Dataset; -use omnigraph::db::{CleanupPolicyOptions, Omnigraph, ReadTarget, SkipReason}; +use lance::dataset::optimize::{CompactionOptions, compact_files}; +use omnigraph::db::{ + CleanupPolicyOptions, Omnigraph, ReadTarget, RepairAction, RepairClassification, RepairOptions, + SkipReason, +}; use omnigraph::loader::{LoadMode, load_jsonl}; use helpers::{ @@ -27,11 +31,64 @@ fn node_table_uri(root: &str, type_name: &str) -> String { format!("{}/nodes/{hash:016x}", root.trim_end_matches('/')) } +async fn person_manifest_and_head(db: &Omnigraph, root: &str) -> (u64, u64, String) { + let snap = db.snapshot_of(ReadTarget::branch("main")).await.unwrap(); + let entry = snap.entry("node:Person").unwrap(); + let full = format!("{}/{}", root.trim_end_matches('/'), entry.table_path); + let head = Dataset::open(&full).await.unwrap().version().version; + (entry.table_version, head, full) +} + +async fn add_person_fragments(db: &mut Omnigraph) { + for (name, age) in [("Eve", 40), ("Frank", 41), ("Grace", 42), ("Heidi", 43)] { + mutate_main( + db, + MUTATION_QUERIES, + "insert_person", + &mixed_params(&[("$name", name)], &[("$age", age as i64)]), + ) + .await + .expect("insert"); + } +} + +async fn forge_person_compaction_drift(db: &mut Omnigraph, root: &str) -> (u64, u64, String) { + add_person_fragments(db).await; + let (manifest_version, _, full) = person_manifest_and_head(db, root).await; + let mut ds = Dataset::open(&full).await.unwrap(); + let metrics = compact_files(&mut ds, CompactionOptions::default(), None) + .await + .expect("raw Lance compaction"); + let lance_head_version = ds.version().version; + assert!( + lance_head_version > manifest_version, + "raw Lance compaction should advance HEAD beyond manifest" + ); + assert!( + metrics.fragments_removed > 0 || metrics.fragments_added > 0, + "test precondition: raw compaction should rewrite fragments" + ); + (manifest_version, lance_head_version, full) +} + +async fn forge_person_delete_drift(db: &Omnigraph, root: &str) -> (u64, u64, String) { + let (manifest_version, _, full) = person_manifest_and_head(db, root).await; + let mut ds = Dataset::open(&full).await.unwrap(); + let deleted = ds.delete("name = 'Alice'").await.expect("raw Lance delete"); + assert_eq!(deleted.num_deleted_rows, 1, "fixture should delete Alice"); + let lance_head_version = deleted.new_dataset.version().version; + assert!( + lance_head_version > manifest_version, + "raw Lance delete should advance HEAD beyond manifest" + ); + (manifest_version, lance_head_version, full) +} + #[tokio::test] async fn optimize_on_empty_graph_returns_stats_per_table_with_no_changes() { let dir = tempfile::tempdir().unwrap(); let uri = dir.path().to_str().unwrap(); - let mut db = Omnigraph::init(uri, TEST_SCHEMA).await.unwrap(); + let db = Omnigraph::init(uri, TEST_SCHEMA).await.unwrap(); let stats = db.optimize().await.unwrap(); @@ -47,7 +104,7 @@ async fn optimize_on_empty_graph_returns_stats_per_table_with_no_changes() { #[tokio::test] async fn optimize_after_load_then_again_is_idempotent() { let dir = tempfile::tempdir().unwrap(); - let mut db = init_and_load(&dir).await; + let db = init_and_load(&dir).await; // First pass may compact (load wrote real fragments). let _first = db.optimize().await.unwrap(); @@ -180,7 +237,12 @@ node Tag {\n slug: String @key\n}\n"; #[tokio::test] async fn optimize_publishes_compaction_to_manifest_so_schema_apply_succeeds() { let dir = tempfile::tempdir().unwrap(); - let root = dir.path().to_str().unwrap().trim_end_matches('/').to_string(); + let root = dir + .path() + .to_str() + .unwrap() + .trim_end_matches('/') + .to_string(); let mut db = init_and_load(&dir).await; // Several separate inserts → multiple Person fragments, so `compact_files` @@ -234,6 +296,281 @@ async fn optimize_publishes_compaction_to_manifest_so_schema_apply_succeeds() { assert!(result.applied, "schema apply should report applied=true"); } +#[tokio::test] +async fn optimize_skips_preexisting_manifest_head_drift() { + let dir = tempfile::tempdir().unwrap(); + let root = dir + .path() + .to_str() + .unwrap() + .trim_end_matches('/') + .to_string(); + let mut db = init_and_load(&dir).await; + let (manifest_before, head_before, _) = forge_person_compaction_drift(&mut db, &root).await; + + let stats = db.optimize().await.unwrap(); + let person = stats + .iter() + .find(|s| s.table_key == "node:Person") + .expect("Person stat present"); + assert_eq!(person.skipped, Some(SkipReason::DriftNeedsRepair)); + assert!(!person.committed); + assert_eq!(person.manifest_version, Some(manifest_before)); + assert_eq!(person.lance_head_version, Some(head_before)); + + let (manifest_after, head_after, _) = person_manifest_and_head(&db, &root).await; + assert_eq!( + manifest_after, manifest_before, + "optimize must not publish uncovered drift" + ); + assert_eq!( + head_after, head_before, + "optimize must not move drifted HEAD" + ); +} + +#[tokio::test] +async fn repair_preview_reports_verified_maintenance_drift_without_healing() { + let dir = tempfile::tempdir().unwrap(); + let root = dir + .path() + .to_str() + .unwrap() + .trim_end_matches('/') + .to_string(); + let mut db = init_and_load(&dir).await; + let (manifest_before, head_before, _) = forge_person_compaction_drift(&mut db, &root).await; + + let stats = db + .repair(RepairOptions { + confirm: false, + force: false, + }) + .await + .unwrap(); + assert_eq!(stats.manifest_version, None); + let person = stats + .tables + .iter() + .find(|s| s.table_key == "node:Person") + .expect("Person repair stat present"); + assert_eq!( + person.classification, + RepairClassification::VerifiedMaintenance + ); + assert_eq!(person.action, RepairAction::Preview); + assert_eq!(person.manifest_version, manifest_before); + assert_eq!(person.lance_head_version, head_before); + assert!( + person + .operations + .iter() + .all(|op| op == "ReserveFragments" || op == "Rewrite"), + "maintenance drift should only include Lance maintenance operations: {:?}", + person.operations + ); + + let (manifest_after, head_after, _) = person_manifest_and_head(&db, &root).await; + assert_eq!(manifest_after, manifest_before); + assert_eq!(head_after, head_before); +} + +#[tokio::test] +async fn repair_confirm_heals_verified_maintenance_drift() { + let dir = tempfile::tempdir().unwrap(); + let root = dir + .path() + .to_str() + .unwrap() + .trim_end_matches('/') + .to_string(); + let mut db = init_and_load(&dir).await; + let (_, head_before, _) = forge_person_compaction_drift(&mut db, &root).await; + + let stats = db + .repair(RepairOptions { + confirm: true, + force: false, + }) + .await + .unwrap(); + assert!( + stats.manifest_version.is_some(), + "confirmed repair should publish one manifest commit" + ); + let person = stats + .tables + .iter() + .find(|s| s.table_key == "node:Person") + .expect("Person repair stat present"); + assert_eq!( + person.classification, + RepairClassification::VerifiedMaintenance + ); + assert_eq!(person.action, RepairAction::Healed); + + let (manifest_after, head_after, _) = person_manifest_and_head(&db, &root).await; + assert_eq!(manifest_after, head_before); + assert_eq!(head_after, head_before); + + let desired = TEST_SCHEMA.replace( + " age: I32?\n}", + " age: I32?\n nickname: String?\n}", + ); + let result = db + .apply_schema(&desired) + .await + .expect("strict schema apply should succeed after repair"); + assert!(result.applied); +} + +#[tokio::test] +async fn repair_refuses_raw_delete_without_force() { + let dir = tempfile::tempdir().unwrap(); + let root = dir + .path() + .to_str() + .unwrap() + .trim_end_matches('/') + .to_string(); + let db = init_and_load(&dir).await; + let (manifest_before, head_before, _) = forge_person_delete_drift(&db, &root).await; + + let stats = db + .repair(RepairOptions { + confirm: true, + force: false, + }) + .await + .unwrap(); + assert_eq!(stats.manifest_version, None); + let person = stats + .tables + .iter() + .find(|s| s.table_key == "node:Person") + .expect("Person repair stat present"); + assert_eq!(person.classification, RepairClassification::Suspicious); + assert_eq!(person.action, RepairAction::Refused); + assert!( + person.operations.iter().any(|op| op == "Delete"), + "raw Lance delete should be reported as a suspicious operation: {:?}", + person.operations + ); + + let (manifest_after, head_after, _) = person_manifest_and_head(&db, &root).await; + assert_eq!(manifest_after, manifest_before); + assert_eq!(head_after, head_before); + assert_eq!( + count_rows(&db, "node:Person").await, + 4, + "manifest-pinned reads should still see the pre-delete version" + ); +} + +#[tokio::test] +async fn repair_force_heals_suspicious_drift() { + let dir = tempfile::tempdir().unwrap(); + let root = dir + .path() + .to_str() + .unwrap() + .trim_end_matches('/') + .to_string(); + let db = init_and_load(&dir).await; + let (_, head_before, _) = forge_person_delete_drift(&db, &root).await; + + let stats = db + .repair(RepairOptions { + confirm: true, + force: true, + }) + .await + .unwrap(); + let person = stats + .tables + .iter() + .find(|s| s.table_key == "node:Person") + .expect("Person repair stat present"); + assert_eq!(person.classification, RepairClassification::Suspicious); + assert_eq!(person.action, RepairAction::Forced); + + let (manifest_after, head_after, _) = person_manifest_and_head(&db, &root).await; + assert_eq!(manifest_after, head_before); + assert_eq!(head_after, head_before); + assert_eq!( + count_rows(&db, "node:Person").await, + 3, + "forced repair publishes the raw delete's HEAD" + ); +} + +#[tokio::test] +async fn non_strict_load_refuses_uncovered_drift_before_folding_it() { + let dir = tempfile::tempdir().unwrap(); + let root = dir + .path() + .to_str() + .unwrap() + .trim_end_matches('/') + .to_string(); + let mut db = init_and_load(&dir).await; + let (manifest_before, head_before, _) = forge_person_compaction_drift(&mut db, &root).await; + + let err = load_jsonl( + &mut db, + "{\"type\":\"Person\",\"data\":{\"name\":\"Ivan\",\"age\":44}}", + LoadMode::Merge, + ) + .await + .expect_err("merge load must not silently fold uncovered drift"); + assert!( + err.to_string().contains("omnigraph repair"), + "error should point at explicit repair; got: {err}" + ); + + let (manifest_after, head_after, _) = person_manifest_and_head(&db, &root).await; + assert_eq!(manifest_after, manifest_before); + assert_eq!(head_after, head_before); +} + +#[tokio::test] +async fn delete_only_mutation_refuses_uncovered_drift_before_inline_commit() { + let dir = tempfile::tempdir().unwrap(); + let root = dir + .path() + .to_str() + .unwrap() + .trim_end_matches('/') + .to_string(); + let mut db = init_and_load(&dir).await; + let (manifest_before, head_before, _) = forge_person_compaction_drift(&mut db, &root).await; + + let err = mutate_main( + &mut db, + MUTATION_QUERIES, + "remove_person", + &mixed_params(&[("$name", "Alice")], &[]), + ) + .await + .expect_err("strict delete must reject uncovered drift before delete_where"); + assert!( + err.to_string().contains("expected"), + "delete should fail as a strict stale-version write; got: {err}" + ); + + let (manifest_after, head_after, _) = person_manifest_and_head(&db, &root).await; + assert_eq!(manifest_after, manifest_before); + assert_eq!( + head_after, head_before, + "delete_where must not run after the strict drift guard fails" + ); + assert_eq!( + count_rows(&db, "node:Person").await, + 8, + "manifest-pinned reads should still see all rows present before the failed delete" + ); +} + // Regression: `optimize` must REFUSE when an unresolved recovery sidecar is // pending. Operating on an unrecovered graph could publish a partial write that // the all-or-nothing recovery sweep would roll back; the operator must reopen diff --git a/crates/omnigraph/tests/writes.rs b/crates/omnigraph/tests/writes.rs index 0a309c9..d76ad46 100644 --- a/crates/omnigraph/tests/writes.rs +++ b/crates/omnigraph/tests/writes.rs @@ -6,8 +6,8 @@ //! What this file covers: //! - No `__run__*` branches are created by load or mutate. //! - Cancellation of a mutation future leaves no graph-level state. -//! - Concurrent writers to the same table land exactly one publish; the -//! loser surfaces `ManifestConflictDetails::ExpectedVersionMismatch`. +//! - Concurrent non-strict inserts/merges rebase under the per-table queue; +//! strict updates/deletes surface `ExpectedVersionMismatch` on stale state. //! - Failed mutations and loads leave the target unchanged. //! - Multi-statement mutations are atomic (one commit per query). //! - actor_id propagates through to the commit graph. @@ -17,7 +17,7 @@ mod helpers; use arrow_array::Array; use omnigraph::db::commit_graph::CommitGraph; use omnigraph::db::{Omnigraph, ReadTarget}; -use omnigraph::error::{ManifestConflictDetails, ManifestErrorKind, OmniError}; +use omnigraph::error::OmniError; use omnigraph::loader::{LoadMode, load_jsonl}; use helpers::*; @@ -241,18 +241,11 @@ async fn partial_failure_leaves_target_queryable_and_unblocks_next_mutation() { assert_eq!(frank.num_rows(), 1, "Frank must be visible after publish"); } -/// Concurrent writers to the same `(table, branch)` produce exactly one -/// success and one `ExpectedVersionMismatch`. The replacement for the old -/// `concurrent_conflicting_run_publish_fails_cleanly` test — the OCC fence -/// has moved from a graph-level run-publish merge into the publisher's -/// per-table CAS. -/// -/// Drives the race by interleaving two handles that captured the same -/// pre-write manifest snapshot: A commits first; B's commit then sees -/// `expected_versions[node:Person] = pre` while the manifest is at -/// `pre + 1`, and the publisher rejects. +/// Stale non-strict writers rebase to the live manifest pin under the +/// per-table queue instead of folding raw drift or returning a false 409. +/// Strict update/delete semantics are covered by the consistency/server tests. #[tokio::test] -async fn concurrent_writers_one_succeeds_one_gets_expected_version_mismatch() { +async fn stale_non_strict_insert_rebases_to_live_manifest_pin() { let dir = tempfile::tempdir().unwrap(); let uri = dir.path().to_string_lossy().into_owned(); @@ -281,40 +274,30 @@ async fn concurrent_writers_one_succeeds_one_gets_expected_version_mismatch() { .unwrap(); } - // Writer B's coordinator is still at the pre-A snapshot. Its mutation - // captures expected_versions[node:Person] = pre (stale), then publishes - // — the publisher's CAS pre-check sees the manifest is now at post and - // rejects with ExpectedVersionMismatch. - let result_b = db_b - .mutate( - "main", - MUTATION_QUERIES, - "insert_person", - &mixed_params(&[("$name", "WriterB")], &[("$age", 42)]), - ) - .await; + // Writer B's coordinator is still at the pre-A snapshot, but Insert is + // non-strict: commit_all re-reads the live manifest pin under the queue, + // verifies Lance HEAD equals that pin, and then lets Lance rebase the + // staged append. + db_b.mutate( + "main", + MUTATION_QUERIES, + "insert_person", + &mixed_params(&[("$name", "WriterB")], &[("$age", 42)]), + ) + .await + .unwrap(); - let err = result_b.expect_err("stale writer must hit ExpectedVersionMismatch"); - let OmniError::Manifest(manifest_err) = err else { - panic!("expected Manifest error, got {err:?}"); - }; - assert_eq!(manifest_err.kind, ManifestErrorKind::Conflict); - let Some(ManifestConflictDetails::ExpectedVersionMismatch { - ref table_key, - expected, - actual, - }) = manifest_err.details - else { - panic!( - "expected ExpectedVersionMismatch, got {:?}", - manifest_err.details, - ); - }; - assert_eq!(table_key, "node:Person"); - assert!( - actual > expected, - "actual ({actual}) should be ahead of expected ({expected})", - ); + for name in ["WriterA", "WriterB"] { + let person = query_main( + &mut db_b, + TEST_QUERIES, + "get_person", + ¶ms(&[("$name", name)]), + ) + .await + .unwrap(); + assert_eq!(person.num_rows(), 1, "{name} should be visible"); + } } /// The cancellation hole that motivated removing the Run state machine: dropping a mutation future diff --git a/docs/dev/invariants.md b/docs/dev/invariants.md index 5ee4f17..b29d740 100644 --- a/docs/dev/invariants.md +++ b/docs/dev/invariants.md @@ -139,6 +139,20 @@ them explicit. Remove the skip when the upstream Lance fix lands — the `lance_surface_guards.rs::compact_files_still_fails_on_blob_columns` guard turns red on that bump to force it. +- **Manifest→commit-graph publish atomicity:** a graph commit advances + `__manifest` (the visibility authority) and then appends `_graph_commits` as + two separate writes (`commit_updates_with_actor_with_expected`, failpoint + `graph_publish.before_commit_append`). A crash between them leaves the manifest + at version N with no commit-graph row for N. Live reads and durability are + unaffected — the live version resolves via the manifest + (`GraphCoordinator::version()`), not the commit-graph head — and the open-time + recovery sweep does NOT repair it (`lance_head == manifest_pinned` classifies + `NoMovement`; a recovery sidecar would not change this). Impact is bounded to + commit history: `commit list` misses N, time-travel by commit id to N fails, + and merge-base loses a node (a likely-benign off-by-one re-merge). This affects + every publish, not a specific maintenance command. Eventual fix: make the + commit graph reconcilable from the manifest (or the two writes atomic) — not a + recovery-sidecar concern. - **Planner capability/stat surfaces:** cost-aware planning, complete capability advertisement, and explain-with-cost are roadmap. Do not describe them as implemented. diff --git a/docs/dev/testing.md b/docs/dev/testing.md index 8974a9f..1ec7038 100644 --- a/docs/dev/testing.md +++ b/docs/dev/testing.md @@ -20,7 +20,7 @@ The engine's `tests/` is the principal coverage surface; most graph-shaped behav | `end_to_end.rs` | Full init → load → query/mutate flow | | `branching.rs` | Branch create / list / delete, lazy fork | | `merge_truth_table.rs` | Merge-pair truth table (MR-786): all 9×9 `(left_op, right_op)` cells from `{noop, addNode, removeNode, addEdge, removeEdge, setProperty, dropProperty, addLabel, removeLabel}`. Adding a new op to `OpVariant` forces a compile error in `build_case` until the new row + column are dispositioned. 36 executable cells run through real `branch_merge` with a structured oracle (`MergeOutcome` / `MergeConflictKind` + graph-state assert); 45 cells involving `dropProperty`/`addLabel`/`removeLabel` are recorded as `Unsupported` until the mutation grammar grows. | -| `writes.rs` | Direct-publish writes: cancellation, concurrent-writer CAS, multi-statement atomicity, MR-794 staged-write rewire (D₂ rejection, insert+update coalesce, multi-append coalesce, partial-failure recovery, load RI/cardinality recovery) | +| `writes.rs` | Direct-publish writes: cancellation, non-strict insert/merge rebase under the per-table queue, strict stale-write conflicts, multi-statement atomicity, MR-794 staged-write rewire (D₂ rejection, insert+update coalesce, multi-append coalesce, partial-failure recovery, load RI/cardinality recovery) | | `staged_writes.rs` | TableStore staged-write primitives (`stage_append`, `stage_merge_insert`, `commit_staged`, `scan_with_staged`, `count_rows_with_staged`) — primitive-level only; engine code uses the in-memory `MutationStaging` accumulator instead | | `lifecycle.rs` | Graph lifecycle, schema state | | `point_in_time.rs` | Snapshots, time travel (`snapshot_at_version`, `entity_at`) | @@ -34,7 +34,7 @@ The engine's `tests/` is the principal coverage surface; most graph-shaped behav | `s3_storage.rs` | S3-backed graph (skipped unless `OMNIGRAPH_S3_TEST_BUCKET` is set) | | `lance_version_columns.rs` | Per-row `_row_last_updated_at_version` behavior | | `validators.rs` | Schema constraint enforcement (enum, range, unique, cardinality) across JSONL, insert, update paths | -| `maintenance.rs` | `optimize` (compaction) + `cleanup` (version GC): empty/idempotent/no-op edges, policy validation, head preservation; `optimize` publishes the compacted version so the manifest tracks the Lance HEAD and a subsequent schema apply succeeds (`optimize_publishes_compaction_to_manifest_so_schema_apply_succeeds`), and refuses to run while a `__recovery` sidecar is pending so optimize only ever operates on a recovered graph (`optimize_defers_when_recovery_sidecar_is_pending`) | +| `maintenance.rs` | `optimize` (compaction), `repair` (explicit uncovered-drift publish), and `cleanup` (version GC): empty/idempotent/no-op edges, policy validation, head preservation; `optimize` publishes its own compaction (`optimize_publishes_compaction_to_manifest_so_schema_apply_succeeds`), skips pre-existing uncovered drift (`optimize_skips_preexisting_manifest_head_drift`), and refuses to run while a `__recovery` sidecar is pending (`optimize_defers_when_recovery_sidecar_is_pending`); `repair` previews/heals verified maintenance drift, refuses raw semantic drift without `--force`, and forced repair publishes only by explicit operator choice | | `failpoints.rs` | Failure-injection coverage (gated on `failpoints` feature). Includes the five per-writer Phase B → recovery integration tests (`recovery_rolls_forward_after_finalize_publisher_failure`, `schema_apply_phase_b_failure_recovered_on_next_open`, `branch_merge_phase_b_failure_recovered_on_next_open`, `ensure_indices_phase_b_failure_recovered_on_next_open`, `optimize_phase_b_failure_recovered_on_next_open`). | | `recovery.rs` | Open-time recovery sweep — sidecar I/O, classifier dispatch (NoMovement / RolledPastExpected / UnexpectedAtP1 / UnexpectedMultistep / InvariantViolation), all-or-nothing decision, roll-forward via `ManifestBatchPublisher::publish`, roll-back via `Dataset::restore`, audit row in `_graph_commit_recoveries.lance`, `OpenMode::ReadOnly` skip path | | `composite_flow.rs` | Compositional/narrative end-to-end stories — multi-step flows that compose mechanics covered by other test files. Catches integration regressions where individual operations all pass their unit tests but their composition breaks (sequential merges, post-merge main writes, time-travel through merge DAG, reopen consistency over multi-merge histories, post-optimize and post-cleanup strict writes). | diff --git a/docs/user/cli-reference.md b/docs/user/cli-reference.md index 8263919..a88d253 100644 --- a/docs/user/cli-reference.md +++ b/docs/user/cli-reference.md @@ -2,7 +2,7 @@ A reference for the `omnigraph` binary's command surface and `omnigraph.yaml` schema. For a quick-start guide, see [cli.md](cli.md). -17 top-level command families, 40+ subcommands. All commands accept either a positional `URI`, `--uri`, or a `--target ` resolved against `omnigraph.yaml`. +Top-level command families and subcommands. Graph-targeting commands accept either a positional `URI`, `--uri`, or a `--target ` resolved against `omnigraph.yaml`. ## Top-level commands @@ -17,11 +17,11 @@ A reference for the `omnigraph` binary's command surface and `omnigraph.yaml` sc | `export` | dump to JSONL on stdout (`--type T`, `--table K` filters) | | `branch create \| list \| delete \| merge` | branching ops | | `commit list \| show` | inspect commit graph | -| `run list \| show \| publish \| abort` | transactional run ops | | `schema plan \| apply \| show (alias: get)` | migrations | | `lint` (alias: `check`) | offline / graph-backed query validation. Replaces `query lint` / `query check`, which are kept as deprecated argv-level shims that print a one-line warning and rewrite to `omnigraph lint` | | `queries validate \| list` | operate on the server-side stored-query registry (the `queries:` block). `validate` type-checks every stored query against the live schema offline (opens the selected graph; exits non-zero on any breakage), catching schema drift without restarting the server; `list` prints the selected registry's query names, MCP exposure, and typed params. For per-graph registries, pass `--target ` or set `cli.graph`; with no graph selection, `list` shows only top-level `queries:`. Distinct from `lint`, which validates a single `.gq` file | -| `optimize` | non-destructive Lance compaction (skips tables with `Blob` columns; `--json` reports a `skipped` field) | +| `optimize` | non-destructive Lance compaction (skips tables with `Blob` columns or uncovered drift; `--json` reports `skipped`) | +| `repair [--confirm] [--force]` | preview or explicitly publish uncovered manifest/head drift. `--confirm` heals verified maintenance drift and exits non-zero if suspicious/unverifiable drift is refused; `--force --confirm` publishes suspicious/unverifiable drift after operator review | | `cleanup --keep N --older-than 7d --confirm` | destructive version GC | | `embed` | offline JSONL embedding pipeline | | `policy validate \| test \| explain` | Cedar tooling. Selects `cli.graph`, else `server.graph`, else top-level `policy.file` | diff --git a/docs/user/maintenance.md b/docs/user/maintenance.md index a835799..e69bba3 100644 --- a/docs/user/maintenance.md +++ b/docs/user/maintenance.md @@ -1,17 +1,26 @@ -# Maintenance: Optimize & Cleanup +# Maintenance: Optimize, Repair & Cleanup -`db/omnigraph/optimize.rs`. +`db/omnigraph/optimize.rs` and `db/omnigraph/repair.rs`. ## `optimize_all_tables(db)` — non-destructive - Lance `compact_files()` on every node + edge table on `main`, then **publishes the compacted version to the `__manifest`** so the manifest's `table_version` tracks the compacted Lance HEAD. Reads pin the manifest version, so without this publish compaction would be invisible to readers *and* would break the HEAD-vs-manifest precondition of the next schema apply / strict update/delete ("stale view … refresh and retry"). The publish advances the graph version (a system-attributed commit) only for tables that actually compacted. - Rewrites small fragments into fewer large ones; old fragments remain reachable via older manifests until `cleanup` runs. - Each table's compact→publish runs under its per-`(table, main)` write queue (serializing with concurrent mutations — compaction is a Lance `Rewrite` op that retryable-conflicts with a concurrent merge/update/delete on overlapping fragments). The Lance-HEAD-before-manifest-publish gap is covered by a `SidecarKind::Optimize` recovery sidecar (loose-match): a crash in that window rolls the compacted version forward on the next `Omnigraph::open` (compaction is content-preserving, so roll-forward is always safe). -- **Requires a recovered graph.** `optimize` refuses (errors) when an unresolved recovery sidecar is present under `__recovery` — operating on an unrecovered graph could publish a partial write the open-time recovery sweep would roll back. Reopen the graph to run the recovery sweep, then re-run `optimize`. (Recovery roll-back now publishes its restored version, so a recovered graph always satisfies `manifest == Lance HEAD` going in; there is no leftover drift for `optimize` to interpret.) +- **Requires a recovered graph.** `optimize` refuses (errors) when an unresolved recovery sidecar is present under `__recovery` — operating on an unrecovered graph could publish a partial write the open-time recovery sweep would roll back. Reopen the graph to run the recovery sweep, then re-run `optimize`. +- **Uncovered drift is skipped, not interpreted.** If a table's Lance HEAD is ahead of the version recorded in `__manifest` and no recovery sidecar covers that movement, `optimize` reports `skipped: Some(DriftNeedsRepair)` with the manifest/head versions and leaves the table untouched. Run `omnigraph repair` to classify and explicitly publish that drift. - Bounded by `OMNIGRAPH_MAINTENANCE_CONCURRENCY` (default 8). -- Returns `[TableOptimizeStats { table_key, fragments_removed, fragments_added, committed, skipped }]`. +- Returns `[TableOptimizeStats { table_key, fragments_removed, fragments_added, committed, skipped, manifest_version, lance_head_version }]`. - **Blob tables are skipped.** A table that declares any `Blob` property is not compacted: it is reported with `skipped: Some(BlobColumnsUnsupportedByLance)` (and logged via `tracing::warn`) instead of compacted, and the rest of the sweep proceeds normally. The current Lance `compact_files` mis-decodes blob-v2 columns under its forced `BlobHandling::AllBinary` read; **reads and writes are unaffected** — only compaction is. This is gated by `LANCE_SUPPORTS_BLOB_COMPACTION` (`db/omnigraph/optimize.rs`) and removed when the upstream Lance fix lands (see [docs/dev/lance.md](../dev/lance.md)). Consequence: fragment count and deleted-row space on blob tables are not reclaimed until then; query results are never affected. +## `repair_all_tables(db, options)` — explicit + +- Handles **uncovered manifest/head drift**: a table's Lance HEAD is ahead of the manifest pin and no recovery sidecar records the writer intent. +- Preview by default. `omnigraph repair --json ` reports each table's `classification`, `action`, manifest/head versions, Lance operation names, and any classification error. `--confirm` publishes only verified maintenance drift; if any suspicious or unverifiable table is refused, the CLI prints the per-table output and exits non-zero. `--force --confirm` also publishes suspicious or unverifiable drift after operator review. +- Classifies drift by reading Lance transactions from `manifest_version + 1` through `lance_head_version`. Only `ReserveFragments` and `Rewrite` are verified maintenance. Semantic operations such as `Append`, `Delete`, `Update`, `Merge`, or missing transaction history are not auto-healed. +- Publishes repair by advancing `__manifest` to the existing Lance HEAD; it does **not** rewrite Lance data. If the publish succeeds, normal reads and strict writes use the repaired version. If it fails, no new data-side partial state was created. +- Requires a clean recovery state. Pending `__recovery` sidecars still belong to automatic sidecar recovery, not manual repair. + ## `cleanup_all_tables(db, options)` — destructive - Lance `cleanup_old_versions()` per table.