diff --git a/.github/branch-protection.json b/.github/branch-protection.json index 7ca46b9..c039e32 100644 --- a/.github/branch-protection.json +++ b/.github/branch-protection.json @@ -1,5 +1,5 @@ { - "_comment": "Branch protection policy for main. Applied via scripts/apply-branch-protection.sh. See docs/branch-protection.md for rationale.", + "_comment": "Branch protection policy for main. Applied via scripts/apply-branch-protection.sh. See docs/branch-protection.md for rationale. NOTE: bypass_pull_request_allowances.users must mirror the engineering owners in .github/codeowners-roles.yml — code owners merge their own PRs without a second review; non-owners still need a code-owner approval. (render-codeowners.py does NOT generate this list; keep it in sync by hand.)", "required_status_checks": { "strict": true, "contexts": [ @@ -17,7 +17,12 @@ "dismiss_stale_reviews": true, "require_code_owner_reviews": true, "required_approving_review_count": 1, - "require_last_push_approval": false + "require_last_push_approval": false, + "bypass_pull_request_allowances": { + "users": ["ragnorc", "aaltshuler"], + "teams": [], + "apps": [] + } }, "restrictions": null, "required_linear_history": true, diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 5b7b7b2..bbe5893 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -261,63 +261,6 @@ jobs: if: needs.classify_changes.outputs.run_full_ci == 'true' run: cargo test --locked -p omnigraph-server --features aws - test_windows_binaries: - name: Test Windows release binaries - needs: classify_changes - runs-on: windows-latest - timeout-minutes: 75 - permissions: - contents: read - env: - CARGO_TERM_COLOR: always - steps: - - name: Skip for text-only changes - if: needs.classify_changes.outputs.run_full_ci != 'true' - run: Write-Host "Text-only change detected; skipping Windows binary build." - - - name: Checkout source - if: needs.classify_changes.outputs.run_full_ci == 'true' - uses: actions/checkout@v5.0.1 - - - name: Install system dependencies - if: needs.classify_changes.outputs.run_full_ci == 'true' - run: choco install protoc -y - - - name: Install Rust stable - if: needs.classify_changes.outputs.run_full_ci == 'true' - uses: dtolnay/rust-toolchain@stable - with: - toolchain: stable - - - name: Cache Rust build data - if: needs.classify_changes.outputs.run_full_ci == 'true' - uses: Swatinem/rust-cache@v2 - with: - workspaces: | - . -> target - key: windows-release-binaries - - - name: Build Windows binaries - if: needs.classify_changes.outputs.run_full_ci == 'true' - run: cargo build --release --locked -p omnigraph-cli -p omnigraph-server - - - name: Smoke test Windows binaries - if: needs.classify_changes.outputs.run_full_ci == 'true' - run: | - & ./target/release/omnigraph.exe version - & ./target/release/omnigraph-server.exe --help - - - name: Check PowerShell installer syntax - if: needs.classify_changes.outputs.run_full_ci == 'true' - run: | - $tokens = $null - $errors = $null - [System.Management.Automation.Language.Parser]::ParseFile("scripts/install.ps1", [ref]$tokens, [ref]$errors) | Out-Null - if ($errors.Count -gt 0) { - $errors | Format-List - exit 1 - } - rustfs_integration: name: RustFS S3 Integration needs: diff --git a/AGENTS.md b/AGENTS.md index 26172ff..e4ac297 100644 --- a/AGENTS.md +++ b/AGENTS.md @@ -16,7 +16,7 @@ Tools that support `@`-imports (Claude Code) auto-include all three files via th `CLAUDE.md` is a symlink to this file — there is exactly one source of truth. Edit `AGENTS.md`. -**Version surveyed:** 0.6.1 +**Version surveyed:** 0.6.2 **Workspace crates:** `omnigraph-compiler`, `omnigraph` (engine), `omnigraph-policy`, `omnigraph-cluster`, `omnigraph-cli`, `omnigraph-server` **Storage substrate:** Lance 6.x (columnar, versioned, branchable) **License:** MIT @@ -214,8 +214,12 @@ omnigraph schema apply --schema ./next.pg s3://my-bucket/graph.omni --json # Merge review branch back omnigraph branch merge review/2026-04-25 --into main s3://my-bucket/graph.omni -# Compact + GC (preview, then confirm) +# Compact, preview any uncovered drift, then repair/GC after review omnigraph optimize s3://my-bucket/graph.omni +omnigraph repair s3://my-bucket/graph.omni +omnigraph repair --confirm s3://my-bucket/graph.omni +# For suspicious/unverifiable drift only after deliberate review: +# omnigraph repair --force --confirm s3://my-bucket/graph.omni omnigraph cleanup --keep 10 --older-than 7d s3://my-bucket/graph.omni omnigraph cleanup --keep 10 --older-than 7d --confirm s3://my-bucket/graph.omni @@ -236,8 +240,9 @@ omnigraph policy explain --actor act-alice --action change --branch main | Columnar storage on object store | ✅ Arrow/Lance | URI normalization, S3 env-var plumbing | | Per-dataset versioning + time travel | ✅ | `snapshot_at_version`, `entity_at`, snapshot-pinned reads across many tables | | Per-dataset branches | ✅ | **Graph-level** branches (atomic across all sub-tables), lazy fork, system branch filtering | -| Atomic single-dataset commits | ✅ | **Multi-table publish via three layers**, NOT a single Lance primitive: (1) per-table Lance `commit_staged` for the data write, (2) `__manifest` row-level CAS via `ManifestBatchPublisher` for cross-table ordering, (3) the open-time recovery sweep for the residual gap between (1) and (2). All three layers ship; the four migrated writers (`MutationStaging::finalize`, `schema_apply`, `branch_merge`, `ensure_indices`) write a `__recovery/{ulid}.json` sidecar before Phase B and delete it after Phase C. The next `Omnigraph::open` (gated on `OpenMode::ReadWrite`) runs the sweep in `db/manifest/recovery.rs`: classify, decide all-or-nothing per sidecar, roll forward via single `ManifestBatchPublisher::publish` or roll back via `Dataset::restore`, and record an audit row in `_graph_commit_recoveries.lance` (queryable via `omnigraph commit list --filter actor=omnigraph:recovery`). Continuous in-process recovery (no restart needed between Phase B failure and recovery) is the goal of a future background reconciler. Engine writes route through a sealed `TableStorage` trait exposing `stage_*` + `commit_staged` as the canonical staged-write surface; documented inline-commit residuals (`delete_where`, `create_vector_index`, plus legacy `append_batch` / `merge_insert_batches` / `overwrite_batch` / `create_*_index`) remain on the trait until upstream Lance ships a public two-phase API ([#6658](https://github.com/lance-format/lance/issues/6658), [#6666](https://github.com/lance-format/lance/issues/6666)) and the migration of every call site completes. | -| Compaction (`compact_files`) | ✅ | `omnigraph optimize` orchestrates over all node/edge tables, bounded concurrency; **skips blob-bearing tables** (reported via `TableOptimizeStats.skipped`, not silent), gated on `LANCE_SUPPORTS_BLOB_COMPACTION` until the upstream blob-v2 compaction-decode bug is fixed (see [docs/dev/invariants.md](docs/dev/invariants.md) Known Gaps) | +| Atomic single-dataset commits | ✅ | **Multi-table publish via three layers**, NOT a single Lance primitive: (1) per-table Lance `commit_staged` for the data write, (2) `__manifest` row-level CAS via `ManifestBatchPublisher` for cross-table ordering, (3) the open-time recovery sweep for the residual gap between (1) and (2). All three layers ship; the five migrated writers (`MutationStaging::finalize`, `schema_apply`, `branch_merge`, `ensure_indices`, `optimize_all_tables`) write a `__recovery/{ulid}.json` sidecar before Phase B and delete it after Phase C. The next `Omnigraph::open` (gated on `OpenMode::ReadWrite`) runs the sweep in `db/manifest/recovery.rs`: classify, decide all-or-nothing per sidecar, roll forward via single `ManifestBatchPublisher::publish` or roll back via `Dataset::restore` followed by a manifest publish of the restored version (so both directions converge to `manifest == HEAD` — no residual drift), and record an audit row in `_graph_commit_recoveries.lance` (queryable via `omnigraph commit list --filter actor=omnigraph:recovery`). Continuous in-process recovery (no restart needed between Phase B failure and recovery) is the goal of a future background reconciler. Engine writes route through a sealed `TableStorage` trait exposing `stage_*` + `commit_staged` as the canonical staged-write surface; documented inline-commit residuals (`delete_where`, `create_vector_index`, plus legacy `append_batch` / `merge_insert_batches` / `overwrite_batch` / `create_*_index`) remain on the trait until upstream Lance ships a public two-phase API ([#6658](https://github.com/lance-format/lance/issues/6658), [#6666](https://github.com/lance-format/lance/issues/6666)) and the migration of every call site completes. | +| Compaction (`compact_files`) | ✅ | `omnigraph optimize` orchestrates over all node/edge tables, bounded concurrency; **publishes each compacted table's new version to `__manifest`** (so the manifest tracks the Lance HEAD — required for reads to observe compaction and for schema apply / strict writes to pass their HEAD-vs-manifest precondition), under the per-`(table, main)` write queue with `SidecarKind::Optimize` recovery coverage; **refuses on an unrecovered graph** (errors if a `__recovery` sidecar is pending); **skips uncovered HEAD > manifest drift** with `DriftNeedsRepair` instead of interpreting it; **skips blob-bearing tables** (reported via `TableOptimizeStats.skipped`, not silent), gated on `LANCE_SUPPORTS_BLOB_COMPACTION` until the upstream blob-v2 compaction-decode bug is fixed (see [docs/dev/invariants.md](docs/dev/invariants.md) Known Gaps) | +| Repair uncovered drift | — | `omnigraph repair` explicitly classifies uncovered table `HEAD > manifest` drift: verified maintenance drift (`ReserveFragments`/`Rewrite`) can be published with `--confirm`; suspicious or unverifiable drift requires `--force --confirm`. Sidecar-covered crash residuals still recover automatically on open. | | Cleanup (`cleanup_old_versions`) | ✅ | `omnigraph cleanup` with `--keep` / `--older-than` policy | | BTREE / inverted (FTS) / vector indexes | ✅ | `ensure_indices` builds them on every relevant column; idempotent; lazy across branches | | `merge_insert` upsert | ✅ | `LoadMode::Merge`, mutation `update`/`insert`/`delete` lowering | diff --git a/Cargo.lock b/Cargo.lock index ebe5565..3064196 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4543,7 +4543,7 @@ dependencies = [ [[package]] name = "omnigraph-cli" -version = "0.6.1" +version = "0.6.2" dependencies = [ "assert_cmd", "clap", @@ -4566,7 +4566,7 @@ dependencies = [ [[package]] name = "omnigraph-cluster" -version = "0.6.1" +version = "0.6.2" dependencies = [ "omnigraph-compiler", "serde", @@ -4581,7 +4581,7 @@ dependencies = [ [[package]] name = "omnigraph-compiler" -version = "0.6.1" +version = "0.6.2" dependencies = [ "ahash", "arrow-array", @@ -4602,7 +4602,7 @@ dependencies = [ [[package]] name = "omnigraph-engine" -version = "0.6.1" +version = "0.6.2" dependencies = [ "arc-swap", "arrow-array", @@ -4643,7 +4643,7 @@ dependencies = [ [[package]] name = "omnigraph-policy" -version = "0.6.1" +version = "0.6.2" dependencies = [ "cedar-policy", "clap", @@ -4656,7 +4656,7 @@ dependencies = [ [[package]] name = "omnigraph-server" -version = "0.6.1" +version = "0.6.2" dependencies = [ "arc-swap", "async-trait", diff --git a/crates/omnigraph-cli/Cargo.toml b/crates/omnigraph-cli/Cargo.toml index bc50551..901d69c 100644 --- a/crates/omnigraph-cli/Cargo.toml +++ b/crates/omnigraph-cli/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "omnigraph-cli" -version = "0.6.1" +version = "0.6.2" edition = "2024" description = "CLI for the Omnigraph graph database." license = "MIT" @@ -13,11 +13,11 @@ name = "omnigraph" path = "src/main.rs" [dependencies] -omnigraph = { package = "omnigraph-engine", path = "../omnigraph", version = "0.6.1" } -omnigraph-compiler = { path = "../omnigraph-compiler", version = "0.6.1" } -omnigraph-cluster = { path = "../omnigraph-cluster", version = "0.6.1" } -omnigraph-policy = { path = "../omnigraph-policy", version = "0.6.1" } -omnigraph-server = { path = "../omnigraph-server", version = "0.6.1" } +omnigraph = { package = "omnigraph-engine", path = "../omnigraph", version = "0.6.2" } +omnigraph-compiler = { path = "../omnigraph-compiler", version = "0.6.2" } +omnigraph-cluster = { path = "../omnigraph-cluster", version = "0.6.2" } +omnigraph-policy = { path = "../omnigraph-policy", version = "0.6.2" } +omnigraph-server = { path = "../omnigraph-server", version = "0.6.2" } clap = { workspace = true } color-eyre = { workspace = true } serde = { workspace = true } diff --git a/crates/omnigraph-cli/src/main.rs b/crates/omnigraph-cli/src/main.rs index 4ca4a4a..38ea0de 100644 --- a/crates/omnigraph-cli/src/main.rs +++ b/crates/omnigraph-cli/src/main.rs @@ -287,6 +287,25 @@ enum Command { #[arg(long)] json: bool, }, + /// Classify and explicitly repair manifest/head drift + Repair { + /// Graph URI + uri: Option, + #[arg(long)] + target: Option, + #[arg(long)] + config: Option, + /// Publish verified maintenance drift. Without this flag, repair only + /// previews what it would do. + #[arg(long)] + confirm: bool, + /// Also publish suspicious or unverifiable drift. Requires + /// `--confirm`; use only after operator review. + #[arg(long, requires = "confirm")] + force: bool, + #[arg(long)] + json: bool, + }, /// Remove old Lance versions from every table of the graph (destructive) Cleanup { /// Graph URI @@ -3160,6 +3179,8 @@ async fn main() -> Result<()> { "fragments_added": s.fragments_added, "committed": s.committed, "skipped": s.skipped.map(|r| r.as_str()), + "manifest_version": s.manifest_version, + "lance_head_version": s.lance_head_version, })).collect::>(), }); print_json(&value)?; @@ -3179,6 +3200,89 @@ async fn main() -> Result<()> { } } } + Command::Repair { + uri, + target, + config, + confirm, + force, + json, + } => { + let config = load_cli_config(config.as_ref())?; + let uri = resolve_uri(&config, uri, target.as_deref())?; + let db = Omnigraph::open(&uri).await?; + let stats = db + .repair(omnigraph::db::RepairOptions { confirm, force }) + .await?; + let refused_count = stats + .tables + .iter() + .filter(|s| matches!(s.action, omnigraph::db::RepairAction::Refused)) + .count(); + if json { + let value = serde_json::json!({ + "uri": uri, + "confirm": confirm, + "force": force, + "manifest_version": stats.manifest_version, + "tables": stats.tables.iter().map(|s| serde_json::json!({ + "table_key": s.table_key, + "manifest_version": s.manifest_version, + "lance_head_version": s.lance_head_version, + "classification": s.classification.as_str(), + "action": s.action.as_str(), + "operations": s.operations, + "error": s.error, + })).collect::>(), + }); + print_json(&value)?; + } else { + let mode = if confirm { "confirm" } else { "preview" }; + println!( + "repair {} — {} mode, {} tables", + uri, + mode, + stats.tables.len() + ); + for s in &stats.tables { + let drift = if s.manifest_version == s.lance_head_version { + format!("{}", s.manifest_version) + } else { + format!("{} → {}", s.manifest_version, s.lance_head_version) + }; + let ops = if s.operations.is_empty() { + String::new() + } else { + format!(" [{}]", s.operations.join(", ")) + }; + let err = s + .error + .as_ref() + .map(|err| format!(" ({err})")) + .unwrap_or_default(); + println!( + " {:<40} {:<12} {:<22} {}{}{}", + s.table_key, + s.action.as_str(), + s.classification.as_str(), + drift, + ops, + err + ); + } + if !confirm { + println!("rerun with --confirm to publish verified maintenance drift"); + } + } + if refused_count > 0 { + bail!( + "repair refused {} suspicious or unverifiable table(s); review the preview \ + output and rerun with --force --confirm only if publishing that drift is \ + intentional", + refused_count + ); + } + } Command::Cleanup { uri, target, diff --git a/crates/omnigraph-cli/tests/cli.rs b/crates/omnigraph-cli/tests/cli.rs index 920ceda..627fd87 100644 --- a/crates/omnigraph-cli/tests/cli.rs +++ b/crates/omnigraph-cli/tests/cli.rs @@ -1,5 +1,6 @@ use std::fs; +use lance::Dataset; use lance::index::DatasetIndexExt; use omnigraph::db::{Omnigraph, ReadTarget}; use serde_json::Value; @@ -60,6 +61,25 @@ fn manifest_dataset_version(graph: &std::path::Path) -> u64 { }) } +fn forge_person_delete_drift(graph: &std::path::Path) -> (u64, u64) { + tokio::runtime::Runtime::new().unwrap().block_on(async { + let uri = graph.to_string_lossy(); + let db = Omnigraph::open(uri.as_ref()).await.unwrap(); + let snap = db + .snapshot_of(ReadTarget::branch("main")) + .await + .unwrap(); + let entry = snap.entry("node:Person").unwrap(); + let full_path = format!("{}/{}", uri.trim_end_matches('/'), entry.table_path); + let mut ds = Dataset::open(&full_path).await.unwrap(); + let deleted = ds.delete("name = 'Alice'").await.unwrap(); + assert_eq!(deleted.num_deleted_rows, 1); + let head = deleted.new_dataset.version().version; + assert!(head > entry.table_version); + (entry.table_version, head) + }) +} + fn write_policy_config_fixture(root: &std::path::Path) -> (std::path::PathBuf, std::path::PathBuf) { let config = root.join("omnigraph.yaml"); let policy = root.join("policy.yaml"); @@ -542,6 +562,83 @@ fn init_creates_graph_successfully_on_missing_local_directory() { assert!(temp.path().join("omnigraph.yaml").exists()); } +#[test] +fn repair_json_reports_noop_on_clean_graph() { + let temp = tempdir().unwrap(); + let graph = graph_path(temp.path()); + init_graph(&graph); + load_fixture(&graph); + + let output = output_success(cli().arg("repair").arg("--json").arg(&graph)); + let payload: Value = serde_json::from_slice(&output.stdout).unwrap(); + + assert_eq!(payload["confirm"], false); + assert_eq!(payload["force"], false); + assert_eq!(payload["manifest_version"], Value::Null); + let tables = payload["tables"].as_array().unwrap(); + assert_eq!(tables.len(), 4); + assert!(tables.iter().all(|table| { + table["classification"] == "no_drift" && table["action"] == "no_op" + })); +} + +#[test] +fn repair_confirm_json_refuses_suspicious_drift_with_nonzero_exit_then_force_succeeds() { + let temp = tempdir().unwrap(); + let graph = graph_path(temp.path()); + init_graph(&graph); + load_fixture(&graph); + let graph_manifest_before = manifest_dataset_version(&graph); + let (table_manifest_before, table_head_before) = forge_person_delete_drift(&graph); + + let refused = output_failure( + cli() + .arg("repair") + .arg("--confirm") + .arg("--json") + .arg(&graph), + ); + let refused_payload: Value = serde_json::from_slice(&refused.stdout).unwrap(); + assert_eq!(refused_payload["manifest_version"], Value::Null); + let person = refused_payload["tables"] + .as_array() + .unwrap() + .iter() + .find(|table| table["table_key"] == "node:Person") + .unwrap(); + assert_eq!(person["classification"], "suspicious"); + assert_eq!(person["action"], "refused"); + assert!( + String::from_utf8_lossy(&refused.stderr).contains("repair refused"), + "stderr should explain the non-zero exit; got: {}", + String::from_utf8_lossy(&refused.stderr) + ); + assert_eq!(manifest_dataset_version(&graph), graph_manifest_before); + + let forced = output_success( + cli() + .arg("repair") + .arg("--force") + .arg("--confirm") + .arg("--json") + .arg(&graph), + ); + let forced_payload: Value = serde_json::from_slice(&forced.stdout).unwrap(); + let forced_manifest = forced_payload["manifest_version"].as_u64().unwrap(); + assert!(forced_manifest > graph_manifest_before); + let person = forced_payload["tables"] + .as_array() + .unwrap() + .iter() + .find(|table| table["table_key"] == "node:Person") + .unwrap(); + assert_eq!(person["classification"], "suspicious"); + assert_eq!(person["action"], "forced"); + assert_eq!(person["manifest_version"], table_manifest_before); + assert_eq!(person["lance_head_version"], table_head_before); + assert_eq!(manifest_dataset_version(&graph), forced_manifest); +} + #[test] fn schema_plan_json_reports_supported_additive_change() { let temp = tempdir().unwrap(); diff --git a/crates/omnigraph-cluster/Cargo.toml b/crates/omnigraph-cluster/Cargo.toml index d210b1c..3e14430 100644 --- a/crates/omnigraph-cluster/Cargo.toml +++ b/crates/omnigraph-cluster/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "omnigraph-cluster" -version = "0.6.1" +version = "0.6.2" edition = "2024" description = "Read-only cluster configuration validation and planning for Omnigraph." license = "MIT" @@ -9,7 +9,7 @@ homepage = "https://github.com/ModernRelay/omnigraph" documentation = "https://docs.rs/omnigraph-cluster" [dependencies] -omnigraph-compiler = { path = "../omnigraph-compiler", version = "0.6.1" } +omnigraph-compiler = { path = "../omnigraph-compiler", version = "0.6.2" } serde = { workspace = true } serde_json = { workspace = true } serde_yaml = { workspace = true } diff --git a/crates/omnigraph-compiler/Cargo.toml b/crates/omnigraph-compiler/Cargo.toml index 545db83..8db46e6 100644 --- a/crates/omnigraph-compiler/Cargo.toml +++ b/crates/omnigraph-compiler/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "omnigraph-compiler" -version = "0.6.1" +version = "0.6.2" edition = "2024" description = "Schema/query compiler for Omnigraph. Zero Lance dependency." license = "MIT" diff --git a/crates/omnigraph-policy/Cargo.toml b/crates/omnigraph-policy/Cargo.toml index 3d14fc5..0df2a12 100644 --- a/crates/omnigraph-policy/Cargo.toml +++ b/crates/omnigraph-policy/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "omnigraph-policy" -version = "0.6.1" +version = "0.6.2" edition = "2024" description = "Policy / authorization layer for Omnigraph — Cedar-backed PolicyEngine, PolicyChecker trait, ResourceScope enum." license = "MIT" diff --git a/crates/omnigraph-server/Cargo.toml b/crates/omnigraph-server/Cargo.toml index 5994aa1..5f87082 100644 --- a/crates/omnigraph-server/Cargo.toml +++ b/crates/omnigraph-server/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "omnigraph-server" -version = "0.6.1" +version = "0.6.2" edition = "2024" description = "HTTP server for the Omnigraph graph database." license = "MIT" @@ -19,9 +19,9 @@ default = [] aws = ["dep:aws-config", "dep:aws-sdk-secretsmanager"] [dependencies] -omnigraph = { package = "omnigraph-engine", path = "../omnigraph", version = "0.6.1" } -omnigraph-compiler = { path = "../omnigraph-compiler", version = "0.6.1" } -omnigraph-policy = { path = "../omnigraph-policy", version = "0.6.1" } +omnigraph = { package = "omnigraph-engine", path = "../omnigraph", version = "0.6.2" } +omnigraph-compiler = { path = "../omnigraph-compiler", version = "0.6.2" } +omnigraph-policy = { path = "../omnigraph-policy", version = "0.6.2" } axum = { workspace = true } clap = { workspace = true } color-eyre = { workspace = true } diff --git a/crates/omnigraph/Cargo.toml b/crates/omnigraph/Cargo.toml index 70f51d8..24b0c9c 100644 --- a/crates/omnigraph/Cargo.toml +++ b/crates/omnigraph/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "omnigraph-engine" -version = "0.6.1" +version = "0.6.2" edition = "2024" description = "Runtime engine for the Omnigraph graph database." license = "MIT" @@ -16,8 +16,8 @@ default = [] failpoints = ["dep:fail", "fail/failpoints"] [dependencies] -omnigraph-compiler = { path = "../omnigraph-compiler", version = "0.6.1" } -omnigraph-policy = { path = "../omnigraph-policy", version = "0.6.1" } +omnigraph-compiler = { path = "../omnigraph-compiler", version = "0.6.2" } +omnigraph-policy = { path = "../omnigraph-policy", version = "0.6.2" } lance = { workspace = true } lance-datafusion = { workspace = true } datafusion = { workspace = true } @@ -51,7 +51,7 @@ chrono = { workspace = true } arc-swap = { workspace = true } [dev-dependencies] -omnigraph-compiler = { path = "../omnigraph-compiler", version = "0.6.1" } +omnigraph-compiler = { path = "../omnigraph-compiler", version = "0.6.2" } tokio = { workspace = true } lance-namespace-impls = { workspace = true } serial_test = "3" diff --git a/crates/omnigraph/src/db/manifest.rs b/crates/omnigraph/src/db/manifest.rs index 3b2886f..5bf1f87 100644 --- a/crates/omnigraph/src/db/manifest.rs +++ b/crates/omnigraph/src/db/manifest.rs @@ -36,7 +36,7 @@ use publisher::{GraphNamespacePublisher, ManifestBatchPublisher}; pub(crate) use recovery::{ RecoveryMode, RecoverySidecar, RecoverySidecarHandle, SidecarKind, SidecarTablePin, SidecarTableRegistration, SidecarTombstone, delete_sidecar, has_schema_apply_sidecar, - new_sidecar, recover_manifest_drift, write_sidecar, + list_sidecars, new_sidecar, recover_manifest_drift, write_sidecar, }; pub use state::SubTableEntry; #[cfg(test)] diff --git a/crates/omnigraph/src/db/manifest/recovery.rs b/crates/omnigraph/src/db/manifest/recovery.rs index 4c1b987..3119531 100644 --- a/crates/omnigraph/src/db/manifest/recovery.rs +++ b/crates/omnigraph/src/db/manifest/recovery.rs @@ -106,6 +106,12 @@ pub(crate) enum SidecarKind { BranchMerge, /// `ensure_indices_for_branch` — index lifecycle commits. EnsureIndices, + /// `optimize_all_tables` — Lance `compact_files` (reserve-fragments + + /// rewrite commits) followed by a manifest publish of the compacted + /// version. Loose-match like the other multi-commit writers; roll-forward + /// is always safe because compaction is content-preserving (Lance + /// `Operation::Rewrite` "reorganizes data without semantic modification"). + Optimize, } /// One table's contribution to a sidecar's intended commit. The classifier @@ -412,11 +418,13 @@ pub(crate) fn parse_sidecar(sidecar_uri: &str, body: &str) -> Result manifest_pinned` as `RolledPastExpected` when /// `pin.expected_version == manifest_pinned` (the writer's CAS /// target matches what the manifest currently shows). The risk this @@ -494,9 +502,12 @@ pub(crate) fn decide(classifications: &[TableClassification]) -> SidecarDecision /// Skipping the restore in those cases would leave Lance HEAD ahead of /// the manifest with no recovery artifact left. /// -/// Cost: under repeated mid-rollback crashes (rare), Lance HEAD -/// accumulates extra restore commits that `omnigraph cleanup` reclaims. -/// Bounded by the number of recovery iterations — typically 1. +/// Cost: a successful roll-back appends one restore commit and then publishes +/// the manifest to match (`roll_back_sidecar`), so the table converges +/// (`manifest == HEAD`) in one pass. Only repeated crashes *between* the restore +/// and that publish (rare) accumulate extra restore commits; each re-classified +/// roll-back restores again and `omnigraph cleanup` reclaims the surplus. +/// Bounded by the number of interrupted recovery iterations — typically 0. pub(crate) async fn restore_table_to_version( table_path: &str, branch: Option<&str>, @@ -801,13 +812,24 @@ async fn roll_back_sidecar( sidecar: &RecoverySidecar, states: &[ClassifiedTable], ) -> Result<()> { - // Restore every table whose Lance HEAD has drifted from the - // manifest pin (RolledPastExpected, UnexpectedAtP1, - // UnexpectedMultistep). NoMovement tables are already at the - // manifest pin — no action. Restore is unconditional; repeated - // mid-rollback crashes accumulate a few extra Lance commits that - // `omnigraph cleanup` reclaims. + // Restore every drifted table (RolledPastExpected / UnexpectedAtP1 / + // UnexpectedMultistep) to its manifest-pinned content, then PUBLISH so + // `manifest == Lance HEAD` for each — symmetric with roll-forward. The + // restore commit's content equals the manifest-pinned version, so re-pinning + // the manifest to the new (restored) HEAD is content-correct and closes the + // orphaned-drift class (`HEAD > manifest` with no covering sidecar). This is + // what makes a failed-then-retried schema_apply converge: after one + // roll-back `manifest == HEAD`, so the retry's precondition passes instead of + // failing one version higher each iteration. + // + // NoMovement tables are already at the pin — excluded from both the restore + // and the publish. The audit `to_version` stays the *logical* rolled-back-to + // version (`manifest_pinned`), while the manifest is published at + // `manifest_pinned + 1` (the restore commit, same content) — keep that + // asymmetry so the audit records the drift (`from_version > to_version`). let mut outcomes = Vec::with_capacity(sidecar.tables.len()); + let mut updates: Vec = Vec::with_capacity(sidecar.tables.len()); + let mut expected: HashMap = HashMap::with_capacity(sidecar.tables.len()); for (pin, state) in sidecar.tables.iter().zip(states.iter()) { if matches!( state.classification, @@ -821,10 +843,20 @@ async fn roll_back_sidecar( state.manifest_pinned, ) .await?; - // `from_version` records the Lance HEAD observed BEFORE the - // restore (the actual drift), not the manifest pin. Operators - // reading `_graph_commit_recoveries.lance` see "rolled back - // from v7 to v5" rather than "v5 → v5". + // Publish the post-restore HEAD, CAS against the current (unmoved) + // manifest pin — the same helper roll-forward uses. + push_table_update_at_head( + root_uri, + &pin.table_key, + &pin.table_path, + pin.table_branch.as_deref(), + state.manifest_pinned, + &mut updates, + &mut expected, + ) + .await?; + // `from_version` records the Lance HEAD observed BEFORE the restore + // (the actual drift); `to_version` the logical pin we rolled back to. outcomes.push(TableOutcome { table_key: pin.table_key.clone(), from_version: state.lance_head, @@ -832,13 +864,23 @@ async fn roll_back_sidecar( }); } } - // Manifest pin doesn't move on rollback; record an audit-only - // commit at the existing version so operators can correlate via - // `omnigraph commit list --filter actor=omnigraph:recovery`. + // Publish the restored HEADs so manifest == HEAD. A degenerate all-NoMovement + // roll-back restores nothing — there's nothing to publish, and the audit + // records the unchanged snapshot version. + let manifest_version = if updates.is_empty() { + snapshot.version() + } else { + let publisher = GraphNamespacePublisher::new(root_uri, sidecar.branch.as_deref()); + publisher + .publish(&updates, &expected) + .await? + .version() + .version + }; record_audit( root_uri, sidecar, - snapshot.version(), + manifest_version, RecoveryKind::RolledBack, outcomes, ) @@ -919,44 +961,20 @@ async fn roll_forward_all( HashMap::with_capacity(sidecar.tables.len() + sidecar.additional_registrations.len()); for pin in &sidecar.tables { - // Open the dataset at its CURRENT Lance HEAD on the pin's branch - // (not at the sidecar's post_commit_pin). For strict-match writers - // (Mutation/Load) HEAD == post_commit_pin by construction. For - // loose-match writers (SchemaApply/EnsureIndices/BranchMerge) HEAD - // may be higher than post_commit_pin (multiple commit_staged - // calls per table); we want to publish to the actual current HEAD. - let head_ds = Dataset::open(&pin.table_path) - .await - .map_err(|e| OmniError::Lance(e.to_string()))?; - let head_ds = match pin.table_branch.as_deref() { - Some(b) if b != "main" => head_ds - .checkout_branch(b) - .await - .map_err(|e| OmniError::Lance(e.to_string()))?, - _ => head_ds, - }; - let head_version = head_ds.version().version; - - let row_count = head_ds - .count_rows(None) - .await - .map_err(|e| OmniError::Lance(e.to_string()))? as u64; - - let table_relative_path = super::table_path_for_table_key(&pin.table_key)?; - let version_metadata = super::metadata::TableVersionMetadata::from_dataset( + // Publish to the table's CURRENT Lance HEAD on the pin's branch (not the + // sidecar's `post_commit_pin`, a lower bound for loose-match writers that + // run multiple commit_staged calls per table). CAS against the pin's + // pre-write `expected_version`. + let head_version = push_table_update_at_head( root_uri, - &table_relative_path, - &head_ds, - )?; - - updates.push(ManifestChange::Update(SubTableUpdate { - table_key: pin.table_key.clone(), - table_version: head_version, - table_branch: pin.table_branch.clone(), - row_count, - version_metadata, - })); - expected.insert(pin.table_key.clone(), pin.expected_version); + &pin.table_key, + &pin.table_path, + pin.table_branch.as_deref(), + pin.expected_version, + &mut updates, + &mut expected, + ) + .await?; published_versions.insert(pin.table_key.clone(), head_version); } @@ -1047,6 +1065,57 @@ async fn roll_forward_all( Ok((new_dataset.version().version, published_versions)) } +/// Open `table_path` at its branch HEAD, read the current Lance HEAD version, +/// row count, and version metadata, and push a `ManifestChange::Update` (plus +/// its CAS `expected` entry) that re-pins the manifest to that HEAD. Returns the +/// published HEAD version. +/// +/// Shared by `roll_forward_all` (where `expected_version` is the sidecar's +/// pre-write pin) and `roll_back_sidecar` (where it is the manifest-pinned +/// version the table was just restored to). The HEAD is read AFTER any restore +/// in the same single-threaded sweep, so no concurrent writer can have advanced +/// it. +async fn push_table_update_at_head( + root_uri: &str, + table_key: &str, + table_path: &str, + branch: Option<&str>, + expected_version: u64, + updates: &mut Vec, + expected: &mut HashMap, +) -> Result { + let head_ds = Dataset::open(table_path) + .await + .map_err(|e| OmniError::Lance(e.to_string()))?; + let head_ds = match branch { + Some(b) if b != "main" => head_ds + .checkout_branch(b) + .await + .map_err(|e| OmniError::Lance(e.to_string()))?, + _ => head_ds, + }; + let head_version = head_ds.version().version; + let row_count = head_ds + .count_rows(None) + .await + .map_err(|e| OmniError::Lance(e.to_string()))? as u64; + let table_relative_path = super::table_path_for_table_key(table_key)?; + let version_metadata = super::metadata::TableVersionMetadata::from_dataset( + root_uri, + &table_relative_path, + &head_ds, + )?; + updates.push(ManifestChange::Update(SubTableUpdate { + table_key: table_key.to_string(), + table_version: head_version, + table_branch: branch.map(str::to_string), + row_count, + version_metadata, + })); + expected.insert(table_key.to_string(), expected_version); + Ok(head_version) +} + /// Append the audit row describing this recovery action. /// /// Two-part write: (a) `_graph_commits.lance` row anchored on the recovery diff --git a/crates/omnigraph/src/db/mod.rs b/crates/omnigraph/src/db/mod.rs index 13e1c74..000602a 100644 --- a/crates/omnigraph/src/db/mod.rs +++ b/crates/omnigraph/src/db/mod.rs @@ -11,8 +11,9 @@ pub use graph_coordinator::{GraphCoordinator, ReadTarget, ResolvedTarget, Snapsh pub use manifest::{Snapshot, SubTableEntry, SubTableUpdate}; pub(crate) use omnigraph::ensure_public_branch_ref; pub use omnigraph::{ - CleanupPolicyOptions, InitOptions, MergeOutcome, Omnigraph, OpenMode, SchemaApplyOptions, - SchemaApplyResult, SkipReason, TableCleanupStats, TableOptimizeStats, + CleanupPolicyOptions, InitOptions, MergeOutcome, Omnigraph, OpenMode, RepairAction, + RepairClassification, RepairOptions, RepairStats, SchemaApplyOptions, SchemaApplyResult, + SkipReason, TableCleanupStats, TableOptimizeStats, TableRepairStats, }; pub(crate) const SCHEMA_APPLY_LOCK_BRANCH: &str = "__schema_apply_lock__"; diff --git a/crates/omnigraph/src/db/omnigraph.rs b/crates/omnigraph/src/db/omnigraph.rs index ba2b70e..5bcc973 100644 --- a/crates/omnigraph/src/db/omnigraph.rs +++ b/crates/omnigraph/src/db/omnigraph.rs @@ -30,10 +30,14 @@ use crate::table_store::TableStore; mod export; mod optimize; +mod repair; mod schema_apply; mod table_ops; pub use optimize::{CleanupPolicyOptions, SkipReason, TableCleanupStats, TableOptimizeStats}; +pub use repair::{ + RepairAction, RepairClassification, RepairOptions, RepairStats, TableRepairStats, +}; pub use schema_apply::SchemaApplyOptions; use super::commit_graph::GraphCommit; @@ -682,6 +686,16 @@ impl Omnigraph { .map(|resolved| resolved.snapshot) } + pub(crate) async fn fresh_snapshot_for_branch(&self, branch: Option<&str>) -> Result { + self.ensure_schema_state_valid().await?; + let requested = ReadTarget::Branch(branch.unwrap_or("main").to_string()); + let coord = self.coordinator.read().await; + coord + .resolve_target(&requested) + .await + .map(|resolved| resolved.snapshot) + } + pub(crate) async fn version(&self) -> u64 { self.coordinator.read().await.version() } @@ -999,6 +1013,13 @@ impl Omnigraph { optimize::optimize_all_tables(self).await } + /// Classify and explicitly repair uncovered manifest/head drift. See + /// [`repair`] for the distinction between safe maintenance drift and + /// suspicious/unverifiable drift. + pub async fn repair(&self, options: repair::RepairOptions) -> Result { + repair::repair_all_tables(self, options).await + } + /// Remove Lance manifests (and the fragments they uniquely own) per the /// given [`optimize::CleanupPolicyOptions`]. Destructive to version /// history. See [`optimize`] for details. diff --git a/crates/omnigraph/src/db/omnigraph/optimize.rs b/crates/omnigraph/src/db/omnigraph/optimize.rs index fff3f54..3c37b66 100644 --- a/crates/omnigraph/src/db/omnigraph/optimize.rs +++ b/crates/omnigraph/src/db/omnigraph/optimize.rs @@ -8,8 +8,14 @@ //! Two dials: //! //! * `optimize_all_tables` — Lance `compact_files` on every table. Rewrites -//! small fragments into fewer large ones. Non-destructive (creates a new -//! version; old fragments remain reachable via older manifest versions). +//! small fragments into fewer large ones, then **publishes the compacted +//! version to the `__manifest`** so the manifest's `table_version` tracks the +//! compacted Lance HEAD (reads pin the manifest version, so without the +//! publish compaction would be invisible to readers and would break the +//! HEAD-vs-manifest precondition of schema apply / strict writes). Compaction +//! is content-preserving (Lance `Operation::Rewrite` "reorganizes data +//! without semantic modification"), so old fragments remain reachable via +//! older manifest versions until `cleanup` runs. //! * `cleanup_all_tables` — Lance `cleanup_old_versions` on every table. //! Removes manifests (and their unique fragments) older than the configured //! retention. Destructive to version history — callers should gate this @@ -23,7 +29,9 @@ use std::time::Duration; use chrono::Utc; use futures::stream::StreamExt; use lance::dataset::cleanup::{CleanupPolicy, RemovalStats}; -use lance::dataset::optimize::{CompactionMetrics, CompactionOptions, compact_files}; +use lance::dataset::optimize::{ + CompactionMetrics, CompactionOptions, compact_files, plan_compaction, +}; use super::*; @@ -67,8 +75,7 @@ pub struct CleanupPolicyOptions { } /// Why `optimize` did not compact a table. Typed so callers branch on the -/// reason rather than sniffing a string. One variant today, gated by -/// [`LANCE_SUPPORTS_BLOB_COMPACTION`]. +/// reason rather than sniffing a string. #[derive(Debug, Clone, Copy, PartialEq, Eq)] #[non_exhaustive] pub enum SkipReason { @@ -76,6 +83,12 @@ pub enum SkipReason { /// `BlobHandling::AllBinary`, which mis-decodes blob-v2 columns; see /// [`LANCE_SUPPORTS_BLOB_COMPACTION`] and `docs/dev/lance.md`. BlobColumnsUnsupportedByLance, + /// The Lance dataset HEAD is ahead of the version recorded in + /// `__manifest`, and no recovery sidecar covers that movement. `optimize` + /// cannot infer whether the drift is benign maintenance or an external + /// semantic write, so it leaves the table untouched and points operators at + /// explicit `repair`. + DriftNeedsRepair, } impl SkipReason { @@ -84,6 +97,7 @@ impl SkipReason { pub fn as_str(&self) -> &'static str { match self { SkipReason::BlobColumnsUnsupportedByLance => "blob_columns_unsupported_by_lance", + SkipReason::DriftNeedsRepair => "drift_needs_repair", } } } @@ -95,6 +109,7 @@ impl std::fmt::Display for SkipReason { SkipReason::BlobColumnsUnsupportedByLance => { "blob columns — Lance compaction unsupported" } + SkipReason::DriftNeedsRepair => "manifest/head drift — run omnigraph repair", }; f.write_str(msg) } @@ -111,11 +126,18 @@ pub struct TableOptimizeStats { pub fragments_removed: usize, /// Number of new, larger fragments Lance produced. pub fragments_added: usize, - /// Did this table get a new Lance manifest version from the compaction? + /// Did this table get a new manifest version from the compaction? True when + /// compaction ran and its compacted version was published to `__manifest`. pub committed: bool, /// `Some(reason)` if this table was deliberately not compacted. When set, /// `fragments_removed == 0`, `fragments_added == 0`, and `!committed`. pub skipped: Option, + /// Manifest table version observed by optimize for drift skips. `None` for + /// normal compaction/no-op/blob skips. + pub manifest_version: Option, + /// Lance HEAD version observed by optimize for drift skips. `None` for + /// normal compaction/no-op/blob skips. + pub lance_head_version: Option, } impl TableOptimizeStats { @@ -127,6 +149,8 @@ impl TableOptimizeStats { fragments_added: metrics.fragments_added, committed, skipped: None, + manifest_version: None, + lance_head_version: None, } } @@ -138,6 +162,25 @@ impl TableOptimizeStats { fragments_added: 0, committed: false, skipped: Some(reason), + manifest_version: None, + lance_head_version: None, + } + } + + /// Stat for a table skipped because the manifest and Lance HEAD disagree. + fn skipped_for_drift( + table_key: String, + manifest_version: u64, + lance_head_version: u64, + ) -> Self { + Self { + table_key, + fragments_removed: 0, + fragments_added: 0, + committed: false, + skipped: Some(SkipReason::DriftNeedsRepair), + manifest_version: Some(manifest_version), + lance_head_version: Some(lance_head_version), } } } @@ -153,14 +196,30 @@ pub struct TableCleanupStats { pub error: Option, } -/// Run Lance `compact_files` on every node + edge table on `main`. -/// Tables run in parallel (bounded concurrency). +/// Run Lance `compact_files` on every node + edge table on `main`, publishing +/// each compacted table's new version to the `__manifest`. Tables run in +/// parallel (bounded concurrency); each is fault-isolated only at the Lance +/// level — a publish error is propagated (the recovery sidecar covers it). pub async fn optimize_all_tables(db: &Omnigraph) -> Result> { db.ensure_schema_state_valid().await?; db.ensure_schema_apply_idle("optimize").await?; - let resolved = db.resolved_branch_target(None).await?; - let snapshot = resolved.snapshot; + // Refuse on an unrecovered graph. A pending recovery sidecar means a failed + // write left partial state that the open-time sweep must resolve (roll + // forward/back) first; compacting + publishing a table covered by such a + // sidecar could commit a partial write the sweep would roll back. Reopen the + // graph to run recovery, then re-run optimize. + if !crate::db::manifest::list_sidecars(db.root_uri(), db.storage_adapter()) + .await? + .is_empty() + { + return Err(OmniError::manifest_conflict( + "optimize requires a clean recovery state; reopen the graph to run the \ + recovery sweep before optimizing", + )); + } + + let snapshot = db.fresh_snapshot_for_branch(None).await?; // Compute per-table state (path + whether it has blob columns) up front, in // a scope that drops the catalog handle before the async stream starts. @@ -183,49 +242,201 @@ pub async fn optimize_all_tables(db: &Omnigraph) -> Result> = futures::stream::iter(table_tasks.into_iter()) - .map(|(table_key, full_path, has_blob)| async move { - // Lance `compact_files` mis-decodes blob-v2 columns under the forced - // `BlobHandling::AllBinary` read (see LANCE_SUPPORTS_BLOB_COMPACTION). - // Skip blob-bearing tables and report it rather than aborting the - // whole sweep — the other tables still compact. - if has_blob && !LANCE_SUPPORTS_BLOB_COMPACTION { - tracing::warn!( - target: "omnigraph::optimize", - table = %table_key, - "skipping compaction: table has blob columns the current Lance \ - cannot rewrite (blob-v2 AllBinary decode bug); other tables \ - unaffected — rerun after the Lance fix", - ); - return Ok(TableOptimizeStats::skipped( - table_key, - SkipReason::BlobColumnsUnsupportedByLance, - )); - } - let mut ds = table_store - .open_dataset_head_for_write(&table_key, &full_path, None) - .await?; - let version_before = ds.version().version; - let metrics: CompactionMetrics = - compact_files(&mut ds, CompactionOptions::default(), None) - .await - .map_err(|e| OmniError::Lance(e.to_string()))?; - let version_after = ds.version().version; - Ok(TableOptimizeStats::compacted( - table_key, - &metrics, - version_after != version_before, - )) + .map(move |(table_key, full_path, has_blob)| async move { + optimize_one_table(db, table_key, full_path, has_blob).await }) .buffer_unordered(concurrency) .collect() .await; + // Invalidate caches for any table that published a compaction — done BEFORE + // propagating a sibling table's error, since the published versions are + // durable and reads must observe the new fragment layout (Lance invalidates + // the original row addresses on rewrite). The CSR/CSC graph topology index + // is rebuilt only when an edge table moved. Mirrors schema_apply's + // post-publish invalidation. + let any_committed = stats + .iter() + .any(|s| matches!(s, Ok(st) if st.committed)); + let edge_committed = stats + .iter() + .any(|s| matches!(s, Ok(st) if st.committed && st.table_key.starts_with("edge:"))); + if any_committed { + db.runtime_cache.invalidate_all().await; + if edge_committed { + db.invalidate_graph_index().await; + } + } + stats.into_iter().collect() } +/// Compact one table and publish the compacted version to the `__manifest`. +/// +/// Compaction (`compact_files`) advances the *dataset's* Lance HEAD via a +/// reserve-fragments + rewrite commit, but Lance knows nothing about the +/// `__manifest`. To keep the manifest the single authority for each table's +/// visible version (invariant 2), optimize must publish the compacted version. +/// The Lance-HEAD-before-manifest-publish gap is unavoidable (Lance has no +/// staged/uncommitted compaction), so it is covered by a recovery sidecar like +/// the other multi-commit writers; roll-forward is always safe because +/// compaction is content-preserving. +async fn optimize_one_table( + db: &Omnigraph, + table_key: String, + full_path: String, + has_blob: bool, +) -> Result { + // Lance `compact_files` mis-decodes blob-v2 columns under the forced + // `BlobHandling::AllBinary` read (see LANCE_SUPPORTS_BLOB_COMPACTION). Skip + // blob-bearing tables before acquiring the write queue; `repair` is the + // operator tool for full manifest/head drift classification. + if has_blob && !LANCE_SUPPORTS_BLOB_COMPACTION { + tracing::warn!( + target: "omnigraph::optimize", + table = %table_key, + "skipping compaction: table has blob columns the current Lance \ + cannot rewrite (blob-v2 AllBinary decode bug); other tables \ + unaffected — rerun after the Lance fix", + ); + return Ok(TableOptimizeStats::skipped( + table_key, + SkipReason::BlobColumnsUnsupportedByLance, + )); + } + + // Serialize the whole compact→publish against concurrent mutations on this + // (table, main): compaction is a Rewrite op that retryable-conflicts with a + // concurrent Merge/Update/Delete on overlapping fragments, and an + // interleaved write would also move the manifest version out from under the + // CAS below. Holding the queue makes the CAS baseline read under it exact. + let _guard = db + .write_queue() + .acquire_many(&[(table_key.clone(), None)]) + .await; + + let mut ds = db + .table_store + .open_dataset_head_for_write(&table_key, &full_path, None) + .await?; + + // CAS baseline: the table's current manifest version, read under the queue + // (in-memory coordinator snapshot, no storage I/O — stable for this section). + let expected_version = db + .fresh_snapshot_for_branch(None) + .await? + .entry(&table_key) + .map(|e| e.table_version) + .ok_or_else(|| OmniError::manifest(format!("no manifest entry for {}", table_key)))?; + + let lance_head_version = ds.version().version; + if lance_head_version < expected_version { + return Err(OmniError::manifest_internal(format!( + "table '{}' Lance HEAD version {} is behind manifest version {}", + table_key, lance_head_version, expected_version + ))); + } + if lance_head_version > expected_version { + tracing::warn!( + target: "omnigraph::optimize", + table = %table_key, + manifest_version = expected_version, + lance_head_version, + "skipping compaction: Lance HEAD is ahead of the manifest; run `omnigraph repair` \ + to classify and publish covered maintenance drift explicitly", + ); + return Ok(TableOptimizeStats::skipped_for_drift( + table_key, + expected_version, + lance_head_version, + )); + } + + // Precise "will it compact?" check — `plan_compaction` also accounts for + // deletion materialization (which can rewrite even a single fragment). A + // steady-state already-compacted table yields an empty plan and is never + // pinned in a sidecar (a zero-commit pin would classify NoMovement on + // recovery and force an all-or-nothing rollback). Uncovered pre-existing + // drift is skipped above and must go through explicit repair. + let options = CompactionOptions::default(); + let plan = plan_compaction(&ds, &options) + .await + .map_err(|e| OmniError::Lance(e.to_string()))?; + if plan.num_tasks() == 0 { + return Ok(TableOptimizeStats::compacted( + table_key, + &CompactionMetrics::default(), + false, + )); + } + + // Phase A: recovery sidecar BEFORE compaction advances the Lance HEAD, so a + // crash before the manifest publish rolls forward on next open. + let sidecar = crate::db::manifest::new_sidecar( + crate::db::manifest::SidecarKind::Optimize, + None, + // optimize is system-attributed (no `optimize_as` actor API today). + None, + vec![crate::db::manifest::SidecarTablePin { + table_key: table_key.clone(), + table_path: full_path.clone(), + expected_version, + // Lower bound — compaction commits N≥1 versions (reserve + rewrite); + // the classifier loose-matches SidecarKind::Optimize. + post_commit_pin: expected_version + 1, + table_branch: None, + }], + ); + let handle = + crate::db::manifest::write_sidecar(db.root_uri(), db.storage_adapter(), &sidecar).await?; + + // Phase B: compaction (reserve-fragments + rewrite commits advance HEAD). + let version_before = ds.version().version; + let metrics: CompactionMetrics = compact_files(&mut ds, options, None) + .await + .map_err(|e| OmniError::Lance(e.to_string()))?; + let version_after = ds.version().version; + let committed = version_after != version_before; + + // Pin the per-writer Phase B → Phase C residual for optimize: Lance HEAD has + // advanced but the manifest publish below hasn't run. + crate::failpoints::maybe_fail("optimize.post_phase_b_pre_manifest_commit")?; + + // Phase C: publish the compacted version to the manifest (one CAS commit, + // expected = the version observed under the queue). On failure the sidecar + // is intentionally left for the open-time recovery sweep to roll forward. + if committed { + let state = db.table_store.table_state(&full_path, &ds).await?; + let update = crate::db::SubTableUpdate { + table_key: table_key.clone(), + table_version: state.version, + table_branch: None, + row_count: state.row_count, + version_metadata: state.version_metadata, + }; + let mut expected = std::collections::HashMap::new(); + expected.insert(table_key.clone(), expected_version); + db.coordinator + .write() + .await + .commit_updates_with_actor_with_expected(&[update], &expected, None) + .await?; + } + + // Phase D: delete the sidecar (best-effort; recovery resolves a leftover). + if let Err(err) = crate::db::manifest::delete_sidecar(&handle, db.storage_adapter()).await { + tracing::warn!( + error = %err, + operation_id = handle.operation_id.as_str(), + "optimize recovery sidecar cleanup failed; next open's recovery sweep will resolve it" + ); + } + + Ok(TableOptimizeStats::compacted(table_key, &metrics, committed)) +} + /// Run Lance `cleanup_old_versions` on every node + edge table on `main`, /// using [`CleanupPolicyOptions`]. The latest manifest is always preserved /// regardless (Lance invariant). @@ -485,7 +696,7 @@ fn orphan_branches(present: Vec, keep: &std::collections::HashSet Vec { +pub(super) fn all_table_keys(catalog: &omnigraph_compiler::catalog::Catalog) -> Vec { let mut keys: Vec = catalog .node_types .keys() diff --git a/crates/omnigraph/src/db/omnigraph/repair.rs b/crates/omnigraph/src/db/omnigraph/repair.rs new file mode 100644 index 0000000..aaef2ba --- /dev/null +++ b/crates/omnigraph/src/db/omnigraph/repair.rs @@ -0,0 +1,332 @@ +//! Explicit repair for uncovered manifest/head drift. +//! +//! Recovery sidecars handle deterministic crash residuals automatically. This +//! module is for the different case: a table's Lance HEAD is ahead of the +//! version recorded in `__manifest` and there is no sidecar encoding writer +//! intent. `repair` classifies that uncovered drift from Lance transactions and +//! only auto-publishes maintenance-only drift when the operator confirms. + +use std::collections::HashMap; + +use lance::Dataset; +use lance::dataset::transaction::Operation; + +use super::*; + +/// Options for [`Omnigraph::repair`]. +#[derive(Debug, Clone, Copy, Default)] +pub struct RepairOptions { + /// Preview by default. With `confirm`, verified maintenance drift is + /// published to `__manifest`. + pub confirm: bool, + /// Also publish suspicious/unverifiable drift. Requires `confirm`. + pub force: bool, +} + +/// Classification of a table's manifest/head state. +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +#[non_exhaustive] +pub enum RepairClassification { + /// Lance HEAD equals the manifest pin. + NoDrift, + /// Every uncovered Lance transaction is maintenance-only (`Rewrite` or + /// `ReserveFragments`), so publishing the HEAD is content-preserving. + VerifiedMaintenance, + /// At least one uncovered transaction is semantic (`Append`, `Delete`, + /// `Update`, etc.). + Suspicious, + /// A needed transaction could not be read, so the drift cannot be judged. + Unverifiable, +} + +impl RepairClassification { + /// Stable machine-readable token for serialized output. + pub fn as_str(&self) -> &'static str { + match self { + Self::NoDrift => "no_drift", + Self::VerifiedMaintenance => "verified_maintenance", + Self::Suspicious => "suspicious", + Self::Unverifiable => "unverifiable", + } + } +} + +impl std::fmt::Display for RepairClassification { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.write_str(self.as_str()) + } +} + +/// What repair did for a table. +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +#[non_exhaustive] +pub enum RepairAction { + /// Nothing to do. + NoOp, + /// Drift was reported but not published because this was a preview. + Preview, + /// Verified maintenance drift was published to `__manifest`. + Healed, + /// Suspicious/unverifiable drift was published because `force` was set. + Forced, + /// Drift was left untouched because it was not safe to publish without + /// `force`. + Refused, +} + +impl RepairAction { + /// Stable machine-readable token for serialized output. + pub fn as_str(&self) -> &'static str { + match self { + Self::NoOp => "no_op", + Self::Preview => "preview", + Self::Healed => "healed", + Self::Forced => "forced", + Self::Refused => "refused", + } + } +} + +impl std::fmt::Display for RepairAction { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.write_str(self.as_str()) + } +} + +/// Per-table repair outcome. +#[derive(Debug, Clone)] +#[non_exhaustive] +pub struct TableRepairStats { + pub table_key: String, + pub manifest_version: u64, + pub lance_head_version: u64, + pub classification: RepairClassification, + pub action: RepairAction, + pub operations: Vec, + pub error: Option, +} + +/// Whole-graph repair outcome. +#[derive(Debug, Clone)] +#[non_exhaustive] +pub struct RepairStats { + pub tables: Vec, + /// New graph manifest version if repair published any table pins. + pub manifest_version: Option, +} + +struct ClassificationResult { + classification: RepairClassification, + operations: Vec, + error: Option, +} + +pub async fn repair_all_tables(db: &Omnigraph, options: RepairOptions) -> Result { + if options.force && !options.confirm { + return Err(OmniError::manifest("repair --force requires --confirm")); + } + + db.ensure_schema_state_valid().await?; + db.ensure_schema_apply_idle("repair").await?; + ensure_no_pending_recovery_sidecars(db, "repair").await?; + + let snapshot = db.fresh_snapshot_for_branch(None).await?; + let table_tasks: Vec<(String, String)> = { + let catalog = db.catalog(); + let mut tasks = Vec::new(); + for table_key in optimize::all_table_keys(&catalog) { + let Some(entry) = snapshot.entry(&table_key) else { + continue; + }; + let full_path = format!("{}/{}", db.root_uri, entry.table_path); + tasks.push((table_key, full_path)); + } + tasks + }; + + if table_tasks.is_empty() { + return Ok(RepairStats { + tables: Vec::new(), + manifest_version: None, + }); + } + + let queue_keys: Vec<(String, Option)> = table_tasks + .iter() + .map(|(table_key, _)| (table_key.clone(), None)) + .collect(); + let _guards = db.write_queue().acquire_many(&queue_keys).await; + ensure_no_pending_recovery_sidecars(db, "repair").await?; + + let snapshot = db.fresh_snapshot_for_branch(None).await?; + let mut tables = Vec::with_capacity(table_tasks.len()); + let mut updates = Vec::new(); + let mut expected = HashMap::new(); + let mut any_forced = false; + + for (table_key, full_path) in table_tasks { + let ds = db + .table_store + .open_dataset_head_for_write(&table_key, &full_path, None) + .await?; + let manifest_version = snapshot + .entry(&table_key) + .map(|e| e.table_version) + .ok_or_else(|| OmniError::manifest(format!("no manifest entry for {}", table_key)))?; + let lance_head_version = ds.version().version; + + if lance_head_version < manifest_version { + return Err(OmniError::manifest_internal(format!( + "table '{}' Lance HEAD version {} is behind manifest version {}", + table_key, lance_head_version, manifest_version + ))); + } + + if lance_head_version == manifest_version { + tables.push(TableRepairStats { + table_key, + manifest_version, + lance_head_version, + classification: RepairClassification::NoDrift, + action: RepairAction::NoOp, + operations: Vec::new(), + error: None, + }); + continue; + } + + let classification = classify_drift(&ds, manifest_version, lance_head_version).await; + let action = match ( + options.confirm, + options.force, + classification.classification, + ) { + (false, _, _) => RepairAction::Preview, + (true, _, RepairClassification::VerifiedMaintenance) => RepairAction::Healed, + (true, true, RepairClassification::Suspicious | RepairClassification::Unverifiable) => { + any_forced = true; + RepairAction::Forced + } + (true, _, RepairClassification::Suspicious | RepairClassification::Unverifiable) => { + RepairAction::Refused + } + (true, _, RepairClassification::NoDrift) => RepairAction::NoOp, + }; + + if matches!(action, RepairAction::Healed | RepairAction::Forced) { + let state = db.table_store.table_state(&full_path, &ds).await?; + updates.push(crate::db::SubTableUpdate { + table_key: table_key.clone(), + table_version: state.version, + table_branch: None, + row_count: state.row_count, + version_metadata: state.version_metadata, + }); + expected.insert(table_key.clone(), manifest_version); + } + + tables.push(TableRepairStats { + table_key, + manifest_version, + lance_head_version, + classification: classification.classification, + action, + operations: classification.operations, + error: classification.error, + }); + } + + let manifest_version = if updates.is_empty() { + None + } else { + let actor = if any_forced { + Some("omnigraph:repair:force") + } else { + Some("omnigraph:repair") + }; + let PublishedSnapshot { + manifest_version, + _snapshot_id: _, + } = db + .coordinator + .write() + .await + .commit_updates_with_actor_with_expected(&updates, &expected, actor) + .await?; + db.runtime_cache.invalidate_all().await; + if updates + .iter() + .any(|update| update.table_key.starts_with("edge:")) + { + db.invalidate_graph_index().await; + } + Some(manifest_version) + }; + + Ok(RepairStats { + tables, + manifest_version, + }) +} + +async fn ensure_no_pending_recovery_sidecars(db: &Omnigraph, operation: &str) -> Result<()> { + if !crate::db::manifest::list_sidecars(db.root_uri(), db.storage_adapter()) + .await? + .is_empty() + { + return Err(OmniError::manifest_conflict(format!( + "{operation} requires a clean recovery state; reopen the graph to run the \ + recovery sweep before repairing" + ))); + } + Ok(()) +} + +async fn classify_drift( + ds: &Dataset, + manifest_version: u64, + lance_head_version: u64, +) -> ClassificationResult { + let mut operations = Vec::new(); + let mut saw_suspicious = false; + let mut error = None; + + for version in manifest_version.saturating_add(1)..=lance_head_version { + match ds.read_transaction_by_version(version).await { + Ok(Some(transaction)) => { + let operation = transaction.operation; + operations.push(operation.name().to_string()); + if !matches!( + operation, + Operation::Rewrite { .. } | Operation::ReserveFragments { .. } + ) { + saw_suspicious = true; + } + } + Ok(None) => { + error = Some(format!("missing Lance transaction for version {version}")); + break; + } + Err(err) => { + error = Some(format!( + "failed to read Lance transaction for version {version}: {err}" + )); + break; + } + } + } + + let classification = if error.is_some() { + RepairClassification::Unverifiable + } else if saw_suspicious { + RepairClassification::Suspicious + } else { + RepairClassification::VerifiedMaintenance + }; + + ClassificationResult { + classification, + operations, + error, + } +} diff --git a/crates/omnigraph/src/exec/merge.rs b/crates/omnigraph/src/exec/merge.rs index eb6c4a3..0e6434b 100644 --- a/crates/omnigraph/src/exec/merge.rs +++ b/crates/omnigraph/src/exec/merge.rs @@ -697,7 +697,7 @@ fn update_unique_constraints( if any_null { continue; } - let value = parts.join("|"); + let value = crate::loader::composite_unique_key(&parts); let row_id = row_id_at(batch, row)?; if let Some(first_row_id) = seen.insert(value.clone(), row_id.clone()) { conflicts.push(MergeConflict { diff --git a/crates/omnigraph/src/exec/mutation.rs b/crates/omnigraph/src/exec/mutation.rs index 02b2a21..0e7ded7 100644 --- a/crates/omnigraph/src/exec/mutation.rs +++ b/crates/omnigraph/src/exec/mutation.rs @@ -569,7 +569,8 @@ use super::staging::{MutationStaging, PendingMode}; /// via `open_for_mutation_on_branch`, which compares Lance HEAD against /// the manifest's pinned version — that fence is the engine's /// publisher-style OCC catching cross-writer drift before we make any -/// changes. +/// changes. For delete-only queries, this strict open is also the uncovered +/// drift guard that runs before `delete_where` can inline-commit. /// /// On subsequent touches *within the same query*, behavior depends on /// whether the table has already been inline-committed by a delete op: @@ -904,12 +905,12 @@ impl Omnigraph { let batch = build_insert_batch(&schema, &id, &resolved, &blob_props)?; crate::loader::validate_value_constraints(&batch, node_type)?; crate::loader::validate_enum_constraints(&batch, &node_type.properties, type_name)?; - let unique_props = crate::loader::unique_property_names_for_node(node_type); - if !unique_props.is_empty() { + let unique_groups = crate::loader::unique_constraint_groups_for_node(node_type); + if !unique_groups.is_empty() { crate::loader::enforce_unique_constraints_intra_batch( &batch, type_name, - &unique_props, + &unique_groups, )?; } let has_key = node_type.key_property().is_some(); @@ -945,12 +946,12 @@ impl Omnigraph { let batch = build_insert_batch(&schema, &id, &resolved, &blob_props)?; validate_edge_insert_endpoints(self, staging, branch, type_name, &resolved).await?; crate::loader::validate_enum_constraints(&batch, &edge_type.properties, type_name)?; - let unique_props = crate::loader::unique_property_names_for_edge(edge_type); - if !unique_props.is_empty() { + let unique_groups = crate::loader::unique_constraint_groups_for_edge(edge_type); + if !unique_groups.is_empty() { crate::loader::enforce_unique_constraints_intra_batch( &batch, type_name, - &unique_props, + &unique_groups, )?; } let table_key = format!("edge:{}", type_name); @@ -1093,12 +1094,12 @@ impl Omnigraph { let node_type = &self.catalog().node_types[type_name]; crate::loader::validate_value_constraints(&updated, node_type)?; crate::loader::validate_enum_constraints(&updated, &node_type.properties, type_name)?; - let unique_props = crate::loader::unique_property_names_for_node(node_type); - if !unique_props.is_empty() { + let unique_groups = crate::loader::unique_constraint_groups_for_node(node_type); + if !unique_groups.is_empty() { crate::loader::enforce_unique_constraints_intra_batch( &updated, type_name, - &unique_props, + &unique_groups, )?; } diff --git a/crates/omnigraph/src/exec/staging.rs b/crates/omnigraph/src/exec/staging.rs index 0d26fd3..264ab59 100644 --- a/crates/omnigraph/src/exec/staging.rs +++ b/crates/omnigraph/src/exec/staging.rs @@ -495,25 +495,21 @@ impl StagedMutation { // until `ensure_path` learns how to bump expected_version on // op-kind upgrade. // - // Why per-branch (and not the bound-branch `db.snapshot()`): - // when the caller mutates a branch other than the engine's - // bound branch (e.g., feature-branch ingest from a server - // handle bound to main), `db.snapshot()` returns the bound - // branch's view of each table — which is the wrong pin for - // the publisher's CAS on a different branch. Using - // `snapshot_for_branch(branch)` resolves the per-branch - // entries correctly. The cost is one fresh manifest read per - // mutation; PR 1b's regression came from this same read, but - // that read is now strictly necessary for cross-branch - // correctness. Single-table same-branch mutations could still - // skip this read (queue exclusivity makes the publisher CAS a - // no-op), but the conditional adds complexity for marginal - // gain — left as a follow-up perf optimization. + // Why a fresh per-branch snapshot (and not the bound-branch + // `db.snapshot()` / `snapshot_for_branch()` fast path): a stale + // engine handle may be bound to the same branch it is writing. For + // non-strict Insert/Merge, that stale local view is allowed to rebase + // to the live manifest pin under the queue; only uncovered Lance + // HEAD>manifest drift is refused. For writes targeting a branch other + // than the engine's bound branch (e.g., feature-branch ingest from a + // server handle bound to main), the same helper also resolves the + // correct branch pin. The cost is one fresh manifest read per mutation + // plus one Lance HEAD open per staged table for the drift guard below. // // Multi-coordinator deployments (§VI.27 aspirational) get // genuine cross-process drift detection from this read for // free. - let snapshot = db.snapshot_for_branch(branch).await?; + let snapshot = db.fresh_snapshot_for_branch(branch).await?; for entry in staged.iter_mut() { let current = snapshot .entry(&entry.table_key) @@ -541,6 +537,35 @@ impl StagedMutation { )); } + // Separate manifest-visible concurrency from uncovered Lance drift. + // Non-strict inserts/merges are allowed to rebase from their staged + // read version to the fresh manifest pin above, but only if the + // live Lance HEAD still equals that manifest pin. If an external + // raw Lance write or a pre-fix maintenance path moved HEAD without + // publishing `__manifest`, this write must not silently fold it. + let head = db + .table_store() + .open_dataset_head_for_write( + &entry.table_key, + &entry.path.full_path, + entry.path.table_branch.as_deref(), + ) + .await? + .version() + .version; + if head < current { + return Err(OmniError::manifest_internal(format!( + "table '{}' Lance HEAD version {} is behind manifest version {}", + entry.table_key, head, current + ))); + } + if head > current { + return Err(OmniError::manifest_conflict(format!( + "table '{}' has Lance HEAD version {} ahead of manifest version {}; run `omnigraph repair` before writing", + entry.table_key, head, current + ))); + } + entry.expected_version = current; expected_versions.insert(entry.table_key.clone(), current); } diff --git a/crates/omnigraph/src/loader/mod.rs b/crates/omnigraph/src/loader/mod.rs index d5d74c0..9a80b39 100644 --- a/crates/omnigraph/src/loader/mod.rs +++ b/crates/omnigraph/src/loader/mod.rs @@ -399,9 +399,9 @@ async fn load_jsonl_reader( let batch = build_node_batch(node_type, rows)?; validate_value_constraints(&batch, node_type)?; validate_enum_constraints(&batch, &node_type.properties, type_name)?; - let unique_props = unique_property_names_for_node(node_type); - if !unique_props.is_empty() { - enforce_unique_constraints_intra_batch(&batch, type_name, &unique_props)?; + let unique_groups = unique_constraint_groups_for_node(node_type); + if !unique_groups.is_empty() { + enforce_unique_constraints_intra_batch(&batch, type_name, &unique_groups)?; } let loaded_count = batch.num_rows(); let table_key = format!("node:{}", type_name); @@ -510,9 +510,9 @@ async fn load_jsonl_reader( let edge_type = &catalog.edge_types[edge_name]; let batch = build_edge_batch(edge_type, rows)?; validate_enum_constraints(&batch, &edge_type.properties, edge_name)?; - let unique_props = unique_property_names_for_edge(edge_type); - if !unique_props.is_empty() { - enforce_unique_constraints_intra_batch(&batch, edge_name, &unique_props)?; + let unique_groups = unique_constraint_groups_for_edge(edge_type); + if !unique_groups.is_empty() { + enforce_unique_constraints_intra_batch(&batch, edge_name, &unique_groups)?; } let loaded_count = batch.num_rows(); let table_key = format!("edge:{}", edge_name); @@ -1425,8 +1425,16 @@ pub(crate) fn validate_enum_constraints( Ok(()) } -/// Detect duplicate values within a single `RecordBatch` for any of the named -/// `unique_properties`. Returns an error on the first duplicate found. +/// Detect duplicate values within a single `RecordBatch` for any of the +/// `unique_constraints` groups. Each group is a list of one or more columns +/// that together form a uniqueness key: a violation occurs when two rows share +/// the same tuple of values across *all* columns in a group, so a composite +/// `@unique(a, b)` only conflicts when both `a` and `b` match. Returns an +/// error on the first duplicate found. +/// +/// Rows where any column in a group is null are exempt (standard SQL semantics +/// for uniqueness over nullable columns), as is any group whose columns are +/// not all present in the batch (e.g. a partial-schema load). /// /// Note: this only catches duplicates *within* the batch. Cross-batch /// uniqueness against already-committed rows is not enforced here — that @@ -1434,22 +1442,39 @@ pub(crate) fn validate_enum_constraints( pub(crate) fn enforce_unique_constraints_intra_batch( batch: &RecordBatch, type_name: &str, - unique_properties: &[String], + unique_constraints: &[Vec], ) -> Result<()> { - for property in unique_properties { - let Some(col_idx) = batch.schema().index_of(property).ok() else { + for columns in unique_constraints { + let Some(col_indices) = columns + .iter() + .map(|name| batch.schema().index_of(name).ok()) + .collect::>>() + else { continue; }; - let arr = batch.column(col_idx); let mut seen: HashMap = HashMap::new(); for row in 0..batch.num_rows() { - let Some(value) = scalar_to_string(arr, row) else { + let mut parts = Vec::with_capacity(col_indices.len()); + let mut any_null = false; + for &col_idx in &col_indices { + let Some(value) = scalar_to_string(batch.column(col_idx), row) else { + any_null = true; + break; + }; + parts.push(value); + } + if any_null { continue; - }; + } + let value = composite_unique_key(&parts); if let Some(prev_row) = seen.insert(value.clone(), row) { return Err(OmniError::manifest(format!( "@unique violation on {}.{}: value '{}' appears in rows {} and {}", - type_name, property, value, prev_row, row + type_name, + format_unique_columns(columns), + value, + prev_row, + row ))); } } @@ -1457,6 +1482,27 @@ pub(crate) fn enforce_unique_constraints_intra_batch( Ok(()) } +/// Join one row's rendered, non-null column values into a single composite +/// uniqueness key. The separator is the unit separator (U+001F) — a control +/// char highly unlikely to occur in real data, so distinct tuples like +/// `("a|b", "c")` and `("a", "b|c")` stay distinct rather than colliding. +/// +/// Shared by the intake path (`enforce_unique_constraints_intra_batch`) and +/// the branch-merge path (`exec/merge.rs::update_unique_constraints`) so the +/// two cannot silently drift to incompatible keyings. +pub(crate) fn composite_unique_key(parts: &[String]) -> String { + parts.join("\u{1f}") +} + +/// Render a unique constraint's columns for error messages: a single column +/// as `col`, a composite as `(a, b)`. +fn format_unique_columns(columns: &[String]) -> String { + match columns { + [single] => single.clone(), + _ => format!("({})", columns.join(", ")), + } +} + /// Reduce a single Arrow scalar at (`array`, `row`) to a `String` for /// uniqueness comparison. Returns `None` for null values (nulls are exempt /// from uniqueness in standard SQL semantics). @@ -1498,39 +1544,30 @@ fn scalar_to_string(array: &ArrayRef, row: usize) -> Option { None } -/// Build the flat list of property names that must be checked for uniqueness -/// on a node type. Includes both `@unique` properties (from -/// `NodeType.unique_constraints`) and the `@key` (which implies uniqueness). -pub(crate) fn unique_property_names_for_node( +/// Build the list of uniqueness constraint groups to enforce on a node type. +/// Each group is the column tuple of one constraint. Includes every +/// `@unique(...)` constraint (from `NodeType.unique_constraints`) and the +/// `@key` (which implies uniqueness over its column tuple). Grouping is +/// preserved so a composite `@unique(a, b)` is enforced as a composite key +/// rather than degraded into independent single-field checks. +pub(crate) fn unique_constraint_groups_for_node( node_type: &omnigraph_compiler::catalog::NodeType, -) -> Vec { - let mut props: Vec = node_type - .unique_constraints - .iter() - .flatten() - .cloned() - .collect(); - if let Some(key) = &node_type.key { - props.extend(key.iter().cloned()); +) -> Vec> { + let mut groups: Vec> = node_type.unique_constraints.clone(); + if let Some(key) = &node_type.key + && !groups.contains(key) + { + groups.push(key.clone()); } - props.sort(); - props.dedup(); - props + groups } -/// Same as [`unique_property_names_for_node`] but for an edge type. -pub(crate) fn unique_property_names_for_edge( +/// Same as [`unique_constraint_groups_for_node`] but for an edge type (edges +/// have no `@key`). +pub(crate) fn unique_constraint_groups_for_edge( edge_type: &omnigraph_compiler::catalog::EdgeType, -) -> Vec { - let mut props: Vec = edge_type - .unique_constraints - .iter() - .flatten() - .cloned() - .collect(); - props.sort(); - props.dedup(); - props +) -> Vec> { + edge_type.unique_constraints.clone() } fn extract_numeric_value(col: &ArrayRef, row: usize) -> Option { diff --git a/crates/omnigraph/src/table_store.rs b/crates/omnigraph/src/table_store.rs index 10123b0..4b52db6 100644 --- a/crates/omnigraph/src/table_store.rs +++ b/crates/omnigraph/src/table_store.rs @@ -732,7 +732,7 @@ impl TableStore { // before the FirstSeen setter has a chance to silently collapse // anything): // - Load path: `enforce_unique_constraints_intra_batch` - // (`loader/mod.rs:1453`) errors on intra-batch `@key` dups. + // (`loader/mod.rs:1471`) errors on intra-batch `@key` dups. // - Mutate path: `MutationStaging::finalize` (`exec/staging.rs`) // accumulates and dedupes by `id`. // - Branch-merge path: `compute_source_delta` / diff --git a/crates/omnigraph/tests/branching.rs b/crates/omnigraph/tests/branching.rs index 5a0c47d..108702c 100644 --- a/crates/omnigraph/tests/branching.rs +++ b/crates/omnigraph/tests/branching.rs @@ -39,6 +39,26 @@ query insert_user($name: String, $email: String) { } "#; +const EDGE_UNIQUE_SCHEMA: &str = r#" +node Person { + name: String @key +} + +edge Knows: Person -> Person { + @unique(src, dst) +} +"#; + +const EDGE_UNIQUE_DATA: &str = r#"{"type":"Person","data":{"name":"Alice"}} +{"type":"Person","data":{"name":"Bob"}} +{"type":"Person","data":{"name":"Carol"}}"#; + +const EDGE_UNIQUE_MUTATIONS: &str = r#" +query add_knows($from: String, $to: String) { + insert Knows { from: $from, to: $to } +} +"#; + const CARDINALITY_SCHEMA: &str = r#" node Person { name: String @key @@ -1119,6 +1139,87 @@ async fn branch_merge_reports_unique_violation_conflict() { } } +/// Regression for the MR-983 follow-up: the branch-merge path must enforce an +/// edge composite `@unique(src, dst)` as a true composite key, consistent with +/// the intake path. Two branches inserting the *same* (src, dst) pair must +/// conflict on merge. +#[tokio::test] +async fn branch_merge_reports_composite_unique_violation_conflict() { + let dir = tempfile::tempdir().unwrap(); + let uri = dir.path().to_str().unwrap(); + let mut main = init_db_from_schema_and_data(&dir, EDGE_UNIQUE_SCHEMA, EDGE_UNIQUE_DATA).await; + main.branch_create("feature").await.unwrap(); + + let mut feature = Omnigraph::open(uri).await.unwrap(); + + mutate_main( + &mut main, + EDGE_UNIQUE_MUTATIONS, + "add_knows", + ¶ms(&[("$from", "Alice"), ("$to", "Bob")]), + ) + .await + .unwrap(); + + mutate_branch( + &mut feature, + "feature", + EDGE_UNIQUE_MUTATIONS, + "add_knows", + ¶ms(&[("$from", "Alice"), ("$to", "Bob")]), + ) + .await + .unwrap(); + + let err = main.branch_merge("feature", "main").await.unwrap_err(); + match err { + OmniError::MergeConflicts(conflicts) => { + assert!(conflicts.iter().any(|conflict| { + conflict.table_key == "edge:Knows" + && conflict.kind == MergeConflictKind::UniqueViolation + })); + } + other => panic!("expected merge conflicts, got {other:?}"), + } +} + +/// Sibling to the above: pairs sharing `src` but differing on `dst` are unique +/// on the (src, dst) tuple and must merge cleanly. Guards against the composite +/// degrading back into a single-field `@unique(src)` on the merge path. +#[tokio::test] +async fn branch_merge_allows_distinct_composite_unique_pairs() { + let dir = tempfile::tempdir().unwrap(); + let uri = dir.path().to_str().unwrap(); + let mut main = init_db_from_schema_and_data(&dir, EDGE_UNIQUE_SCHEMA, EDGE_UNIQUE_DATA).await; + main.branch_create("feature").await.unwrap(); + + let mut feature = Omnigraph::open(uri).await.unwrap(); + + mutate_main( + &mut main, + EDGE_UNIQUE_MUTATIONS, + "add_knows", + ¶ms(&[("$from", "Alice"), ("$to", "Bob")]), + ) + .await + .unwrap(); + + mutate_branch( + &mut feature, + "feature", + EDGE_UNIQUE_MUTATIONS, + "add_knows", + ¶ms(&[("$from", "Alice"), ("$to", "Carol")]), + ) + .await + .unwrap(); + + main.branch_merge("feature", "main") + .await + .expect("distinct (src, dst) pairs are unique on the composite and must merge cleanly"); + assert_eq!(count_rows(&main, "edge:Knows").await, 2); +} + #[tokio::test] async fn branch_merge_reports_cardinality_violation_conflict() { let dir = tempfile::tempdir().unwrap(); diff --git a/crates/omnigraph/tests/composite_flow.rs b/crates/omnigraph/tests/composite_flow.rs index 6c720da..dd41310 100644 --- a/crates/omnigraph/tests/composite_flow.rs +++ b/crates/omnigraph/tests/composite_flow.rs @@ -294,21 +294,19 @@ async fn composite_flow_canonical_lifecycle() { ); // ───────────────────────────────────────────────────────────────── - // Step 10: optimize the post-merge graph — verify indices stay - // valid and queryable. + // Step 10: optimize the post-merge graph — verify compaction is + // published to the manifest (so the manifest pin tracks the compacted + // Lance HEAD), indices stay valid and queryable, and a post-optimize + // strict write commits. // - // **Known limitation**: `optimize_all_tables` calls Lance - // `compact_files` directly — it advances per-table Lance HEAD - // without updating the omnigraph `__manifest` pin. After optimize, - // the next writer's expected_table_versions captures the - // pre-optimize manifest pin, but the publisher's pre-check reads - // a higher version from the manifest dataset (because some other - // path — possibly schema-state recovery on reopen — wrote a newer - // __manifest row). The `ExpectedVersionMismatch` is benign - // (re-issuing the mutation after a snapshot refresh succeeds), but - // a composite test cannot reliably exercise post-optimize mutations - // until that path is investigated. Coverage of post-optimize - // mutations is left to a focused optimize+cleanup integration test. + // This step used to carry a "Known limitation": `optimize_all_tables` + // ran Lance `compact_files` without publishing the new version to + // `__manifest`, so the manifest pin lagged the Lance HEAD and the next + // strict write / schema apply failed with `ExpectedVersionMismatch` + // ("stale view … refresh and retry") — so post-optimize mutations were + // deliberately omitted here. optimize now publishes the compacted + // version, and this flow exercises exactly that previously-failing + // write below. // ───────────────────────────────────────────────────────────────── let optimize_stats = db.optimize().await.unwrap(); assert!( @@ -331,6 +329,28 @@ async fn composite_flow_canonical_lifecycle() { "row counts unchanged by optimize" ); + // A strict update on a compacted table is exactly the write that + // failed with "stale view" before optimize published its compaction. + // It must now commit (Alice is one of the seed Persons; an update + // leaves the row count at 6). + let post_optimize_update = mutate_main( + &mut db, + MUTATION_QUERIES, + "set_age", + &mixed_params(&[("$name", "Alice")], &[("$age", 41)]), + ) + .await + .expect("post-optimize strict update must commit — optimize published the manifest"); + assert_eq!( + post_optimize_update.affected_nodes, 1, + "post-optimize update must affect exactly Alice" + ); + assert_eq!( + count_rows(&db, "node:Person").await, + 6, + "an update must not change the Person row count" + ); + // ───────────────────────────────────────────────────────────────── // Step 11: cleanup — keep last 10 versions, only purge versions // older than 1 hour. With this small test, we have well under 10 @@ -373,14 +393,27 @@ async fn composite_flow_canonical_lifecycle() { branches, ); - // Final query exercise — full read path works post-reopen, - // post-cleanup. Post-cleanup mutation is omitted here pending - // resolution of the optimize-vs-manifest-pin interaction documented - // in Step 10. + // Final exercise — full read AND write path works post-reopen, + // post-cleanup. (The post-cleanup mutation was previously omitted + // pending resolution of the optimize-vs-manifest-pin interaction in + // Step 10; that is now fixed, so a strict write here must commit.) let final_total = query_main(&mut db, TEST_QUERIES, "total_people", &ParamMap::default()) .await .unwrap(); assert!(!final_total.batches().is_empty()); + + let post_reopen_update = mutate_main( + &mut db, + MUTATION_QUERIES, + "set_age", + &mixed_params(&[("$name", "Alice")], &[("$age", 42)]), + ) + .await + .expect("post-reopen, post-cleanup strict update must commit"); + assert_eq!( + post_reopen_update.affected_nodes, 1, + "post-reopen update must affect exactly Alice" + ); } /// Cross-handle sequence that exercises operations after a schema_apply diff --git a/crates/omnigraph/tests/consistency.rs b/crates/omnigraph/tests/consistency.rs index 26517db..729f2e8 100644 --- a/crates/omnigraph/tests/consistency.rs +++ b/crates/omnigraph/tests/consistency.rs @@ -188,7 +188,7 @@ node Thing { /// /// Defense in depth: /// 1. The loader's `enforce_unique_constraints_intra_batch` -/// (`loader/mod.rs:1453`), invoked unconditionally on any node type +/// (`loader/mod.rs:1471`), invoked unconditionally on any node type /// with a `@key`, errors on intra-batch duplicate `@key` values at /// intake — pinned by this test across every `LoadMode`. /// 2. The `check_batch_unique_by_keys` precondition at the top of @@ -229,6 +229,57 @@ node Thing { } } +/// Regression for MR-983: a node-level composite `@unique(a, b)` must be +/// enforced as a true composite key, not degraded into independent +/// single-field checks. Pre-fix, `unique_property_names_for_node` flattened +/// every constraint group into one property list, so `@unique(source, +/// external_id)` was enforced as `@unique(source)` *and* `@unique(external_id)` +/// — rejecting rows that were unique on the composite key and naming only the +/// first field in the error. +#[tokio::test] +async fn loader_enforces_composite_unique_as_composite_key() { + let dir = tempfile::tempdir().unwrap(); + let uri = dir.path().to_str().unwrap(); + let schema = r#" +node ExternalID { + slug: String @key + source: String @index + external_id: String @index + @unique(source, external_id) +} +"#; + let mut db = Omnigraph::init(uri, schema).await.unwrap(); + + // Same `source`, different `external_id` → unique on the composite key. + // This is the exact repro from MR-983 and must be accepted. + let composite_ok = r#"{"type":"ExternalID","data":{"slug":"a","source":"whatsapp","external_id":"+E.164"}} +{"type":"ExternalID","data":{"slug":"b","source":"whatsapp","external_id":"pn:12345"}} +"#; + load_jsonl(&mut db, composite_ok, LoadMode::Overwrite) + .await + .expect("rows unique on the composite (source, external_id) must be accepted"); + assert_eq!(count_rows(&db, "node:ExternalID").await, 2); + + // Both composite columns equal → genuine violation. The error must name + // the whole composite, not just the first field. + let composite_dupe = r#"{"type":"ExternalID","data":{"slug":"c","source":"whatsapp","external_id":"dup"}} +{"type":"ExternalID","data":{"slug":"d","source":"whatsapp","external_id":"dup"}} +"#; + let err = load_jsonl(&mut db, composite_dupe, LoadMode::Overwrite) + .await + .unwrap_err(); + let msg = err.to_string(); + // Columns are canonicalized to sorted order in the catalog, so the + // message reads `(external_id, source)`; assert order-agnostically that + // both composite columns are named (not just the first, as pre-fix). + assert!( + msg.contains("@unique violation") + && msg.contains("source") + && msg.contains("external_id"), + "composite violation must name both columns (got: {msg})" + ); +} + /// Canary for the upstream Lance gap that the `FirstSeen` workaround /// in `table_store.rs` masks. The bug class is "Window 2": load → /// indices built explicitly → merge → merge. Even with the engine diff --git a/crates/omnigraph/tests/end_to_end.rs b/crates/omnigraph/tests/end_to_end.rs index a0fdb0e..ea11d0e 100644 --- a/crates/omnigraph/tests/end_to_end.rs +++ b/crates/omnigraph/tests/end_to_end.rs @@ -1933,3 +1933,87 @@ query docs_with_tag($tag: String) { "contains-pushdown should return exactly the rows whose tags list contains 'red'" ); } + +// ─── Maintenance in the full lifecycle: optimize (compaction) ──────────────── + +/// `optimize` (Lance compaction) is part of a realistic graph lifecycle: it +/// advances the Lance HEAD and publishes the compacted version to the manifest. +/// The rest of the flow must keep working across that boundary — reads observe +/// the compacted data, strict updates (which check Lance HEAD == manifest +/// version) still commit, inserts still commit, and the state survives a reopen +/// (the open-time recovery sweep finds no leftover drift). Before optimize +/// published its compaction, the manifest lagged the Lance HEAD here and the +/// post-optimize update below failed with "stale view ... refresh and retry". +#[tokio::test] +async fn full_flow_optimize_then_query_update_and_reopen() { + let dir = tempfile::tempdir().unwrap(); + let uri = dir.path().to_str().unwrap().to_string(); + let mut db = init_and_load(&dir).await; + + // Build several Person fragments so compaction has something to merge. + for (name, age) in [("Eve", 40), ("Frank", 41), ("Grace", 42)] { + mutate_main( + &mut db, + MUTATION_QUERIES, + "insert_person", + &mixed_params(&[("$name", name)], &[("$age", age)]), + ) + .await + .unwrap(); + } + + let stats = db.optimize().await.unwrap(); + assert!( + stats.iter().any(|s| s.committed), + "a multi-fragment table should have compacted in this flow" + ); + + // Reads observe the compacted data. + let qr = query_main( + &mut db, + TEST_QUERIES, + "get_person", + ¶ms(&[("$name", "Alice")]), + ) + .await + .unwrap(); + assert_eq!(qr.num_rows(), 1); + + // Strict update after optimize commits (previously failed with "stale view" + // because the manifest lagged the compacted Lance HEAD). + let upd = mutate_main( + &mut db, + MUTATION_QUERIES, + "set_age", + &mixed_params(&[("$name", "Alice")], &[("$age", 31)]), + ) + .await + .unwrap(); + assert_eq!(upd.affected_nodes, 1); + + // Insert after optimize also commits. + mutate_main( + &mut db, + MUTATION_QUERIES, + "insert_person", + &mixed_params(&[("$name", "Ivan")], &[("$age", 50)]), + ) + .await + .unwrap(); + assert_eq!(count_rows(&db, "node:Person").await, 8); // 4 seed + Eve/Frank/Grace + Ivan + + // State survives a reopen — the recovery sweep runs and finds no drift. + drop(db); + let reopened = Omnigraph::open(&uri).await.unwrap(); + assert_eq!(count_rows(&reopened, "node:Person").await, 8); + let alice = reopened + .entity_at_target(ReadTarget::branch("main"), "node:Person", "Alice") + .await + .unwrap() + .unwrap(); + assert_eq!( + alice["age"], + serde_json::json!(31), + "Alice's post-optimize age update must persist across reopen" + ); +} diff --git a/crates/omnigraph/tests/failpoints.rs b/crates/omnigraph/tests/failpoints.rs index 149c63a..d240108 100644 --- a/crates/omnigraph/tests/failpoints.rs +++ b/crates/omnigraph/tests/failpoints.rs @@ -1245,7 +1245,7 @@ async fn refresh_defers_rollback_eligible_sidecar_to_next_open() { // the rollback (will use Dataset::restore safely; no concurrent // writers at open time). drop(db); - let _db = Omnigraph::open(&uri).await.unwrap(); + let db = Omnigraph::open(&uri).await.unwrap(); // After full-sweep recovery, the sidecar should be processed // (deleted). Sidecar's tables are eligible for rollback (UnexpectedAtP1): // restore happens on Person (HEAD advances by 1). @@ -1268,6 +1268,19 @@ async fn refresh_defers_rollback_eligible_sidecar_to_next_open() { "full sweep must run Dataset::restore (head advances); \ post_head={post_head}, final_head={final_head}", ); + // Convergence: roll-back published the restored HEAD, so the manifest pin + // tracks Lance HEAD afterward (no residual drift). + let entry_version = db + .snapshot_of(omnigraph::db::ReadTarget::branch("main")) + .await + .unwrap() + .entry("node:Person") + .unwrap() + .table_version; + assert_eq!( + entry_version, final_head, + "full-sweep roll-back must publish so manifest pin ({entry_version}) == Lance HEAD ({final_head})", + ); } /// Companion to the above — confirms that a finalize→publisher failure @@ -1461,10 +1474,15 @@ edge WorksAt: Person -> Company } let db = Omnigraph::open(&uri).await.unwrap(); - assert_eq!( - version_main(&db).await.unwrap(), - pre_failure_version, - "manifest must remain on the old schema when no schema staging files existed" + // Roll-back now publishes the restored version, so the manifest version + // advances — but to the OLD-schema content: the migration never applied + // (asserted by count_rows + the `_schema.pg` checks below), and the sweep + // converges (`manifest == Lance HEAD`, asserted by + // assert_post_recovery_invariants's RolledBack arm). + assert!( + version_main(&db).await.unwrap() > pre_failure_version, + "roll-back publishes the restored (old-schema) version, advancing the manifest; \ + pre={pre_failure_version}", ); assert_eq!( helpers::count_rows(&db, "node:Person").await, @@ -1637,6 +1655,100 @@ edge WorksAt: Person -> Company ); } +/// `optimize` Phase B → Phase C residual: `compact_files` advanced the Lance +/// HEAD but the manifest publish hasn't run. The `Optimize` recovery sidecar +/// (loose-match, like SchemaApply/EnsureIndices) must roll the compacted version +/// forward on next open so the manifest tracks the Lance HEAD — and the healed +/// table must then accept a schema apply (the original bug's victim). +#[tokio::test] +async fn optimize_phase_b_failure_recovered_on_next_open() { + let _scenario = FailScenario::setup(); + let dir = tempfile::tempdir().unwrap(); + let uri = dir.path().to_str().unwrap().to_string(); + let operation_id; + + // Seed: several separate Person inserts → multiple fragments, so compaction + // has real work and advances the Lance HEAD. + { + let db = Omnigraph::init(&uri, helpers::TEST_SCHEMA).await.unwrap(); + for (name, age) in [("alice", 30), ("bob", 31), ("carol", 32), ("dave", 33)] { + db.mutate( + "main", + MUTATION_QUERIES, + "insert_person", + &mixed_params(&[("$name", name)], &[("$age", age)]), + ) + .await + .unwrap(); + } + } + + let pre_failure_version = { + let db = Omnigraph::open(&uri).await.unwrap(); + version_main(&db).await.unwrap() + }; + + // Failpoint fires AFTER compact_files advanced the Lance HEAD but BEFORE the + // manifest publish. The Optimize sidecar persists (only node:Person has + // compactable fragments, so exactly one sidecar is written). + { + let db = Omnigraph::open(&uri).await.unwrap(); + let _failpoint = + ScopedFailPoint::new("optimize.post_phase_b_pre_manifest_commit", "return"); + let err = db.optimize().await.unwrap_err(); + assert!( + err.to_string() + .contains("injected failpoint triggered: optimize.post_phase_b_pre_manifest_commit"), + "unexpected error: {err}" + ); + + let recovery_dir = dir.path().join("__recovery"); + let sidecars: Vec<_> = std::fs::read_dir(&recovery_dir) + .unwrap() + .filter_map(|e| e.ok()) + .collect(); + assert_eq!( + sidecars.len(), + 1, + "exactly one Optimize sidecar must persist after optimize failure" + ); + operation_id = single_sidecar_operation_id(dir.path()); + } + + // Recovery: reopen runs the sweep. The Optimize sidecar classifies + // RolledPastExpected (loose-match) → RollForward → manifest extends to the + // compacted Lance HEAD. + let db = Omnigraph::open(&uri).await.unwrap(); + let post_recovery_version = version_main(&db).await.unwrap(); + assert!( + post_recovery_version > pre_failure_version, + "manifest version must advance post-recovery (compaction rolled forward); \ + pre={pre_failure_version}, post={post_recovery_version}", + ); + drop(db); + + assert_post_recovery_invariants( + dir.path(), + &operation_id, + RecoveryExpectation::RolledForward { + tables: vec![TableExpectation::main("node:Person")], + }, + ) + .await + .unwrap(); + + // The healed table accepts an additive schema apply — its HEAD-vs-manifest + // precondition is satisfied because recovery published the compacted version. + let db = Omnigraph::open(&uri).await.unwrap(); + let desired = helpers::TEST_SCHEMA.replace( + " age: I32?\n}", + " age: I32?\n nickname: String?\n}", + ); + db.apply_schema(&desired) + .await + .expect("schema apply after optimize recovery must succeed"); +} + #[tokio::test] async fn branch_merge_phase_b_failure_recovered_on_next_open() { use omnigraph::loader::{LoadMode, load_jsonl}; diff --git a/crates/omnigraph/tests/helpers/recovery.rs b/crates/omnigraph/tests/helpers/recovery.rs index c76009e..90d9a25 100644 --- a/crates/omnigraph/tests/helpers/recovery.rs +++ b/crates/omnigraph/tests/helpers/recovery.rs @@ -181,6 +181,9 @@ pub async fn assert_post_recovery_invariants( "audit row for {operation_id} recorded the wrong recovery_kind", ); assert_rollback_outcomes_record_drift(&audit); + // Roll-back now publishes the restored HEAD, so manifest == Lance + // HEAD afterward (symmetric with roll-forward) — no residual drift. + assert_manifest_pins_match_lance_heads(graph_root, &tables).await?; assert_recovery_commit_shape(graph_root, &audit, &tables).await?; assert_non_main_did_not_move_main(graph_root, &tables).await?; assert_idempotent_reopen(graph_root, operation_id).await?; diff --git a/crates/omnigraph/tests/lance_surface_guards.rs b/crates/omnigraph/tests/lance_surface_guards.rs index 1d60c08..65efc4e 100644 --- a/crates/omnigraph/tests/lance_surface_guards.rs +++ b/crates/omnigraph/tests/lance_surface_guards.rs @@ -30,6 +30,7 @@ use arrow_schema::{DataType, Field, Schema}; use lance::Dataset; use lance::dataset::builder::DatasetBuilder; use lance::dataset::optimize::{CompactionOptions, compact_files}; +use lance::dataset::transaction::Operation; use lance::dataset::write::delete::DeleteResult; use lance::dataset::{MergeInsertBuilder, WhenMatched, WhenNotMatched, WriteMode, WriteParams}; use lance_file::version::LanceFileVersion; @@ -222,6 +223,33 @@ async fn _compile_compact_files_signature() -> lance::Result<()> { Ok(()) } +// --- Guard 7b: transaction history exposes repair's classification surface - +// +// `db/omnigraph/repair.rs` reads Lance transactions between manifest and HEAD +// and treats only `ReserveFragments` + `Rewrite` as safe maintenance drift. +// Compile-only. + +#[allow( + dead_code, + unreachable_code, + unused_variables, + unused_mut, + clippy::diverging_sub_expression +)] +async fn _compile_transaction_history_for_repair_signature() -> lance::Result<()> { + let ds: Dataset = unimplemented!(); + let tx = ds.read_transaction_by_version(1u64).await?; + if let Some(tx) = tx { + let operation = tx.operation; + let _name: &str = operation.name(); + match operation { + Operation::Rewrite { .. } | Operation::ReserveFragments { .. } => {} + _ => {} + } + } + Ok(()) +} + // --- Guard 8: Dataset::delete returns DeleteResult { new_dataset, num_deleted_rows } --- // // `table_store.rs::delete_where` consumes both fields. When MR-A migrates @@ -329,7 +357,10 @@ async fn compact_files_still_fails_on_blob_columns() { ])); RecordBatch::try_new( schema, - vec![Arc::new(StringArray::from(ids)) as _, Arc::new(content) as _], + vec![ + Arc::new(StringArray::from(ids)) as _, + Arc::new(content) as _, + ], ) .unwrap() } diff --git a/crates/omnigraph/tests/maintenance.rs b/crates/omnigraph/tests/maintenance.rs index 3e61677..13c9de7 100644 --- a/crates/omnigraph/tests/maintenance.rs +++ b/crates/omnigraph/tests/maintenance.rs @@ -8,10 +8,16 @@ mod helpers; use std::time::Duration; use lance::Dataset; -use omnigraph::db::{CleanupPolicyOptions, Omnigraph, SkipReason}; +use lance::dataset::optimize::{CompactionOptions, compact_files}; +use omnigraph::db::{ + CleanupPolicyOptions, Omnigraph, ReadTarget, RepairAction, RepairClassification, RepairOptions, + SkipReason, +}; use omnigraph::loader::{LoadMode, load_jsonl}; -use helpers::{TEST_DATA, TEST_SCHEMA, count_rows, init_and_load}; +use helpers::{ + MUTATION_QUERIES, TEST_DATA, TEST_SCHEMA, count_rows, init_and_load, mixed_params, mutate_main, +}; /// Filesystem URI of a node sub-table, mirroring the engine's layout /// (FNV-1a of the type name under `nodes/`). Matches the helper in @@ -25,11 +31,64 @@ fn node_table_uri(root: &str, type_name: &str) -> String { format!("{}/nodes/{hash:016x}", root.trim_end_matches('/')) } +async fn person_manifest_and_head(db: &Omnigraph, root: &str) -> (u64, u64, String) { + let snap = db.snapshot_of(ReadTarget::branch("main")).await.unwrap(); + let entry = snap.entry("node:Person").unwrap(); + let full = format!("{}/{}", root.trim_end_matches('/'), entry.table_path); + let head = Dataset::open(&full).await.unwrap().version().version; + (entry.table_version, head, full) +} + +async fn add_person_fragments(db: &mut Omnigraph) { + for (name, age) in [("Eve", 40), ("Frank", 41), ("Grace", 42), ("Heidi", 43)] { + mutate_main( + db, + MUTATION_QUERIES, + "insert_person", + &mixed_params(&[("$name", name)], &[("$age", age as i64)]), + ) + .await + .expect("insert"); + } +} + +async fn forge_person_compaction_drift(db: &mut Omnigraph, root: &str) -> (u64, u64, String) { + add_person_fragments(db).await; + let (manifest_version, _, full) = person_manifest_and_head(db, root).await; + let mut ds = Dataset::open(&full).await.unwrap(); + let metrics = compact_files(&mut ds, CompactionOptions::default(), None) + .await + .expect("raw Lance compaction"); + let lance_head_version = ds.version().version; + assert!( + lance_head_version > manifest_version, + "raw Lance compaction should advance HEAD beyond manifest" + ); + assert!( + metrics.fragments_removed > 0 || metrics.fragments_added > 0, + "test precondition: raw compaction should rewrite fragments" + ); + (manifest_version, lance_head_version, full) +} + +async fn forge_person_delete_drift(db: &Omnigraph, root: &str) -> (u64, u64, String) { + let (manifest_version, _, full) = person_manifest_and_head(db, root).await; + let mut ds = Dataset::open(&full).await.unwrap(); + let deleted = ds.delete("name = 'Alice'").await.expect("raw Lance delete"); + assert_eq!(deleted.num_deleted_rows, 1, "fixture should delete Alice"); + let lance_head_version = deleted.new_dataset.version().version; + assert!( + lance_head_version > manifest_version, + "raw Lance delete should advance HEAD beyond manifest" + ); + (manifest_version, lance_head_version, full) +} + #[tokio::test] async fn optimize_on_empty_graph_returns_stats_per_table_with_no_changes() { let dir = tempfile::tempdir().unwrap(); let uri = dir.path().to_str().unwrap(); - let mut db = Omnigraph::init(uri, TEST_SCHEMA).await.unwrap(); + let db = Omnigraph::init(uri, TEST_SCHEMA).await.unwrap(); let stats = db.optimize().await.unwrap(); @@ -45,7 +104,7 @@ async fn optimize_on_empty_graph_returns_stats_per_table_with_no_changes() { #[tokio::test] async fn optimize_after_load_then_again_is_idempotent() { let dir = tempfile::tempdir().unwrap(); - let mut db = init_and_load(&dir).await; + let db = init_and_load(&dir).await; // First pass may compact (load wrote real fragments). let _first = db.optimize().await.unwrap(); @@ -163,6 +222,404 @@ node Tag {\n slug: String @key\n}\n"; assert_eq!(tag.skipped, None, "non-blob table must not be skipped"); } +// Regression: `optimize` must publish its compaction to the `__manifest` so the +// manifest's recorded `table_version` tracks the compacted Lance HEAD. +// +// Lance `compact_files` advances the *dataset's* version (reserve-fragments + +// rewrite commits) but knows nothing about OmniGraph's `__manifest`. If optimize +// does not publish a manifest update, the manifest's `table_version` lags the +// Lance HEAD: reads stay pinned to the pre-compaction version (compaction is +// invisible to them) and any subsequent schema apply / strict update/delete +// fails its HEAD-vs-manifest precondition with +// "stale view of '': expected manifest table version X but current is Y". +// This pins the fix — optimize publishes the compacted version, so manifest == +// HEAD and migrations after a compaction succeed. +#[tokio::test] +async fn optimize_publishes_compaction_to_manifest_so_schema_apply_succeeds() { + let dir = tempfile::tempdir().unwrap(); + let root = dir + .path() + .to_str() + .unwrap() + .trim_end_matches('/') + .to_string(); + let mut db = init_and_load(&dir).await; + + // Several separate inserts → multiple Person fragments, so `compact_files` + // actually merges and moves the Lance HEAD (a single fragment is a no-op). + for (name, age) in [("Eve", 40), ("Frank", 41), ("Grace", 42), ("Heidi", 43)] { + mutate_main( + &mut db, + MUTATION_QUERIES, + "insert_person", + &mixed_params(&[("$name", name)], &[("$age", age as i64)]), + ) + .await + .expect("insert"); + } + + let stats = db.optimize().await.unwrap(); + let person = stats + .iter() + .find(|s| s.table_key == "node:Person") + .expect("Person stat present"); + assert!( + person.committed, + "Person is multi-fragment, so optimize must have compacted it" + ); + + // After optimize, the manifest's recorded table_version must equal the actual + // Lance HEAD — optimize published its compaction, so there is no drift. + let snap = db.snapshot_of(ReadTarget::branch("main")).await.unwrap(); + let entry = snap.entry("node:Person").unwrap(); + let manifest_version = entry.table_version; + let full = format!("{}/{}", root, entry.table_path); + let lance_head = Dataset::open(&full).await.unwrap().version().version; + assert_eq!( + manifest_version, lance_head, + "after optimize, manifest table_version ({manifest_version}) must equal Lance HEAD ({lance_head})", + ); + + // Reads observe the compacted version with rows preserved (4 seed + 4 inserts). + assert_eq!(count_rows(&db, "node:Person").await, 8); + + // The headline: an additive (nullable property) migration touching the + // just-compacted table succeeds, where it previously failed with "stale view". + let desired = TEST_SCHEMA.replace( + " age: I32?\n}", + " age: I32?\n nickname: String?\n}", + ); + let result = db + .apply_schema(&desired) + .await + .expect("additive schema apply after optimize must succeed"); + assert!(result.applied, "schema apply should report applied=true"); +} + +#[tokio::test] +async fn optimize_skips_preexisting_manifest_head_drift() { + let dir = tempfile::tempdir().unwrap(); + let root = dir + .path() + .to_str() + .unwrap() + .trim_end_matches('/') + .to_string(); + let mut db = init_and_load(&dir).await; + let (manifest_before, head_before, _) = forge_person_compaction_drift(&mut db, &root).await; + + let stats = db.optimize().await.unwrap(); + let person = stats + .iter() + .find(|s| s.table_key == "node:Person") + .expect("Person stat present"); + assert_eq!(person.skipped, Some(SkipReason::DriftNeedsRepair)); + assert!(!person.committed); + assert_eq!(person.manifest_version, Some(manifest_before)); + assert_eq!(person.lance_head_version, Some(head_before)); + + let (manifest_after, head_after, _) = person_manifest_and_head(&db, &root).await; + assert_eq!( + manifest_after, manifest_before, + "optimize must not publish uncovered drift" + ); + assert_eq!( + head_after, head_before, + "optimize must not move drifted HEAD" + ); +} + +#[tokio::test] +async fn repair_preview_reports_verified_maintenance_drift_without_healing() { + let dir = tempfile::tempdir().unwrap(); + let root = dir + .path() + .to_str() + .unwrap() + .trim_end_matches('/') + .to_string(); + let mut db = init_and_load(&dir).await; + let (manifest_before, head_before, _) = forge_person_compaction_drift(&mut db, &root).await; + + let stats = db + .repair(RepairOptions { + confirm: false, + force: false, + }) + .await + .unwrap(); + assert_eq!(stats.manifest_version, None); + let person = stats + .tables + .iter() + .find(|s| s.table_key == "node:Person") + .expect("Person repair stat present"); + assert_eq!( + person.classification, + RepairClassification::VerifiedMaintenance + ); + assert_eq!(person.action, RepairAction::Preview); + assert_eq!(person.manifest_version, manifest_before); + assert_eq!(person.lance_head_version, head_before); + assert!( + person + .operations + .iter() + .all(|op| op == "ReserveFragments" || op == "Rewrite"), + "maintenance drift should only include Lance maintenance operations: {:?}", + person.operations + ); + + let (manifest_after, head_after, _) = person_manifest_and_head(&db, &root).await; + assert_eq!(manifest_after, manifest_before); + assert_eq!(head_after, head_before); +} + +#[tokio::test] +async fn repair_confirm_heals_verified_maintenance_drift() { + let dir = tempfile::tempdir().unwrap(); + let root = dir + .path() + .to_str() + .unwrap() + .trim_end_matches('/') + .to_string(); + let mut db = init_and_load(&dir).await; + let (_, head_before, _) = forge_person_compaction_drift(&mut db, &root).await; + + let stats = db + .repair(RepairOptions { + confirm: true, + force: false, + }) + .await + .unwrap(); + assert!( + stats.manifest_version.is_some(), + "confirmed repair should publish one manifest commit" + ); + let person = stats + .tables + .iter() + .find(|s| s.table_key == "node:Person") + .expect("Person repair stat present"); + assert_eq!( + person.classification, + RepairClassification::VerifiedMaintenance + ); + assert_eq!(person.action, RepairAction::Healed); + + let (manifest_after, head_after, _) = person_manifest_and_head(&db, &root).await; + assert_eq!(manifest_after, head_before); + assert_eq!(head_after, head_before); + + let desired = TEST_SCHEMA.replace( + " age: I32?\n}", + " age: I32?\n nickname: String?\n}", + ); + let result = db + .apply_schema(&desired) + .await + .expect("strict schema apply should succeed after repair"); + assert!(result.applied); +} + +#[tokio::test] +async fn repair_refuses_raw_delete_without_force() { + let dir = tempfile::tempdir().unwrap(); + let root = dir + .path() + .to_str() + .unwrap() + .trim_end_matches('/') + .to_string(); + let db = init_and_load(&dir).await; + let (manifest_before, head_before, _) = forge_person_delete_drift(&db, &root).await; + + let stats = db + .repair(RepairOptions { + confirm: true, + force: false, + }) + .await + .unwrap(); + assert_eq!(stats.manifest_version, None); + let person = stats + .tables + .iter() + .find(|s| s.table_key == "node:Person") + .expect("Person repair stat present"); + assert_eq!(person.classification, RepairClassification::Suspicious); + assert_eq!(person.action, RepairAction::Refused); + assert!( + person.operations.iter().any(|op| op == "Delete"), + "raw Lance delete should be reported as a suspicious operation: {:?}", + person.operations + ); + + let (manifest_after, head_after, _) = person_manifest_and_head(&db, &root).await; + assert_eq!(manifest_after, manifest_before); + assert_eq!(head_after, head_before); + assert_eq!( + count_rows(&db, "node:Person").await, + 4, + "manifest-pinned reads should still see the pre-delete version" + ); +} + +#[tokio::test] +async fn repair_force_heals_suspicious_drift() { + let dir = tempfile::tempdir().unwrap(); + let root = dir + .path() + .to_str() + .unwrap() + .trim_end_matches('/') + .to_string(); + let db = init_and_load(&dir).await; + let (_, head_before, _) = forge_person_delete_drift(&db, &root).await; + + let stats = db + .repair(RepairOptions { + confirm: true, + force: true, + }) + .await + .unwrap(); + let person = stats + .tables + .iter() + .find(|s| s.table_key == "node:Person") + .expect("Person repair stat present"); + assert_eq!(person.classification, RepairClassification::Suspicious); + assert_eq!(person.action, RepairAction::Forced); + + let (manifest_after, head_after, _) = person_manifest_and_head(&db, &root).await; + assert_eq!(manifest_after, head_before); + assert_eq!(head_after, head_before); + assert_eq!( + count_rows(&db, "node:Person").await, + 3, + "forced repair publishes the raw delete's HEAD" + ); +} + +#[tokio::test] +async fn non_strict_load_refuses_uncovered_drift_before_folding_it() { + let dir = tempfile::tempdir().unwrap(); + let root = dir + .path() + .to_str() + .unwrap() + .trim_end_matches('/') + .to_string(); + let mut db = init_and_load(&dir).await; + let (manifest_before, head_before, _) = forge_person_compaction_drift(&mut db, &root).await; + + let err = load_jsonl( + &mut db, + "{\"type\":\"Person\",\"data\":{\"name\":\"Ivan\",\"age\":44}}", + LoadMode::Merge, + ) + .await + .expect_err("merge load must not silently fold uncovered drift"); + assert!( + err.to_string().contains("omnigraph repair"), + "error should point at explicit repair; got: {err}" + ); + + let (manifest_after, head_after, _) = person_manifest_and_head(&db, &root).await; + assert_eq!(manifest_after, manifest_before); + assert_eq!(head_after, head_before); +} + +#[tokio::test] +async fn delete_only_mutation_refuses_uncovered_drift_before_inline_commit() { + let dir = tempfile::tempdir().unwrap(); + let root = dir + .path() + .to_str() + .unwrap() + .trim_end_matches('/') + .to_string(); + let mut db = init_and_load(&dir).await; + let (manifest_before, head_before, _) = forge_person_compaction_drift(&mut db, &root).await; + + let err = mutate_main( + &mut db, + MUTATION_QUERIES, + "remove_person", + &mixed_params(&[("$name", "Alice")], &[]), + ) + .await + .expect_err("strict delete must reject uncovered drift before delete_where"); + assert!( + err.to_string().contains("expected"), + "delete should fail as a strict stale-version write; got: {err}" + ); + + let (manifest_after, head_after, _) = person_manifest_and_head(&db, &root).await; + assert_eq!(manifest_after, manifest_before); + assert_eq!( + head_after, head_before, + "delete_where must not run after the strict drift guard fails" + ); + assert_eq!( + count_rows(&db, "node:Person").await, + 8, + "manifest-pinned reads should still see all rows present before the failed delete" + ); +} + +// Regression: `optimize` must REFUSE when an unresolved recovery sidecar is +// pending. Operating on an unrecovered graph could publish a partial write that +// the all-or-nothing recovery sweep would roll back; the operator must reopen +// (run the recovery sweep) first. +#[tokio::test] +async fn optimize_defers_when_recovery_sidecar_is_pending() { + let dir = tempfile::tempdir().unwrap(); + let uri = dir.path().to_str().unwrap(); + let db = init_and_load(&dir).await; + + // Simulate an in-process failed write that left a recovery sidecar on disk. + let recovery_dir = dir.path().join("__recovery"); + std::fs::create_dir_all(&recovery_dir).unwrap(); + let person_path = node_table_uri(uri, "Person"); + let sidecar_json = format!( + r#"{{ + "schema_version": 1, + "operation_id": "01H000000000000000000DEFR", + "started_at": "0", + "branch": null, + "actor_id": "act-test", + "writer_kind": "Mutation", + "tables": [ + {{ + "table_key": "node:Person", + "table_path": "{}", + "expected_version": 1, + "post_commit_pin": 2 + }} + ] + }}"#, + person_path + ); + std::fs::write( + recovery_dir.join("01H000000000000000000DEFR.json"), + sidecar_json, + ) + .unwrap(); + + let err = db + .optimize() + .await + .expect_err("optimize must defer (error) while a recovery sidecar is pending"); + assert!( + err.to_string().to_lowercase().contains("recovery"), + "optimize defer error should mention recovery; got: {err}", + ); +} + #[tokio::test] async fn cleanup_without_any_policy_option_errors() { let dir = tempfile::tempdir().unwrap(); diff --git a/crates/omnigraph/tests/recovery.rs b/crates/omnigraph/tests/recovery.rs index a090178..f6b19e8 100644 --- a/crates/omnigraph/tests/recovery.rs +++ b/crates/omnigraph/tests/recovery.rs @@ -278,6 +278,97 @@ async fn recovery_rolls_back_synthetic_drift_on_open() { ); } +/// Regression: recovery roll-back must PUBLISH the restored version so +/// `manifest == Lance HEAD` afterward (no residual "orphaned drift"). Before the +/// fix, roll-back restored via `Dataset::restore` but left the manifest pin +/// behind HEAD, so a subsequent strict write / schema apply failed its +/// HEAD-vs-manifest precondition ("stale view … refresh and retry") — and a +/// failed schema apply's own roll-back leaked +1 each retry (the original bug's +/// loop). With convergence, one roll-back leaves `manifest == HEAD` and the +/// follow-up succeeds. +#[tokio::test] +async fn recovery_rollback_converges_manifest_so_schema_apply_succeeds() { + use omnigraph::db::ReadTarget; + use omnigraph::loader::{LoadMode, load_jsonl}; + use omnigraph::table_store::TableStore; + + let dir = tempfile::tempdir().unwrap(); + let uri = dir.path().to_str().unwrap(); + + let mut db = Omnigraph::init(uri, TEST_SCHEMA).await.unwrap(); + load_jsonl( + &mut db, + r#"{"type":"Person","data":{"name":"alice","age":30}} +{"type":"Person","data":{"name":"bob","age":25}} +"#, + LoadMode::Append, + ) + .await + .unwrap(); + drop(db); + + // Forge a Phase-B residual: advance Person's Lance HEAD without publishing to + // the manifest (the manifest pin stays at the load's committed version). + let person_uri = node_table_uri(uri, "Person"); + let store = TableStore::new(uri); + let mut ds = Dataset::open(&person_uri).await.unwrap(); + let manifest_pin = ds.version().version; + let _ = store + .delete_where(&person_uri, &mut ds, "1 = 2") + .await + .unwrap(); + drop(ds); + + // Roll-back-classified sidecar (post_commit_pin != observed head ⇒ + // UnexpectedAtP1 ⇒ RollBack). + let sidecar_json = format!( + r#"{{ + "schema_version": 1, + "operation_id": "01H0000000000000000000CVG", + "started_at": "0", + "branch": null, + "actor_id": "act-test", + "writer_kind": "Mutation", + "tables": [ + {{ + "table_key": "node:Person", + "table_path": "{}", + "expected_version": {}, + "post_commit_pin": {} + }} + ] + }}"#, + person_uri, manifest_pin, manifest_pin + ); + write_sidecar_file(dir.path(), "01H0000000000000000000CVG", &sidecar_json); + + // Reopen runs the sweep: restore Person to manifest_pin, then PUBLISH so the + // manifest tracks the restored Lance HEAD. + let db = Omnigraph::open(uri).await.unwrap(); + + // Convergence: manifest pin == Lance HEAD. Fails before the fix — the + // manifest stays at manifest_pin while HEAD advanced past it. + let snap = db.snapshot_of(ReadTarget::branch("main")).await.unwrap(); + let entry = snap.entry("node:Person").unwrap(); + let lance_head = Dataset::open(&person_uri).await.unwrap().version().version; + assert_eq!( + entry.table_version, lance_head, + "roll-back must publish so manifest pin ({}) == Lance HEAD ({})", + entry.table_version, lance_head, + ); + + // The +1-loop victim: an additive schema apply must now succeed (its + // HEAD-vs-manifest precondition is satisfied). Before the fix this failed + // with "stale view … refresh and retry". + let desired = TEST_SCHEMA.replace( + " age: I32?\n}", + " age: I32?\n nickname: String?\n}", + ); + db.apply_schema(&desired) + .await + .expect("schema apply after a converging roll-back must succeed"); +} + // ===================================================================== // Phase 4 — roll-forward path + audit row recording // ===================================================================== diff --git a/crates/omnigraph/tests/writes.rs b/crates/omnigraph/tests/writes.rs index 0a309c9..d76ad46 100644 --- a/crates/omnigraph/tests/writes.rs +++ b/crates/omnigraph/tests/writes.rs @@ -6,8 +6,8 @@ //! What this file covers: //! - No `__run__*` branches are created by load or mutate. //! - Cancellation of a mutation future leaves no graph-level state. -//! - Concurrent writers to the same table land exactly one publish; the -//! loser surfaces `ManifestConflictDetails::ExpectedVersionMismatch`. +//! - Concurrent non-strict inserts/merges rebase under the per-table queue; +//! strict updates/deletes surface `ExpectedVersionMismatch` on stale state. //! - Failed mutations and loads leave the target unchanged. //! - Multi-statement mutations are atomic (one commit per query). //! - actor_id propagates through to the commit graph. @@ -17,7 +17,7 @@ mod helpers; use arrow_array::Array; use omnigraph::db::commit_graph::CommitGraph; use omnigraph::db::{Omnigraph, ReadTarget}; -use omnigraph::error::{ManifestConflictDetails, ManifestErrorKind, OmniError}; +use omnigraph::error::OmniError; use omnigraph::loader::{LoadMode, load_jsonl}; use helpers::*; @@ -241,18 +241,11 @@ async fn partial_failure_leaves_target_queryable_and_unblocks_next_mutation() { assert_eq!(frank.num_rows(), 1, "Frank must be visible after publish"); } -/// Concurrent writers to the same `(table, branch)` produce exactly one -/// success and one `ExpectedVersionMismatch`. The replacement for the old -/// `concurrent_conflicting_run_publish_fails_cleanly` test — the OCC fence -/// has moved from a graph-level run-publish merge into the publisher's -/// per-table CAS. -/// -/// Drives the race by interleaving two handles that captured the same -/// pre-write manifest snapshot: A commits first; B's commit then sees -/// `expected_versions[node:Person] = pre` while the manifest is at -/// `pre + 1`, and the publisher rejects. +/// Stale non-strict writers rebase to the live manifest pin under the +/// per-table queue instead of folding raw drift or returning a false 409. +/// Strict update/delete semantics are covered by the consistency/server tests. #[tokio::test] -async fn concurrent_writers_one_succeeds_one_gets_expected_version_mismatch() { +async fn stale_non_strict_insert_rebases_to_live_manifest_pin() { let dir = tempfile::tempdir().unwrap(); let uri = dir.path().to_string_lossy().into_owned(); @@ -281,40 +274,30 @@ async fn concurrent_writers_one_succeeds_one_gets_expected_version_mismatch() { .unwrap(); } - // Writer B's coordinator is still at the pre-A snapshot. Its mutation - // captures expected_versions[node:Person] = pre (stale), then publishes - // — the publisher's CAS pre-check sees the manifest is now at post and - // rejects with ExpectedVersionMismatch. - let result_b = db_b - .mutate( - "main", - MUTATION_QUERIES, - "insert_person", - &mixed_params(&[("$name", "WriterB")], &[("$age", 42)]), - ) - .await; + // Writer B's coordinator is still at the pre-A snapshot, but Insert is + // non-strict: commit_all re-reads the live manifest pin under the queue, + // verifies Lance HEAD equals that pin, and then lets Lance rebase the + // staged append. + db_b.mutate( + "main", + MUTATION_QUERIES, + "insert_person", + &mixed_params(&[("$name", "WriterB")], &[("$age", 42)]), + ) + .await + .unwrap(); - let err = result_b.expect_err("stale writer must hit ExpectedVersionMismatch"); - let OmniError::Manifest(manifest_err) = err else { - panic!("expected Manifest error, got {err:?}"); - }; - assert_eq!(manifest_err.kind, ManifestErrorKind::Conflict); - let Some(ManifestConflictDetails::ExpectedVersionMismatch { - ref table_key, - expected, - actual, - }) = manifest_err.details - else { - panic!( - "expected ExpectedVersionMismatch, got {:?}", - manifest_err.details, - ); - }; - assert_eq!(table_key, "node:Person"); - assert!( - actual > expected, - "actual ({actual}) should be ahead of expected ({expected})", - ); + for name in ["WriterA", "WriterB"] { + let person = query_main( + &mut db_b, + TEST_QUERIES, + "get_person", + ¶ms(&[("$name", name)]), + ) + .await + .unwrap(); + assert_eq!(person.num_rows(), 1, "{name} should be visible"); + } } /// The cancellation hole that motivated removing the Run state machine: dropping a mutation future diff --git a/docs/dev/invariants.md b/docs/dev/invariants.md index 5ee4f17..b29d740 100644 --- a/docs/dev/invariants.md +++ b/docs/dev/invariants.md @@ -139,6 +139,20 @@ them explicit. Remove the skip when the upstream Lance fix lands — the `lance_surface_guards.rs::compact_files_still_fails_on_blob_columns` guard turns red on that bump to force it. +- **Manifest→commit-graph publish atomicity:** a graph commit advances + `__manifest` (the visibility authority) and then appends `_graph_commits` as + two separate writes (`commit_updates_with_actor_with_expected`, failpoint + `graph_publish.before_commit_append`). A crash between them leaves the manifest + at version N with no commit-graph row for N. Live reads and durability are + unaffected — the live version resolves via the manifest + (`GraphCoordinator::version()`), not the commit-graph head — and the open-time + recovery sweep does NOT repair it (`lance_head == manifest_pinned` classifies + `NoMovement`; a recovery sidecar would not change this). Impact is bounded to + commit history: `commit list` misses N, time-travel by commit id to N fails, + and merge-base loses a node (a likely-benign off-by-one re-merge). This affects + every publish, not a specific maintenance command. Eventual fix: make the + commit graph reconcilable from the manifest (or the two writes atomic) — not a + recovery-sidecar concern. - **Planner capability/stat surfaces:** cost-aware planning, complete capability advertisement, and explain-with-cost are roadmap. Do not describe them as implemented. diff --git a/docs/dev/testing.md b/docs/dev/testing.md index 1035d84..d3bba9a 100644 --- a/docs/dev/testing.md +++ b/docs/dev/testing.md @@ -21,7 +21,7 @@ The engine's `tests/` is the principal coverage surface; most graph-shaped behav | `end_to_end.rs` | Full init → load → query/mutate flow | | `branching.rs` | Branch create / list / delete, lazy fork | | `merge_truth_table.rs` | Merge-pair truth table (MR-786): all 9×9 `(left_op, right_op)` cells from `{noop, addNode, removeNode, addEdge, removeEdge, setProperty, dropProperty, addLabel, removeLabel}`. Adding a new op to `OpVariant` forces a compile error in `build_case` until the new row + column are dispositioned. 36 executable cells run through real `branch_merge` with a structured oracle (`MergeOutcome` / `MergeConflictKind` + graph-state assert); 45 cells involving `dropProperty`/`addLabel`/`removeLabel` are recorded as `Unsupported` until the mutation grammar grows. | -| `writes.rs` | Direct-publish writes: cancellation, concurrent-writer CAS, multi-statement atomicity, MR-794 staged-write rewire (D₂ rejection, insert+update coalesce, multi-append coalesce, partial-failure recovery, load RI/cardinality recovery) | +| `writes.rs` | Direct-publish writes: cancellation, non-strict insert/merge rebase under the per-table queue, strict stale-write conflicts, multi-statement atomicity, MR-794 staged-write rewire (D₂ rejection, insert+update coalesce, multi-append coalesce, partial-failure recovery, load RI/cardinality recovery) | | `staged_writes.rs` | TableStore staged-write primitives (`stage_append`, `stage_merge_insert`, `commit_staged`, `scan_with_staged`, `count_rows_with_staged`) — primitive-level only; engine code uses the in-memory `MutationStaging` accumulator instead | | `lifecycle.rs` | Graph lifecycle, schema state | | `point_in_time.rs` | Snapshots, time travel (`snapshot_at_version`, `entity_at`) | @@ -35,10 +35,10 @@ The engine's `tests/` is the principal coverage surface; most graph-shaped behav | `s3_storage.rs` | S3-backed graph (skipped unless `OMNIGRAPH_S3_TEST_BUCKET` is set) | | `lance_version_columns.rs` | Per-row `_row_last_updated_at_version` behavior | | `validators.rs` | Schema constraint enforcement (enum, range, unique, cardinality) across JSONL, insert, update paths | -| `maintenance.rs` | `optimize` (compaction) + `cleanup` (version GC): empty/idempotent/no-op edges, policy validation, head preservation | -| `failpoints.rs` | Failure-injection coverage (gated on `failpoints` feature). Includes the four per-writer Phase B → recovery integration tests (`recovery_rolls_forward_after_finalize_publisher_failure`, `schema_apply_phase_b_failure_recovered_on_next_open`, `branch_merge_phase_b_failure_recovered_on_next_open`, `ensure_indices_phase_b_failure_recovered_on_next_open`). | +| `maintenance.rs` | `optimize` (compaction), `repair` (explicit uncovered-drift publish), and `cleanup` (version GC): empty/idempotent/no-op edges, policy validation, head preservation; `optimize` publishes its own compaction (`optimize_publishes_compaction_to_manifest_so_schema_apply_succeeds`), skips pre-existing uncovered drift (`optimize_skips_preexisting_manifest_head_drift`), and refuses to run while a `__recovery` sidecar is pending (`optimize_defers_when_recovery_sidecar_is_pending`); `repair` previews/heals verified maintenance drift, refuses raw semantic drift without `--force`, and forced repair publishes only by explicit operator choice | +| `failpoints.rs` | Failure-injection coverage (gated on `failpoints` feature). Includes the five per-writer Phase B → recovery integration tests (`recovery_rolls_forward_after_finalize_publisher_failure`, `schema_apply_phase_b_failure_recovered_on_next_open`, `branch_merge_phase_b_failure_recovered_on_next_open`, `ensure_indices_phase_b_failure_recovered_on_next_open`, `optimize_phase_b_failure_recovered_on_next_open`). | | `recovery.rs` | Open-time recovery sweep — sidecar I/O, classifier dispatch (NoMovement / RolledPastExpected / UnexpectedAtP1 / UnexpectedMultistep / InvariantViolation), all-or-nothing decision, roll-forward via `ManifestBatchPublisher::publish`, roll-back via `Dataset::restore`, audit row in `_graph_commit_recoveries.lance`, `OpenMode::ReadOnly` skip path | -| `composite_flow.rs` | Compositional/narrative end-to-end stories — multi-step flows that compose mechanics covered by other test files. Catches integration regressions where individual operations all pass their unit tests but their composition breaks (sequential merges, post-merge main writes, time-travel through merge DAG, reopen consistency over multi-merge histories). | +| `composite_flow.rs` | Compositional/narrative end-to-end stories — multi-step flows that compose mechanics covered by other test files. Catches integration regressions where individual operations all pass their unit tests but their composition breaks (sequential merges, post-merge main writes, time-travel through merge DAG, reopen consistency over multi-merge histories, post-optimize and post-cleanup strict writes). | ## Fixtures diff --git a/docs/dev/writes.md b/docs/dev/writes.md index 8b692b4..d2c7c7e 100644 --- a/docs/dev/writes.md +++ b/docs/dev/writes.md @@ -157,10 +157,14 @@ are left at `Lance HEAD = manifest_pinned + 1`. **Recovery protocol** (lifecycle of every staged-write writer — `MutationStaging::finalize`, `schema_apply::apply_schema_with_lock`, -`branch_merge_on_current_target`, `ensure_indices_for_branch`): +`branch_merge_on_current_target`, `ensure_indices_for_branch`, +`optimize_all_tables`): 1. **Phase A**: writer writes a sidecar JSON to - `__recovery/{ulid}.json` BEFORE its first `commit_staged`. The + `__recovery/{ulid}.json` BEFORE its first HEAD-advancing commit + (`commit_staged`, or `compact_files` for `optimize_all_tables`, + which advances the Lance HEAD via a reserve-fragments + rewrite + commit rather than a staged write). The sidecar names every `(table_key, table_path, expected_version, post_commit_pin)` it intends to commit + the writer kind + actor_id. @@ -195,8 +199,13 @@ recovery sweep in `crates/omnigraph/src/db/manifest/recovery.rs`: otherwise full open-time recovery rolls them back and refresh-time recovery leaves them for the next read-write open. - Otherwise **roll back**: per-table `Dataset::restore` to the - manifest-pinned table version for that branch. Rollback records the - actual restore target in the audit row's `to_version`. + manifest-pinned table version, then a single `ManifestBatchPublisher::publish` + of the restored HEAD — symmetric with roll-forward, so `manifest == HEAD` + after recovery (no residual drift). This convergence is what lets a + failed-then-retried schema apply succeed instead of failing one version higher + each iteration. The audit row's `to_version` records the logical + rolled-back-to version (`manifest_pinned`); the manifest is published at the + restore commit (`manifest_pinned + 1`, same content). - After a successful roll-forward or roll-back, an audit row is recorded — `_graph_commits.lance` carries a commit tagged `actor_id = "omnigraph:recovery"`, and a sibling diff --git a/docs/releases/v0.6.2.md b/docs/releases/v0.6.2.md new file mode 100644 index 0000000..2504813 --- /dev/null +++ b/docs/releases/v0.6.2.md @@ -0,0 +1,55 @@ +# Omnigraph v0.6.2 + +v0.6.2 is a maintenance-safety release on top of v0.6.1. It tightens the +`optimize` / recovery boundary, adds an explicit repair path for uncovered +manifest/head drift, accepts pretty-printed JSON load input, and updates the +project governance and release automation around those fixes. + +## Highlights + +- **Explicit `omnigraph repair`.** New `repair` CLI support previews uncovered + manifest/head drift by default and reports each table's classification, + action, manifest version, Lance HEAD version, Lance operations, and any + classification error. `--confirm` publishes verified maintenance-only drift; + `--force --confirm` can publish suspicious or unverifiable drift after + operator review. +- **Optimize skips uncovered drift.** `omnigraph optimize` now refuses to + interpret Lance HEAD movement that is ahead of `__manifest` without a recovery + sidecar. Those tables are reported as `skipped: DriftNeedsRepair` and left + untouched until `omnigraph repair` classifies them. +- **Optimize publishes compaction.** Successful compaction now publishes the + compacted Lance version back through the graph manifest and is covered by an + `Optimize` recovery sidecar. A crash after Lance compaction but before + manifest publish converges through the normal recovery sweep instead of + leaving hidden drift. +- **Recovery roll-back convergence.** Recovery roll-back now aligns the + manifest-visible version after restoring a table, closing the residual where + Lance HEAD and `__manifest` could stay out of sync after recovery. +- **Pretty-printed JSON load input.** `load` accepts multi-line JSON objects in + addition to one-object-per-line JSONL, so formatted fixture or export files no + longer need to be minified before import. + +## Operational Notes + +- `repair` requires a clean recovery state. Pending `__recovery` sidecars still + belong to automatic open-time recovery; reopen the graph first, then run + repair if drift remains. +- `repair --confirm` only auto-publishes drift made of Lance maintenance + operations (`Rewrite` and `ReserveFragments`). Semantic operations such as + append, delete, update, and merge are refused unless the operator uses + `--force --confirm`. +- `optimize` remains non-destructive. It still skips blob-bearing tables while + OmniGraph is pinned to the Lance version with the blob-v2 compaction issue. +- No manual on-disk migration is required. Existing graphs open under v0.6.2; + the internal manifest schema stamp remains v3. + +## Docs, Governance, And CI + +- Added issue, discussion, RFC, and pull-request templates plus governance docs + for the external contribution path. +- Regenerated CODEOWNERS tables and adjusted branch-protection docs so code + owners can bypass required PR review where repository rules allow it. +- Trimmed Windows release builds out of per-PR CI and kept Windows packaging on + tag releases. +- Made Homebrew audit diagnostic-only in the release workflow so a flaky audit + cannot block publishing an otherwise valid formula update. diff --git a/docs/user/branches-commits.md b/docs/user/branches-commits.md index 0565186..a4044cb 100644 --- a/docs/user/branches-commits.md +++ b/docs/user/branches-commits.md @@ -58,6 +58,6 @@ Internal or legacy branch refs: ## L2 — Recovery audit trail -The four migrated writers (`MutationStaging::finalize`, `schema_apply`, `branch_merge`, `ensure_indices`) protect their multi-table commits with a sidecar at `__recovery/{ulid}.json` written before Phase B and deleted after Phase C. The next `Omnigraph::open` (gated on `OpenMode::ReadWrite`) runs the recovery sweep in `crates/omnigraph/src/db/manifest/recovery.rs`: classify per-table state, decide all-or-nothing per sidecar, roll forward / back, record an audit row. +The five migrated writers (`MutationStaging::finalize`, `schema_apply`, `branch_merge`, `ensure_indices`, `optimize_all_tables`) protect their multi-table commits with a sidecar at `__recovery/{ulid}.json` written before Phase B and deleted after Phase C. The next `Omnigraph::open` (gated on `OpenMode::ReadWrite`) runs the recovery sweep in `crates/omnigraph/src/db/manifest/recovery.rs`: classify per-table state, decide all-or-nothing per sidecar, roll forward / back, record an audit row. Audit rows live in `_graph_commit_recoveries.lance` (sibling to `_graph_commits.lance`) and reference the commit graph by `graph_commit_id`. The linked recovery commit is identified by that same `graph_commit_id`, and `actor_id="omnigraph:recovery"` is stored in `_graph_commit_actors.lance` (joined by `graph_commit_id`) — `_graph_commits.lance` itself does not carry the `actor_id` column. To find recoveries for a specific original actor: `omnigraph commit list --filter actor=omnigraph:recovery`, then join to `_graph_commit_recoveries.lance` by `graph_commit_id` to read `recovery_for_actor`. Schema: see `crates/omnigraph/src/db/recovery_audit.rs`. diff --git a/docs/user/cli-reference.md b/docs/user/cli-reference.md index 92ad303..594f983 100644 --- a/docs/user/cli-reference.md +++ b/docs/user/cli-reference.md @@ -2,7 +2,7 @@ A reference for the `omnigraph` binary's command surface and `omnigraph.yaml` schema. For a quick-start guide, see [cli.md](cli.md). -18 top-level command families, 40+ subcommands. Graph commands accept either a positional `URI`, `--uri`, or a `--target ` resolved against `omnigraph.yaml`; `cluster` commands instead use `--config `. +Top-level command families and subcommands. Graph-targeting commands accept either a positional `URI`, `--uri`, or a `--target ` resolved against `omnigraph.yaml`; `cluster` commands use `--config `. ## Top-level commands @@ -17,12 +17,12 @@ A reference for the `omnigraph` binary's command surface and `omnigraph.yaml` sc | `export` | dump to JSONL on stdout (`--type T`, `--table K` filters) | | `branch create \| list \| delete \| merge` | branching ops | | `commit list \| show` | inspect commit graph | -| `run list \| show \| publish \| abort` | transactional run ops | | `schema plan \| apply \| show (alias: get)` | migrations | | `lint` (alias: `check`) | offline / graph-backed query validation. Replaces `query lint` / `query check`, which are kept as deprecated argv-level shims that print a one-line warning and rewrite to `omnigraph lint` | | `queries validate \| list` | operate on the server-side stored-query registry (the `queries:` block). `validate` type-checks every stored query against the live schema offline (opens the selected graph; exits non-zero on any breakage), catching schema drift without restarting the server; `list` prints the selected registry's query names, MCP exposure, and typed params. For per-graph registries, pass `--target ` or set `cli.graph`; with no graph selection, `list` shows only top-level `queries:`. Distinct from `lint`, which validates a single `.gq` file | | `cluster validate \| plan \| status` | read-only cluster-control preview. `validate` checks a local `cluster.yaml` folder and referenced schema/query/policy files; `plan` diffs it against local JSON state at `__cluster/state.json` while briefly holding `__cluster/lock.json`; `status` reads the state ledger. No apply, graph open, live drift scan, server change, or `state.json` mutation occurs in Stage 2A | -| `optimize` | non-destructive Lance compaction (skips tables with `Blob` columns; `--json` reports a `skipped` field) | +| `optimize` | non-destructive Lance compaction (skips tables with `Blob` columns or uncovered drift; `--json` reports `skipped`) | +| `repair [--confirm] [--force]` | preview or explicitly publish uncovered manifest/head drift. `--confirm` heals verified maintenance drift and exits non-zero if suspicious/unverifiable drift is refused; `--force --confirm` publishes suspicious/unverifiable drift after operator review | | `cleanup --keep N --older-than 7d --confirm` | destructive version GC | | `embed` | offline JSONL embedding pipeline | | `policy validate \| test \| explain` | Cedar tooling. Selects `cli.graph`, else `server.graph`, else top-level `policy.file` | diff --git a/docs/user/maintenance.md b/docs/user/maintenance.md index 3628fa0..e69bba3 100644 --- a/docs/user/maintenance.md +++ b/docs/user/maintenance.md @@ -1,15 +1,26 @@ -# Maintenance: Optimize & Cleanup +# Maintenance: Optimize, Repair & Cleanup -`db/omnigraph/optimize.rs`. +`db/omnigraph/optimize.rs` and `db/omnigraph/repair.rs`. ## `optimize_all_tables(db)` — non-destructive -- Lance `compact_files()` on every node + edge table on `main`. -- Rewrites small fragments into fewer large ones; old fragments remain reachable via older manifests. +- Lance `compact_files()` on every node + edge table on `main`, then **publishes the compacted version to the `__manifest`** so the manifest's `table_version` tracks the compacted Lance HEAD. Reads pin the manifest version, so without this publish compaction would be invisible to readers *and* would break the HEAD-vs-manifest precondition of the next schema apply / strict update/delete ("stale view … refresh and retry"). The publish advances the graph version (a system-attributed commit) only for tables that actually compacted. +- Rewrites small fragments into fewer large ones; old fragments remain reachable via older manifests until `cleanup` runs. +- Each table's compact→publish runs under its per-`(table, main)` write queue (serializing with concurrent mutations — compaction is a Lance `Rewrite` op that retryable-conflicts with a concurrent merge/update/delete on overlapping fragments). The Lance-HEAD-before-manifest-publish gap is covered by a `SidecarKind::Optimize` recovery sidecar (loose-match): a crash in that window rolls the compacted version forward on the next `Omnigraph::open` (compaction is content-preserving, so roll-forward is always safe). +- **Requires a recovered graph.** `optimize` refuses (errors) when an unresolved recovery sidecar is present under `__recovery` — operating on an unrecovered graph could publish a partial write the open-time recovery sweep would roll back. Reopen the graph to run the recovery sweep, then re-run `optimize`. +- **Uncovered drift is skipped, not interpreted.** If a table's Lance HEAD is ahead of the version recorded in `__manifest` and no recovery sidecar covers that movement, `optimize` reports `skipped: Some(DriftNeedsRepair)` with the manifest/head versions and leaves the table untouched. Run `omnigraph repair` to classify and explicitly publish that drift. - Bounded by `OMNIGRAPH_MAINTENANCE_CONCURRENCY` (default 8). -- Returns `[TableOptimizeStats { table_key, fragments_removed, fragments_added, committed, skipped }]`. +- Returns `[TableOptimizeStats { table_key, fragments_removed, fragments_added, committed, skipped, manifest_version, lance_head_version }]`. - **Blob tables are skipped.** A table that declares any `Blob` property is not compacted: it is reported with `skipped: Some(BlobColumnsUnsupportedByLance)` (and logged via `tracing::warn`) instead of compacted, and the rest of the sweep proceeds normally. The current Lance `compact_files` mis-decodes blob-v2 columns under its forced `BlobHandling::AllBinary` read; **reads and writes are unaffected** — only compaction is. This is gated by `LANCE_SUPPORTS_BLOB_COMPACTION` (`db/omnigraph/optimize.rs`) and removed when the upstream Lance fix lands (see [docs/dev/lance.md](../dev/lance.md)). Consequence: fragment count and deleted-row space on blob tables are not reclaimed until then; query results are never affected. +## `repair_all_tables(db, options)` — explicit + +- Handles **uncovered manifest/head drift**: a table's Lance HEAD is ahead of the manifest pin and no recovery sidecar records the writer intent. +- Preview by default. `omnigraph repair --json ` reports each table's `classification`, `action`, manifest/head versions, Lance operation names, and any classification error. `--confirm` publishes only verified maintenance drift; if any suspicious or unverifiable table is refused, the CLI prints the per-table output and exits non-zero. `--force --confirm` also publishes suspicious or unverifiable drift after operator review. +- Classifies drift by reading Lance transactions from `manifest_version + 1` through `lance_head_version`. Only `ReserveFragments` and `Rewrite` are verified maintenance. Semantic operations such as `Append`, `Delete`, `Update`, `Merge`, or missing transaction history are not auto-healed. +- Publishes repair by advancing `__manifest` to the existing Lance HEAD; it does **not** rewrite Lance data. If the publish succeeds, normal reads and strict writes use the repaired version. If it fails, no new data-side partial state was created. +- Requires a clean recovery state. Pending `__recovery` sidecars still belong to automatic sidecar recovery, not manual repair. + ## `cleanup_all_tables(db, options)` — destructive - Lance `cleanup_old_versions()` per table. diff --git a/docs/user/storage.md b/docs/user/storage.md index d1c52b5..2c57a92 100644 --- a/docs/user/storage.md +++ b/docs/user/storage.md @@ -94,7 +94,7 @@ flowchart TB - **`nodes/`** and **`edges/`** are sibling directories holding one Lance dataset per declared type. Names are `fnv1a64-hex` of the type name to keep paths fixed-length and case-safe. - **`_graph_commits.lance`** is an L2 dataset that records the graph-level commit DAG, with a paired `_graph_commit_actors.lance` for the actor map. (Pre-v0.4.0 graphs also have inert `_graph_runs.lance` / `_graph_run_actors.lance` from the removed Run state machine; the v2→v3 migration sweeps their stale `__run__*` branches, and the dataset bytes are reclaimed once `delete_prefix` lands.) - **`_graph_commit_recoveries.lance`** — one row per recovery sweep action. Joined to `_graph_commits.lance` by `graph_commit_id`; the linked commit row carries `actor_id=omnigraph:recovery`. Operators correlate recoveries with the original mutations they rolled forward / back via this join. See `crates/omnigraph/src/db/recovery_audit.rs`. -- **`__recovery/{ulid}.json`** — transient sidecar files written by the four migrated writers (`MutationStaging::finalize`, `schema_apply`, `branch_merge`, `ensure_indices`) before Phase B begins, deleted after Phase C succeeds. A sidecar persisting after process exit means the writer crashed in the Phase B → Phase C window; the next `Omnigraph::open` recovery sweep processes it. Steady-state directory is empty. See `crates/omnigraph/src/db/manifest/recovery.rs`. +- **`__recovery/{ulid}.json`** — transient sidecar files written by the five migrated writers (`MutationStaging::finalize`, `schema_apply`, `branch_merge`, `ensure_indices`, `optimize_all_tables`) before Phase B begins, deleted after Phase C succeeds. A sidecar persisting after process exit means the writer crashed in the Phase B → Phase C window; the next `Omnigraph::open` recovery sweep processes it. Steady-state directory is empty. See `crates/omnigraph/src/db/manifest/recovery.rs`. - **`_refs/branches/{name}.json`** is graph-level branch metadata — pointers from a branch name to the manifest version it heads. - **Inside each Lance dataset** (orange): the standard Lance directory layout. `_versions/{n}.manifest` records every commit; `data/` holds the actual Arrow fragments; `_indices/{uuid}/` holds index segments with their own `fragment_bitmap` for partial coverage; `_refs/` holds Lance-native per-dataset branches and tags. diff --git a/openapi.json b/openapi.json index aced64d..335c0bc 100644 --- a/openapi.json +++ b/openapi.json @@ -7,7 +7,7 @@ "name": "MIT", "identifier": "MIT" }, - "version": "0.6.1" + "version": "0.6.2" }, "paths": { "/branches": {