diff --git a/crates/omnigraph-cli/src/main.rs b/crates/omnigraph-cli/src/main.rs index eb8ea95..09fd53f 100644 --- a/crates/omnigraph-cli/src/main.rs +++ b/crates/omnigraph-cli/src/main.rs @@ -316,6 +316,11 @@ enum SchemaCommand { schema: PathBuf, #[arg(long)] json: bool, + /// Show the plan as it would execute with `--allow-data-loss`. + /// Promotes every `DropMode::Soft` step to `DropMode::Hard` + /// so the plan output reflects the destructive intent. + #[arg(long, default_value_t = false)] + allow_data_loss: bool, }, /// Apply a supported schema migration Apply { @@ -329,6 +334,17 @@ enum SchemaCommand { schema: PathBuf, #[arg(long)] json: bool, + /// Allow destructive (data-loss) schema changes. + /// + /// Without this flag, drops are "soft": the column or table + /// is removed from the current manifest version but prior + /// versions are retained, so `snapshot_at_version(pre_drop)` + /// can still read the dropped data until `omnigraph cleanup` + /// runs. With this flag, drops are "hard": `cleanup_old_versions` + /// runs on the affected datasets immediately after the apply, + /// making the prior data unreachable. + #[arg(long, default_value_t = false)] + allow_data_loss: bool, }, /// Show the current accepted schema source #[command(alias = "get")] @@ -1980,12 +1996,18 @@ async fn main() -> Result<()> { config, schema, json, + allow_data_loss, } => { let config = load_cli_config(config.as_ref())?; let uri = resolve_local_uri(&config, uri, target.as_deref(), "schema plan")?; let schema_source = fs::read_to_string(&schema)?; let db = Omnigraph::open(&uri).await?; - let plan = db.plan_schema(&schema_source).await?; + let plan = db + .plan_schema_with_options( + &schema_source, + omnigraph::db::SchemaApplyOptions { allow_data_loss }, + ) + .await?; let output = SchemaPlanOutput { uri: &uri, supported: plan.supported, @@ -2004,6 +2026,7 @@ async fn main() -> Result<()> { config, schema, json, + allow_data_loss, } => { let config = load_cli_config(config.as_ref())?; let bearer_token = @@ -2011,6 +2034,12 @@ async fn main() -> Result<()> { let uri = resolve_uri(&config, uri, target.as_deref())?; let schema_source = fs::read_to_string(&schema)?; let output = if is_remote_uri(&uri) { + if allow_data_loss { + bail!( + "--allow-data-loss is not yet supported on remote (HTTP) schema apply; \ + use `omnigraph schema apply` against a local path or s3:// URI for now" + ); + } remote_json::( &http_client, Method::POST, @@ -2023,7 +2052,13 @@ async fn main() -> Result<()> { .await? } else { let mut db = Omnigraph::open(&uri).await?; - schema_apply_output(&uri, db.apply_schema(&schema_source).await?) + let result = db + .apply_schema_with_options( + &schema_source, + omnigraph::db::SchemaApplyOptions { allow_data_loss }, + ) + .await?; + schema_apply_output(&uri, result) }; if json { print_json(&output)?; diff --git a/crates/omnigraph/src/db/mod.rs b/crates/omnigraph/src/db/mod.rs index b6ab0da..6bdd9ee 100644 --- a/crates/omnigraph/src/db/mod.rs +++ b/crates/omnigraph/src/db/mod.rs @@ -11,8 +11,8 @@ pub use commit_graph::GraphCommit; pub use graph_coordinator::{GraphCoordinator, ReadTarget, ResolvedTarget, SnapshotId}; pub use manifest::{Snapshot, SubTableEntry, SubTableUpdate}; pub use omnigraph::{ - CleanupPolicyOptions, MergeOutcome, Omnigraph, OpenMode, SchemaApplyResult, - TableCleanupStats, TableOptimizeStats, + CleanupPolicyOptions, MergeOutcome, Omnigraph, OpenMode, SchemaApplyOptions, + SchemaApplyResult, TableCleanupStats, TableOptimizeStats, }; pub(crate) use omnigraph::ensure_public_branch_ref; pub(crate) use run_registry::is_internal_run_branch; diff --git a/crates/omnigraph/src/db/omnigraph.rs b/crates/omnigraph/src/db/omnigraph.rs index bf34ff9..1097ccc 100644 --- a/crates/omnigraph/src/db/omnigraph.rs +++ b/crates/omnigraph/src/db/omnigraph.rs @@ -34,6 +34,7 @@ mod schema_apply; mod table_ops; pub use optimize::{CleanupPolicyOptions, TableCleanupStats, TableOptimizeStats}; +pub use schema_apply::SchemaApplyOptions; use super::commit_graph::GraphCommit; use super::manifest::{ @@ -308,11 +309,29 @@ impl Omnigraph { } pub async fn plan_schema(&self, desired_schema_source: &str) -> Result { - schema_apply::plan_schema(self, desired_schema_source).await + self.plan_schema_with_options(desired_schema_source, SchemaApplyOptions::default()) + .await + } + + pub async fn plan_schema_with_options( + &self, + desired_schema_source: &str, + options: SchemaApplyOptions, + ) -> Result { + schema_apply::plan_schema(self, desired_schema_source, options).await } pub async fn apply_schema(&self, desired_schema_source: &str) -> Result { - schema_apply::apply_schema(self, desired_schema_source).await + self.apply_schema_with_options(desired_schema_source, SchemaApplyOptions::default()) + .await + } + + pub async fn apply_schema_with_options( + &self, + desired_schema_source: &str, + options: SchemaApplyOptions, + ) -> Result { + schema_apply::apply_schema(self, desired_schema_source, options).await } pub(crate) async fn ensure_schema_apply_idle(&self, operation: &str) -> Result<()> { diff --git a/crates/omnigraph/src/db/omnigraph/schema_apply.rs b/crates/omnigraph/src/db/omnigraph/schema_apply.rs index bdc5e5c..1ff73c6 100644 --- a/crates/omnigraph/src/db/omnigraph/schema_apply.rs +++ b/crates/omnigraph/src/db/omnigraph/schema_apply.rs @@ -1,22 +1,60 @@ use super::*; +/// Operator-supplied options that gate schema-apply behavior. +/// +/// Today the only knob is `allow_data_loss`, which promotes +/// `DropMode::Soft` steps to `DropMode::Hard` (per chassis v1 +/// commit #5). Soft is the default — drops are reversible via Lance +/// time travel until cleanup runs. Hard runs `cleanup_old_versions` +/// on the affected datasets immediately after the manifest publish, +/// making the prior column data unreachable. +#[derive(Debug, Clone, Default)] +pub struct SchemaApplyOptions { + /// Allow destructive (data-loss) schema changes. When true, the + /// planner promotes every `DropMode::Soft` step to + /// `DropMode::Hard`, and the apply path runs + /// `cleanup_old_versions` on affected datasets after the publish. + pub allow_data_loss: bool, +} + +/// Promote every `Soft` drop variant in the plan to `Hard` when +/// `allow_data_loss` is set. Idempotent on non-drop steps. +fn promote_drops_to_hard(plan: &mut SchemaMigrationPlan, allow_data_loss: bool) { + if !allow_data_loss { + return; + } + for step in &mut plan.steps { + match step { + SchemaMigrationStep::DropType { mode, .. } + | SchemaMigrationStep::DropProperty { mode, .. } => { + *mode = DropMode::Hard; + } + _ => {} + } + } +} + pub(super) async fn plan_schema( db: &Omnigraph, desired_schema_source: &str, + options: SchemaApplyOptions, ) -> Result { db.ensure_schema_state_valid().await?; let accepted_ir = read_accepted_schema_ir(db.uri(), Arc::clone(&db.storage)).await?; let desired_ir = read_schema_ir_from_source(desired_schema_source)?; - plan_schema_migration(&accepted_ir, &desired_ir) - .map_err(|err| OmniError::manifest(err.to_string())) + let mut plan = plan_schema_migration(&accepted_ir, &desired_ir) + .map_err(|err| OmniError::manifest(err.to_string()))?; + promote_drops_to_hard(&mut plan, options.allow_data_loss); + Ok(plan) } pub(super) async fn apply_schema( db: &Omnigraph, desired_schema_source: &str, + options: SchemaApplyOptions, ) -> Result { acquire_schema_apply_lock(db).await?; - let result = apply_schema_with_lock(db, desired_schema_source).await; + let result = apply_schema_with_lock(db, desired_schema_source, options).await; let release_result = release_schema_apply_lock(db).await; match (result, release_result) { (Ok(result), Ok(())) => Ok(result), @@ -29,6 +67,7 @@ pub(super) async fn apply_schema( pub(super) async fn apply_schema_with_lock( db: &Omnigraph, desired_schema_source: &str, + options: SchemaApplyOptions, ) -> Result { db.ensure_schema_state_valid().await?; let branches = db.coordinator.read().await.all_branches().await?; @@ -50,8 +89,9 @@ pub(super) async fn apply_schema_with_lock( let accepted_ir = read_accepted_schema_ir(db.uri(), Arc::clone(&db.storage)).await?; let desired_ir = read_schema_ir_from_source(desired_schema_source)?; - let plan = plan_schema_migration(&accepted_ir, &desired_ir) + let mut plan = plan_schema_migration(&accepted_ir, &desired_ir) .map_err(|err| OmniError::manifest(err.to_string()))?; + promote_drops_to_hard(&mut plan, options.allow_data_loss); if !plan.supported { let message = plan .steps @@ -79,6 +119,12 @@ pub(super) async fn apply_schema_with_lock( let mut rewritten_tables = BTreeSet::new(); let mut indexed_tables = BTreeSet::new(); let mut dropped_tables = BTreeSet::new(); + // Hard-drop cleanup targets: (table_key, full_dataset_uri). + // Populated for DropProperty { Hard } and DropType { Hard }; the + // post-publish cleanup runs `cleanup_old_versions` on each + // dataset to reclaim prior versions, making time-travel back + // to pre-drop state unreachable. + let mut hard_cleanup_targets: Vec<(String, String)> = Vec::new(); let mut property_renames = HashMap::>::new(); let mut changed_edge_tables = false; @@ -145,25 +191,35 @@ pub(super) async fn apply_schema_with_lock( mode, .. } => { - // Soft = reuse the existing stage_overwrite rewrite - // path. batch_for_schema_apply_rewrite iterates the - // *target* schema fields, so a property absent from - // desired_catalog is naturally projected away. The - // prior Lance version retains the dropped column, - // so reads at the previous snapshot still see it - // (time-travel reversibility). Hard mode (immediate - // compact_files + cleanup_old_versions for actual - // data deletion) lands in commit #5 gated by - // --allow-data-loss. - if !matches!(mode, DropMode::Soft) { - return Err(OmniError::manifest_internal( - "DropProperty { Hard } not yet implemented (commit #5)", - )); - } + // Both Soft and Hard route through the existing + // stage_overwrite rewrite path. batch_for_schema_apply_rewrite + // iterates the *target* schema fields, so a property + // absent from desired_catalog is naturally projected + // away in the rebuilt batch. + // + // The difference between Soft and Hard is what + // happens AFTER the manifest publish: + // * Soft: nothing — the prior dataset version + // retains the dropped column; reads at + // snapshot_at_version(pre_drop) still see it. + // * Hard: run cleanup_old_versions on the dataset + // post-publish, removing the prior version (and + // reclaiming any fragments unique to it). After + // cleanup, time-travel back fails. let table_key = schema_table_key(*type_kind, type_name); if table_key.starts_with("edge:") { changed_edge_tables = true; } + if matches!(mode, DropMode::Hard) { + let entry = snapshot.entry(&table_key).ok_or_else(|| { + OmniError::manifest(format!( + "missing table '{}' for hard property drop", + table_key + )) + })?; + let full_uri = format!("{}/{}", db.root_uri, entry.table_path); + hard_cleanup_targets.push((table_key.clone(), full_uri)); + } rewritten_tables.insert(table_key); } SchemaMigrationStep::DropType { @@ -171,25 +227,35 @@ pub(super) async fn apply_schema_with_lock( name, mode, } => { - // Soft = remove the table's entry from the current - // __manifest version via a tombstone. The Lance - // dataset files are retained — prior __manifest - // versions still reference them, so Lance time - // travel + branch-from-snapshot can read the dropped - // table until `omnigraph cleanup` ages out the older - // manifest versions. No per-table write happens here; - // the tombstone is the entire change. Hard mode - // (immediate dataset deletion via cleanup) lands in - // commit #5 gated by --allow-data-loss. - if !matches!(mode, DropMode::Soft) { - return Err(OmniError::manifest_internal( - "DropType { Hard } not yet implemented (commit #5)", - )); - } + // Both Soft and Hard tombstone the table's entry in + // the current __manifest version (no per-table write). + // + // The difference is what happens after publish: + // * Soft: dataset files retained; prior __manifest + // versions still reference them; Lance time + // travel + branch-from-snapshot can read the + // dropped table. + // * Hard: run cleanup_old_versions on the orphan + // dataset post-publish. Prior dataset versions + // (and their fragments) are reclaimed. The dataset + // directory itself persists until a future + // orphan-cleanup pass — operators who need the + // directory gone too should run `omnigraph cleanup` + // and (for now) remove the directory out-of-band. let table_key = schema_table_key(*type_kind, name); if table_key.starts_with("edge:") { changed_edge_tables = true; } + if matches!(mode, DropMode::Hard) { + let entry = snapshot.entry(&table_key).ok_or_else(|| { + OmniError::manifest(format!( + "missing table '{}' for hard type drop", + table_key + )) + })?; + let full_uri = format!("{}/{}", db.root_uri, entry.table_path); + hard_cleanup_targets.push((table_key.clone(), full_uri)); + } dropped_tables.insert(table_key); } step @ SchemaMigrationStep::UnsupportedChange { .. } => { @@ -597,6 +663,25 @@ pub(super) async fn apply_schema_with_lock( } } + // Hard-drop cleanup: run cleanup_old_versions on each dataset + // that had a Hard mode drop step. Best-effort — the schema apply + // is already durable. If cleanup fails, the prior data fragments + // remain on disk as orphans (reclaimable via `omnigraph cleanup`). + // We do NOT fail the apply on cleanup error; the manifest change + // is the load-bearing operation. + for (table_key, full_uri) in &hard_cleanup_targets { + match cleanup_dataset_old_versions(db, full_uri).await { + Ok(()) => {} + Err(err) => { + tracing::warn!( + error = %err, + table_key = table_key.as_str(), + "hard-drop cleanup_old_versions failed; rerun `omnigraph cleanup` to reclaim", + ); + } + } + } + Ok(SchemaApplyResult { supported: true, applied: true, @@ -605,6 +690,36 @@ pub(super) async fn apply_schema_with_lock( }) } +/// Run `cleanup_old_versions` on a dataset URI with `before_timestamp = now`. +/// Removes every version older than the current, making time-travel back +/// to those versions unreachable. Used by Hard mode drops to enforce +/// "data is gone" semantics post-apply. +/// +/// The dataset itself isn't deleted — for DropType { Hard }, the +/// dataset directory persists with only its current version (or, if +/// no current version was written, its pre-drop version). A future +/// orphan-cleanup pass should remove the directory entirely. +async fn cleanup_dataset_old_versions(db: &Omnigraph, full_uri: &str) -> Result<()> { + use chrono::Utc; + use lance::dataset::cleanup::CleanupPolicy; + let ds = lance::Dataset::open(full_uri) + .await + .map_err(|e| OmniError::Lance(e.to_string()))?; + let policy = CleanupPolicy { + before_timestamp: Some(Utc::now()), + before_version: None, + delete_unverified: false, + error_if_tagged_old_versions: false, + clean_referenced_branches: false, + delete_rate_limit: None, + }; + let _removed = lance::dataset::cleanup::cleanup_old_versions(&ds, policy) + .await + .map_err(|e| OmniError::Lance(e.to_string()))?; + let _ = db; + Ok(()) +} + pub(super) async fn ensure_schema_apply_idle(db: &Omnigraph, operation: &str) -> Result<()> { db.refresh_coordinator_only().await?; ensure_schema_apply_not_locked(db, operation).await diff --git a/crates/omnigraph/tests/schema_apply.rs b/crates/omnigraph/tests/schema_apply.rs index 57a6073..6862c84 100644 --- a/crates/omnigraph/tests/schema_apply.rs +++ b/crates/omnigraph/tests/schema_apply.rs @@ -522,3 +522,217 @@ edge WorksAt: Human -> Company "old node:Person table key should be unmapped after rename" ); } + +// ─── Hard-mode drops (chassis v1 commit #5 — --allow-data-loss) ────────────── +// +// Hard mode promotes every `DropMode::Soft` step to `DropMode::Hard` and runs +// `cleanup_old_versions` on affected datasets immediately after the manifest +// publish. For DropProperty Hard, this removes the prior dataset version +// (where the column lived), making `snapshot_at_version(pre_drop)` unable to +// open the dataset at that version. For DropType Hard, the dataset is +// untouched by the schema apply itself (no per-table write), so +// cleanup_old_versions is currently a no-op for it — the dataset directory +// persists. Full orphan-dataset deletion is a separate follow-up. + +#[tokio::test] +async fn apply_schema_with_allow_data_loss_promotes_drops_to_hard() { + let dir = tempfile::tempdir().unwrap(); + let mut db = init_and_load(&dir).await; + + let desired = TEST_SCHEMA.replace(" age: I32?\n", ""); + + // Default plan (no flag) → Soft. + let plan_soft = db.plan_schema(&desired).await.unwrap(); + assert!(plan_soft.steps.iter().any(|step| matches!( + step, + SchemaMigrationStep::DropProperty { + mode: omnigraph_compiler::DropMode::Soft, + .. + } + ))); + + // With --allow-data-loss → Hard. + let plan_hard = db + .plan_schema_with_options( + &desired, + omnigraph::db::SchemaApplyOptions { + allow_data_loss: true, + }, + ) + .await + .unwrap(); + assert!(plan_hard.supported); + assert!( + plan_hard.steps.iter().any(|step| matches!( + step, + SchemaMigrationStep::DropProperty { + mode: omnigraph_compiler::DropMode::Hard, + .. + } + )), + "with --allow-data-loss, DropProperty should be promoted to Hard: {plan_hard:?}", + ); + // Negative: no remaining Soft drops in the promoted plan. + assert!( + !plan_hard.steps.iter().any(|step| matches!( + step, + SchemaMigrationStep::DropProperty { + mode: omnigraph_compiler::DropMode::Soft, + .. + } | SchemaMigrationStep::DropType { + mode: omnigraph_compiler::DropMode::Soft, + .. + } + )), + "promoted plan should have no Soft drops left: {plan_hard:?}", + ); + + // Apply with flag succeeds. + let result = db + .apply_schema_with_options( + &desired, + omnigraph::db::SchemaApplyOptions { + allow_data_loss: true, + }, + ) + .await + .unwrap(); + assert!(result.applied); +} + +#[tokio::test] +async fn apply_schema_hard_drops_property_makes_prior_version_unreachable() { + let dir = tempfile::tempdir().unwrap(); + let mut db = init_and_load(&dir).await; + let before_version = db + .snapshot_of(ReadTarget::branch("main")) + .await + .unwrap() + .version(); + + // Hard drop the `age` column. Soft drop would leave the prior + // dataset version intact; Hard drop runs cleanup_old_versions on + // the dataset post-apply, removing the prior version. + let desired = TEST_SCHEMA.replace(" age: I32?\n", ""); + let result = db + .apply_schema_with_options( + &desired, + omnigraph::db::SchemaApplyOptions { + allow_data_loss: true, + }, + ) + .await + .unwrap(); + assert!(result.applied); + + // Current snapshot: column gone from the dataset schema. + let current_snapshot = db.snapshot_of(ReadTarget::branch("main")).await.unwrap(); + let current_ds = current_snapshot.open("node:Person").await.unwrap(); + let current_fields = current_ds + .schema() + .fields + .iter() + .map(|f| f.name.clone()) + .collect::>(); + assert!( + !current_fields.iter().any(|f| f == "age"), + "current Person schema must not include 'age' after hard drop; got {current_fields:?}", + ); + + // Time travel: at the pre-drop manifest version, the entry points + // at the OLD dataset version which has been cleaned up. Opening + // the dataset at that snapshot should fail (Lance can't load the + // dropped version). This is the Hard-mode contract — the prior + // data is unreachable. + let pre_drop = db.snapshot_at_version(before_version).await.unwrap(); + let open_result = pre_drop.open("node:Person").await; + assert!( + open_result.is_err(), + "after hard drop + cleanup, pre-drop snapshot.open() must fail (prior version was reclaimed); got {open_result:?}", + ); +} + +#[tokio::test] +async fn apply_schema_hard_drops_node_and_edge_with_flag_succeeds() { + let dir = tempfile::tempdir().unwrap(); + let mut db = init_and_load(&dir).await; + let before_version = db + .snapshot_of(ReadTarget::branch("main")) + .await + .unwrap() + .version(); + + let desired = r#" +node Person { + name: String @key + age: I32? +} + +edge Knows: Person -> Person { + since: Date? +} +"#; + + let plan = db + .plan_schema_with_options( + desired, + omnigraph::db::SchemaApplyOptions { + allow_data_loss: true, + }, + ) + .await + .unwrap(); + assert!(plan.supported); + assert!( + plan.steps.iter().any(|step| matches!( + step, + SchemaMigrationStep::DropType { + type_kind: SchemaTypeKind::Node, + mode: omnigraph_compiler::DropMode::Hard, + .. + } + )), + "with --allow-data-loss, DropType {{ Node }} should be Hard: {plan:?}", + ); + assert!( + plan.steps.iter().any(|step| matches!( + step, + SchemaMigrationStep::DropType { + type_kind: SchemaTypeKind::Edge, + mode: omnigraph_compiler::DropMode::Hard, + .. + } + )), + "with --allow-data-loss, DropType {{ Edge }} should be Hard: {plan:?}", + ); + + let result = db + .apply_schema_with_options( + desired, + omnigraph::db::SchemaApplyOptions { + allow_data_loss: true, + }, + ) + .await + .unwrap(); + assert!(result.applied); + + let after_version = db + .snapshot_of(ReadTarget::branch("main")) + .await + .unwrap() + .version(); + assert!(after_version > before_version); + + // Current manifest: both dropped entries gone. + let current = db.snapshot_of(ReadTarget::branch("main")).await.unwrap(); + assert!(current.entry("node:Company").is_none()); + assert!(current.entry("edge:WorksAt").is_none()); + + // NOTE: DropType Hard's cleanup of the orphan dataset directory + // is a known follow-up (the manifest entry is tombstoned and the + // dataset's prior versions are cleaned, but the directory itself + // persists until an orphan-cleanup pass is implemented). For the + // current contract, the data is *unreachable* via omnigraph + // (no manifest entry), which is the user-facing guarantee. +}