diff --git a/crates/omnigraph/tests/maintenance.rs b/crates/omnigraph/tests/maintenance.rs new file mode 100644 index 0000000..8bbbe52 --- /dev/null +++ b/crates/omnigraph/tests/maintenance.rs @@ -0,0 +1,137 @@ +// Maintenance tests: `optimize` (Lance compact_files) and `cleanup` +// (Lance cleanup_old_versions) at the graph level. Covers no-op edges +// (empty repo, already-optimized repo), the policy-validation contract on +// `cleanup`, and the keep-versions cap that protects head. + +mod helpers; + +use std::time::Duration; + +use omnigraph::db::{CleanupPolicyOptions, Omnigraph}; +use omnigraph::loader::{LoadMode, load_jsonl}; + +use helpers::{TEST_DATA, TEST_SCHEMA, count_rows, init_and_load}; + +#[tokio::test] +async fn optimize_on_empty_repo_returns_stats_per_table_with_no_changes() { + let dir = tempfile::tempdir().unwrap(); + let uri = dir.path().to_str().unwrap(); + let mut db = Omnigraph::init(uri, TEST_SCHEMA).await.unwrap(); + + let stats = db.optimize().await.unwrap(); + + // Schema declares 2 nodes + 2 edges = 4 tables. Compaction should run on + // each but find nothing to merge. + assert_eq!(stats.len(), 4); + for s in &stats { + assert_eq!(s.fragments_removed, 0, "{} should not remove", s.table_key); + assert_eq!(s.fragments_added, 0, "{} should not add", s.table_key); + } +} + +#[tokio::test] +async fn optimize_after_load_then_again_is_idempotent() { + let dir = tempfile::tempdir().unwrap(); + let mut db = init_and_load(&dir).await; + + // First pass may compact (load wrote real fragments). + let _first = db.optimize().await.unwrap(); + + // Second pass should be a no-op: already-compacted repo produces no + // fragments_removed / fragments_added. + let second = db.optimize().await.unwrap(); + for s in &second { + assert_eq!( + s.fragments_removed, 0, + "{} re-optimize should be no-op", + s.table_key + ); + assert_eq!( + s.fragments_added, 0, + "{} re-optimize should be no-op", + s.table_key + ); + assert!( + !s.committed, + "{} re-optimize should not commit a new version", + s.table_key + ); + } +} + +#[tokio::test] +async fn cleanup_without_any_policy_option_errors() { + let dir = tempfile::tempdir().unwrap(); + let mut db = init_and_load(&dir).await; + + let err = db + .cleanup(CleanupPolicyOptions::default()) + .await + .expect_err("cleanup with no policy options must error"); + + let msg = format!("{}", err); + assert!( + msg.contains("keep_versions") && msg.contains("older_than"), + "error should name the two policy fields, got: {msg}" + ); +} + +#[tokio::test] +async fn cleanup_keep_one_preserves_head_and_table_remains_readable() { + let dir = tempfile::tempdir().unwrap(); + let mut db = init_and_load(&dir).await; + + let people_before = count_rows(&db, "node:Person").await; + assert!( + people_before > 0, + "fixture should seed Person rows for this test to be meaningful" + ); + + // Most aggressive version-based cleanup short of forcing keep=0. Lance's + // contract is that head is always preserved regardless, so the table + // must remain openable and rows must still be visible. + let _stats = db + .cleanup(CleanupPolicyOptions { + keep_versions: Some(1), + older_than: None, + }) + .await + .unwrap(); + + assert_eq!(count_rows(&db, "node:Person").await, people_before); +} + +#[tokio::test] +async fn cleanup_older_than_zero_preserves_head() { + let dir = tempfile::tempdir().unwrap(); + let mut db = init_and_load(&dir).await; + + // Aggressive policy: every version is "older than zero seconds ago". + // Lance must still preserve the head manifest, so the table is openable + // afterwards and a subsequent load still works. + let _stats = db + .cleanup(CleanupPolicyOptions { + keep_versions: None, + older_than: Some(Duration::from_secs(0)), + }) + .await + .unwrap(); + + // Smoke test: after aggressive cleanup, we can still read and write the + // graph — head wasn't pruned. + load_jsonl(&mut db, TEST_DATA, LoadMode::Merge).await.unwrap(); +} + +#[tokio::test] +async fn cleanup_then_optimize_succeed_in_sequence() { + let dir = tempfile::tempdir().unwrap(); + let mut db = init_and_load(&dir).await; + + db.cleanup(CleanupPolicyOptions { + keep_versions: Some(1), + older_than: None, + }) + .await + .unwrap(); + db.optimize().await.unwrap(); +} diff --git a/crates/omnigraph/tests/schema_apply.rs b/crates/omnigraph/tests/schema_apply.rs index 9cdc98e..cb26257 100644 --- a/crates/omnigraph/tests/schema_apply.rs +++ b/crates/omnigraph/tests/schema_apply.rs @@ -3,6 +3,7 @@ mod helpers; use std::fs; use omnigraph::db::{Omnigraph, ReadTarget}; +use omnigraph::loader::{LoadMode, load_jsonl}; use omnigraph_compiler::{SchemaMigrationStep, SchemaTypeKind}; use helpers::*; @@ -99,3 +100,207 @@ async fn apply_schema_unsupported_plan_does_not_advance_manifest() { before_version ); } + +// ─── Destructive / safety-tier rejections ──────────────────────────────────── +// +// Schema migration v1 only accepts additive change: add type, add nullable +// property, add index, rename. Every other shape returns an +// `UnsupportedChange` step that surfaces as an error from `apply_schema`, +// without advancing the manifest. These tests pin that contract for the +// destructive shapes (drop type, drop property, narrow type, add required, +// remove constraint) so a regression in the planner can't silently allow them. + +#[tokio::test] +async fn apply_schema_rejects_dropping_a_property_with_data() { + let dir = tempfile::tempdir().unwrap(); + let mut db = init_and_load(&dir).await; + + let people_before = count_rows(&db, "node:Person").await; + let before_version = db + .snapshot_of(ReadTarget::branch("main")) + .await + .unwrap() + .version(); + + // Drop `age` from Person. v1 doesn't support property removal even when + // the column is nullable — it would silently destroy data. + let desired = TEST_SCHEMA.replace(" age: I32?\n", ""); + let err = db.apply_schema(&desired).await.unwrap_err(); + assert!( + err.to_string().contains("removing property"), + "expected 'removing property' in error, got: {err}" + ); + + // Manifest didn't advance and existing rows are untouched. + assert_eq!( + db.snapshot_of(ReadTarget::branch("main")) + .await + .unwrap() + .version(), + before_version + ); + assert_eq!(count_rows(&db, "node:Person").await, people_before); +} + +#[tokio::test] +async fn apply_schema_rejects_dropping_a_node_type() { + let dir = tempfile::tempdir().unwrap(); + let mut db = init_and_load(&dir).await; + let before_version = db + .snapshot_of(ReadTarget::branch("main")) + .await + .unwrap() + .version(); + + // Drop the `Company` node type and its outgoing edge that references it. + let desired = r#" +node Person { + name: String @key + age: I32? +} + +edge Knows: Person -> Person { + since: Date? +} +"#; + let err = db.apply_schema(desired).await.unwrap_err(); + let msg = err.to_string(); + assert!( + msg.contains("removing node type") || msg.contains("removing edge type"), + "expected drop-type error, got: {msg}" + ); + assert_eq!( + db.snapshot_of(ReadTarget::branch("main")) + .await + .unwrap() + .version(), + before_version + ); +} + +#[tokio::test] +async fn apply_schema_rejects_dropping_an_edge_type() { + let dir = tempfile::tempdir().unwrap(); + let mut db = init_and_load(&dir).await; + let before_version = db + .snapshot_of(ReadTarget::branch("main")) + .await + .unwrap() + .version(); + + // Drop only the `WorksAt` edge. + let desired = TEST_SCHEMA.replace("\nedge WorksAt: Person -> Company", ""); + let err = db.apply_schema(&desired).await.unwrap_err(); + assert!( + err.to_string().contains("removing edge type"), + "expected 'removing edge type' error, got: {err}" + ); + assert_eq!( + db.snapshot_of(ReadTarget::branch("main")) + .await + .unwrap() + .version(), + before_version + ); +} + +#[tokio::test] +async fn apply_schema_rejects_adding_a_required_property_without_backfill() { + let dir = tempfile::tempdir().unwrap(); + let mut db = init_and_load(&dir).await; + let before_version = db + .snapshot_of(ReadTarget::branch("main")) + .await + .unwrap() + .version(); + + // Add `email: String` (required, non-nullable, no @rename_from). Existing + // rows have no value to fill in, so this is unsupported in v1. + let desired = TEST_SCHEMA.replace( + " age: I32?\n}", + " age: I32?\n email: String\n}", + ); + let err = db.apply_schema(&desired).await.unwrap_err(); + assert!( + err.to_string().contains("adding required property"), + "expected 'adding required property' error, got: {err}" + ); + assert_eq!( + db.snapshot_of(ReadTarget::branch("main")) + .await + .unwrap() + .version(), + before_version + ); +} + +#[tokio::test] +async fn plan_schema_for_property_type_narrowing_is_not_supported() { + // Symmetric companion to `apply_schema_unsupported_plan_does_not_advance_manifest`, + // which exercises widening (I32 -> I64). Narrowing (I64 -> I32) is also + // unsupported in v1, and should be flagged at plan time so callers can + // route to a manual-migration path before invoking apply. + let dir = tempfile::tempdir().unwrap(); + let uri = dir.path().to_str().unwrap(); + + let initial = TEST_SCHEMA.replace("age: I32?", "age: I64?"); + let mut db = Omnigraph::init(uri, &initial).await.unwrap(); + load_jsonl(&mut db, TEST_DATA, LoadMode::Overwrite) + .await + .unwrap(); + + let plan = db.plan_schema(TEST_SCHEMA).await.unwrap(); + assert!(!plan.supported, "narrowing I64 -> I32 must not be supported"); + assert!(plan.steps.iter().any(|step| matches!( + step, + SchemaMigrationStep::UnsupportedChange { reason, .. } + if reason.contains("changing property type") + ))); +} + +#[tokio::test] +async fn apply_schema_renames_node_type_via_rename_from_and_preserves_rows() { + // Covers the stable-type-id contract: renaming a type preserves the + // underlying Lance dataset (by stable id), so existing rows survive the + // rename. This is the "supported" half of the destructive-vs-supported + // boundary that the rejections above cover. + let dir = tempfile::tempdir().unwrap(); + let mut db = init_and_load(&dir).await; + let people_before = count_rows(&db, "node:Person").await; + assert!( + people_before > 0, + "fixture should seed Person rows for this test to be meaningful" + ); + + let desired = r#" +node Person @rename_from("Person") { + full_name: String @key @rename_from("name") + age: I32? +} + +node Company { + name: String @key +} + +edge Knows: Person -> Person { + since: Date? +} + +edge WorksAt: Person -> Company +"#; + + let result = db.apply_schema(desired).await.unwrap(); + assert!(result.supported && result.applied); + assert!(result.steps.iter().any(|step| matches!( + step, + SchemaMigrationStep::RenameProperty { + type_kind: SchemaTypeKind::Node, + type_name, + from, + to, + } if type_name == "Person" && from == "name" && to == "full_name" + ))); + + // Rows survive the property rename. + assert_eq!(count_rows(&db, "node:Person").await, people_before); +} diff --git a/docs/testing.md b/docs/testing.md index 9843dfc..43643e9 100644 --- a/docs/testing.md +++ b/docs/testing.md @@ -6,7 +6,7 @@ This file is the always-on map of the test surface. **Consult it before every ta | Crate | Path | Style | |---|---|---| -| `omnigraph` (engine) | `crates/omnigraph/tests/` | Integration tests (15 files), fixture-driven, share `tests/helpers/mod.rs` | +| `omnigraph` (engine) | `crates/omnigraph/tests/` | Integration tests (21 files), fixture-driven, share `tests/helpers/mod.rs` | | `omnigraph-cli` | `crates/omnigraph-cli/tests/` | `cli.rs` (unit-ish), `system_local.rs`, `system_remote.rs`, share `tests/support/mod.rs` | | `omnigraph-server` | `crates/omnigraph-server/tests/` | `server.rs` (HTTP-level), `openapi.rs` (OpenAPI drift / regeneration) | | `omnigraph-compiler` | mostly in-source `#[cfg(test)] mod tests` | Parser, type-checker, IR lowering, lint | @@ -33,6 +33,8 @@ The engine's `tests/` is the principal coverage surface; most graph-shaped behav | `export.rs` | NDJSON streaming export filters | | `s3_storage.rs` | S3-backed repo (skipped unless `OMNIGRAPH_S3_TEST_BUCKET` is set) | | `lance_version_columns.rs` | Per-row `_row_last_updated_at_version` behavior | +| `validators.rs` | Schema constraint enforcement (enum, range, unique, cardinality) across JSONL, insert, update paths | +| `maintenance.rs` | `optimize` (compaction) + `cleanup` (version GC): empty/idempotent/no-op edges, policy validation, head preservation | | `failpoints.rs` | Failure-injection coverage (gated on `failpoints` feature). Includes the four per-writer Phase B → recovery integration tests (`recovery_rolls_forward_after_finalize_publisher_failure`, `schema_apply_phase_b_failure_recovered_on_next_open`, `branch_merge_phase_b_failure_recovered_on_next_open`, `ensure_indices_phase_b_failure_recovered_on_next_open`). | | `recovery.rs` | Open-time recovery sweep — sidecar I/O, classifier dispatch (NoMovement / RolledPastExpected / UnexpectedAtP1 / UnexpectedMultistep / InvariantViolation), all-or-nothing decision, roll-forward via `ManifestBatchPublisher::publish`, roll-back via `Dataset::restore`, audit row in `_graph_commit_recoveries.lance`, `OpenMode::ReadOnly` skip path | | `composite_flow.rs` | Compositional/narrative end-to-end stories — multi-step flows that compose mechanics covered by other test files. Catches integration regressions where individual operations all pass their unit tests but their composition breaks (sequential merges, post-merge main writes, time-travel through merge DAG, reopen consistency over multi-merge histories). |