Add maintenance + destructive-migration test coverage

The audit of test coverage flagged three holes:

- `omnigraph optimize` and `omnigraph cleanup` had no integration tests
  (no `maintenance.rs`). Add one covering empty/idempotent edges, the
  policy-validation contract on `cleanup`, and head preservation under
  aggressive policies.
- `apply_schema` only covered I32 -> I64 type-change rejection. Add the
  symmetric narrowing case plus rejections for the other destructive
  shapes (drop property with data, drop node type, drop edge type, add
  required property without backfill) and assert the manifest version
  doesn't advance. Add a positive `@rename_from` case to pin the
  stable-type-id contract preserves rows through a rename.
- `docs/testing.md` was missing `validators.rs` and the new
  `maintenance.rs` from its file table; bump the count and add rows.
This commit is contained in:
Claude 2026-04-28 23:31:52 +00:00 committed by Andrew Altshuler
parent 6914e0256e
commit e22d468e27
3 changed files with 345 additions and 1 deletions

View file

@ -0,0 +1,137 @@
// Maintenance tests: `optimize` (Lance compact_files) and `cleanup`
// (Lance cleanup_old_versions) at the graph level. Covers no-op edges
// (empty repo, already-optimized repo), the policy-validation contract on
// `cleanup`, and the keep-versions cap that protects head.
mod helpers;
use std::time::Duration;
use omnigraph::db::{CleanupPolicyOptions, Omnigraph};
use omnigraph::loader::{LoadMode, load_jsonl};
use helpers::{TEST_DATA, TEST_SCHEMA, count_rows, init_and_load};
#[tokio::test]
async fn optimize_on_empty_repo_returns_stats_per_table_with_no_changes() {
let dir = tempfile::tempdir().unwrap();
let uri = dir.path().to_str().unwrap();
let mut db = Omnigraph::init(uri, TEST_SCHEMA).await.unwrap();
let stats = db.optimize().await.unwrap();
// Schema declares 2 nodes + 2 edges = 4 tables. Compaction should run on
// each but find nothing to merge.
assert_eq!(stats.len(), 4);
for s in &stats {
assert_eq!(s.fragments_removed, 0, "{} should not remove", s.table_key);
assert_eq!(s.fragments_added, 0, "{} should not add", s.table_key);
}
}
#[tokio::test]
async fn optimize_after_load_then_again_is_idempotent() {
let dir = tempfile::tempdir().unwrap();
let mut db = init_and_load(&dir).await;
// First pass may compact (load wrote real fragments).
let _first = db.optimize().await.unwrap();
// Second pass should be a no-op: already-compacted repo produces no
// fragments_removed / fragments_added.
let second = db.optimize().await.unwrap();
for s in &second {
assert_eq!(
s.fragments_removed, 0,
"{} re-optimize should be no-op",
s.table_key
);
assert_eq!(
s.fragments_added, 0,
"{} re-optimize should be no-op",
s.table_key
);
assert!(
!s.committed,
"{} re-optimize should not commit a new version",
s.table_key
);
}
}
#[tokio::test]
async fn cleanup_without_any_policy_option_errors() {
let dir = tempfile::tempdir().unwrap();
let mut db = init_and_load(&dir).await;
let err = db
.cleanup(CleanupPolicyOptions::default())
.await
.expect_err("cleanup with no policy options must error");
let msg = format!("{}", err);
assert!(
msg.contains("keep_versions") && msg.contains("older_than"),
"error should name the two policy fields, got: {msg}"
);
}
#[tokio::test]
async fn cleanup_keep_one_preserves_head_and_table_remains_readable() {
let dir = tempfile::tempdir().unwrap();
let mut db = init_and_load(&dir).await;
let people_before = count_rows(&db, "node:Person").await;
assert!(
people_before > 0,
"fixture should seed Person rows for this test to be meaningful"
);
// Most aggressive version-based cleanup short of forcing keep=0. Lance's
// contract is that head is always preserved regardless, so the table
// must remain openable and rows must still be visible.
let _stats = db
.cleanup(CleanupPolicyOptions {
keep_versions: Some(1),
older_than: None,
})
.await
.unwrap();
assert_eq!(count_rows(&db, "node:Person").await, people_before);
}
#[tokio::test]
async fn cleanup_older_than_zero_preserves_head() {
let dir = tempfile::tempdir().unwrap();
let mut db = init_and_load(&dir).await;
// Aggressive policy: every version is "older than zero seconds ago".
// Lance must still preserve the head manifest, so the table is openable
// afterwards and a subsequent load still works.
let _stats = db
.cleanup(CleanupPolicyOptions {
keep_versions: None,
older_than: Some(Duration::from_secs(0)),
})
.await
.unwrap();
// Smoke test: after aggressive cleanup, we can still read and write the
// graph — head wasn't pruned.
load_jsonl(&mut db, TEST_DATA, LoadMode::Merge).await.unwrap();
}
#[tokio::test]
async fn cleanup_then_optimize_succeed_in_sequence() {
let dir = tempfile::tempdir().unwrap();
let mut db = init_and_load(&dir).await;
db.cleanup(CleanupPolicyOptions {
keep_versions: Some(1),
older_than: None,
})
.await
.unwrap();
db.optimize().await.unwrap();
}

View file

@ -3,6 +3,7 @@ mod helpers;
use std::fs;
use omnigraph::db::{Omnigraph, ReadTarget};
use omnigraph::loader::{LoadMode, load_jsonl};
use omnigraph_compiler::{SchemaMigrationStep, SchemaTypeKind};
use helpers::*;
@ -99,3 +100,207 @@ async fn apply_schema_unsupported_plan_does_not_advance_manifest() {
before_version
);
}
// ─── Destructive / safety-tier rejections ────────────────────────────────────
//
// Schema migration v1 only accepts additive change: add type, add nullable
// property, add index, rename. Every other shape returns an
// `UnsupportedChange` step that surfaces as an error from `apply_schema`,
// without advancing the manifest. These tests pin that contract for the
// destructive shapes (drop type, drop property, narrow type, add required,
// remove constraint) so a regression in the planner can't silently allow them.
#[tokio::test]
async fn apply_schema_rejects_dropping_a_property_with_data() {
let dir = tempfile::tempdir().unwrap();
let mut db = init_and_load(&dir).await;
let people_before = count_rows(&db, "node:Person").await;
let before_version = db
.snapshot_of(ReadTarget::branch("main"))
.await
.unwrap()
.version();
// Drop `age` from Person. v1 doesn't support property removal even when
// the column is nullable — it would silently destroy data.
let desired = TEST_SCHEMA.replace(" age: I32?\n", "");
let err = db.apply_schema(&desired).await.unwrap_err();
assert!(
err.to_string().contains("removing property"),
"expected 'removing property' in error, got: {err}"
);
// Manifest didn't advance and existing rows are untouched.
assert_eq!(
db.snapshot_of(ReadTarget::branch("main"))
.await
.unwrap()
.version(),
before_version
);
assert_eq!(count_rows(&db, "node:Person").await, people_before);
}
#[tokio::test]
async fn apply_schema_rejects_dropping_a_node_type() {
let dir = tempfile::tempdir().unwrap();
let mut db = init_and_load(&dir).await;
let before_version = db
.snapshot_of(ReadTarget::branch("main"))
.await
.unwrap()
.version();
// Drop the `Company` node type and its outgoing edge that references it.
let desired = r#"
node Person {
name: String @key
age: I32?
}
edge Knows: Person -> Person {
since: Date?
}
"#;
let err = db.apply_schema(desired).await.unwrap_err();
let msg = err.to_string();
assert!(
msg.contains("removing node type") || msg.contains("removing edge type"),
"expected drop-type error, got: {msg}"
);
assert_eq!(
db.snapshot_of(ReadTarget::branch("main"))
.await
.unwrap()
.version(),
before_version
);
}
#[tokio::test]
async fn apply_schema_rejects_dropping_an_edge_type() {
let dir = tempfile::tempdir().unwrap();
let mut db = init_and_load(&dir).await;
let before_version = db
.snapshot_of(ReadTarget::branch("main"))
.await
.unwrap()
.version();
// Drop only the `WorksAt` edge.
let desired = TEST_SCHEMA.replace("\nedge WorksAt: Person -> Company", "");
let err = db.apply_schema(&desired).await.unwrap_err();
assert!(
err.to_string().contains("removing edge type"),
"expected 'removing edge type' error, got: {err}"
);
assert_eq!(
db.snapshot_of(ReadTarget::branch("main"))
.await
.unwrap()
.version(),
before_version
);
}
#[tokio::test]
async fn apply_schema_rejects_adding_a_required_property_without_backfill() {
let dir = tempfile::tempdir().unwrap();
let mut db = init_and_load(&dir).await;
let before_version = db
.snapshot_of(ReadTarget::branch("main"))
.await
.unwrap()
.version();
// Add `email: String` (required, non-nullable, no @rename_from). Existing
// rows have no value to fill in, so this is unsupported in v1.
let desired = TEST_SCHEMA.replace(
" age: I32?\n}",
" age: I32?\n email: String\n}",
);
let err = db.apply_schema(&desired).await.unwrap_err();
assert!(
err.to_string().contains("adding required property"),
"expected 'adding required property' error, got: {err}"
);
assert_eq!(
db.snapshot_of(ReadTarget::branch("main"))
.await
.unwrap()
.version(),
before_version
);
}
#[tokio::test]
async fn plan_schema_for_property_type_narrowing_is_not_supported() {
// Symmetric companion to `apply_schema_unsupported_plan_does_not_advance_manifest`,
// which exercises widening (I32 -> I64). Narrowing (I64 -> I32) is also
// unsupported in v1, and should be flagged at plan time so callers can
// route to a manual-migration path before invoking apply.
let dir = tempfile::tempdir().unwrap();
let uri = dir.path().to_str().unwrap();
let initial = TEST_SCHEMA.replace("age: I32?", "age: I64?");
let mut db = Omnigraph::init(uri, &initial).await.unwrap();
load_jsonl(&mut db, TEST_DATA, LoadMode::Overwrite)
.await
.unwrap();
let plan = db.plan_schema(TEST_SCHEMA).await.unwrap();
assert!(!plan.supported, "narrowing I64 -> I32 must not be supported");
assert!(plan.steps.iter().any(|step| matches!(
step,
SchemaMigrationStep::UnsupportedChange { reason, .. }
if reason.contains("changing property type")
)));
}
#[tokio::test]
async fn apply_schema_renames_node_type_via_rename_from_and_preserves_rows() {
// Covers the stable-type-id contract: renaming a type preserves the
// underlying Lance dataset (by stable id), so existing rows survive the
// rename. This is the "supported" half of the destructive-vs-supported
// boundary that the rejections above cover.
let dir = tempfile::tempdir().unwrap();
let mut db = init_and_load(&dir).await;
let people_before = count_rows(&db, "node:Person").await;
assert!(
people_before > 0,
"fixture should seed Person rows for this test to be meaningful"
);
let desired = r#"
node Person @rename_from("Person") {
full_name: String @key @rename_from("name")
age: I32?
}
node Company {
name: String @key
}
edge Knows: Person -> Person {
since: Date?
}
edge WorksAt: Person -> Company
"#;
let result = db.apply_schema(desired).await.unwrap();
assert!(result.supported && result.applied);
assert!(result.steps.iter().any(|step| matches!(
step,
SchemaMigrationStep::RenameProperty {
type_kind: SchemaTypeKind::Node,
type_name,
from,
to,
} if type_name == "Person" && from == "name" && to == "full_name"
)));
// Rows survive the property rename.
assert_eq!(count_rows(&db, "node:Person").await, people_before);
}

View file

@ -6,7 +6,7 @@ This file is the always-on map of the test surface. **Consult it before every ta
| Crate | Path | Style |
|---|---|---|
| `omnigraph` (engine) | `crates/omnigraph/tests/` | Integration tests (15 files), fixture-driven, share `tests/helpers/mod.rs` |
| `omnigraph` (engine) | `crates/omnigraph/tests/` | Integration tests (21 files), fixture-driven, share `tests/helpers/mod.rs` |
| `omnigraph-cli` | `crates/omnigraph-cli/tests/` | `cli.rs` (unit-ish), `system_local.rs`, `system_remote.rs`, share `tests/support/mod.rs` |
| `omnigraph-server` | `crates/omnigraph-server/tests/` | `server.rs` (HTTP-level), `openapi.rs` (OpenAPI drift / regeneration) |
| `omnigraph-compiler` | mostly in-source `#[cfg(test)] mod tests` | Parser, type-checker, IR lowering, lint |
@ -33,6 +33,8 @@ The engine's `tests/` is the principal coverage surface; most graph-shaped behav
| `export.rs` | NDJSON streaming export filters |
| `s3_storage.rs` | S3-backed repo (skipped unless `OMNIGRAPH_S3_TEST_BUCKET` is set) |
| `lance_version_columns.rs` | Per-row `_row_last_updated_at_version` behavior |
| `validators.rs` | Schema constraint enforcement (enum, range, unique, cardinality) across JSONL, insert, update paths |
| `maintenance.rs` | `optimize` (compaction) + `cleanup` (version GC): empty/idempotent/no-op edges, policy validation, head preservation |
| `failpoints.rs` | Failure-injection coverage (gated on `failpoints` feature). Includes the four per-writer Phase B → recovery integration tests (`recovery_rolls_forward_after_finalize_publisher_failure`, `schema_apply_phase_b_failure_recovered_on_next_open`, `branch_merge_phase_b_failure_recovered_on_next_open`, `ensure_indices_phase_b_failure_recovered_on_next_open`). |
| `recovery.rs` | Open-time recovery sweep — sidecar I/O, classifier dispatch (NoMovement / RolledPastExpected / UnexpectedAtP1 / UnexpectedMultistep / InvariantViolation), all-or-nothing decision, roll-forward via `ManifestBatchPublisher::publish`, roll-back via `Dataset::restore`, audit row in `_graph_commit_recoveries.lance`, `OpenMode::ReadOnly` skip path |
| `composite_flow.rs` | Compositional/narrative end-to-end stories — multi-step flows that compose mechanics covered by other test files. Catches integration regressions where individual operations all pass their unit tests but their composition breaks (sequential merges, post-merge main writes, time-travel through merge DAG, reopen consistency over multi-merge histories). |