mirror of
https://github.com/ModernRelay/omnigraph.git
synced 2026-06-15 01:55:13 +02:00
schema-lint chassis v1.2: --allow-data-loss flag + Hard mode (MR-694) — completes v1 (#100)
* schema-lint v1 commit 5: --allow-data-loss flag + Hard mode Final v1 commit. Wires up the --allow-data-loss CLI flag and Hard mode for both DropProperty and DropType. Per docs/dev/schema-lint-v1-plan.md, commit #5 of the schema-lint chassis v1 series (MR-694). CLI (omnigraph-cli/src/main.rs): - New --allow-data-loss flag on both `omnigraph schema plan` and `omnigraph schema apply` subcommands. Off by default (Soft). - HTTP remote schema apply explicitly rejects the flag for now (CLI-only; HTTP parity is a separate small follow-up that adds the field to SchemaApplyRequest + the server handler). Engine (omnigraph.rs + schema_apply.rs): - New SchemaApplyOptions { allow_data_loss: bool } public struct (Default = all false), re-exported via omnigraph::db::SchemaApplyOptions. - New public methods: plan_schema_with_options and apply_schema_with_options. Existing plan_schema/apply_schema are now thin wrappers that pass Default::default(). - promote_drops_to_hard: post-plan walk that promotes every DropMode::Soft step to DropMode::Hard when the flag is set. Keeps the compiler's plan_schema_migration signature unchanged (no breaking change for tests / callers). - Apply path: both Drop arms accept Hard mode; behavior is identical to Soft inside the apply loop. The DIFFERENCE is the new hard_cleanup_targets: Vec<(String, String)> accumulator, populated for every Hard variant with (table_key, full_dataset_uri). - Post-publish cleanup: a new loop after the manifest commit iterates hard_cleanup_targets and calls cleanup_old_versions (before_timestamp = now) on each dataset URI. Best-effort — the apply is already durable; cleanup failure is logged via tracing::warn rather than failing the apply. - New cleanup_dataset_old_versions helper inlines the Lance cleanup_old_versions call against a dataset URI. Behavioral details: - DropProperty Hard: stage_overwrite produced a new dataset version without the column. cleanup_old_versions removes the prior version (and reclaims unique fragments). After Hard apply, snapshot_at_version(pre_drop).open(table_key) FAILS because the prior dataset version was reclaimed. - DropType Hard: no per-table write happens (the change is the manifest tombstone). cleanup_old_versions on the orphan dataset is a no-op in the immediate term (no prior versions to clean since the dataset wasn't modified by this apply). The dataset directory persists. Full orphan-cleanup is a documented follow-up — the user-facing contract is "data is unreachable via omnigraph" (manifest entry tombstoned), which is satisfied. Tests (tests/schema_apply.rs): - apply_schema_with_allow_data_loss_promotes_drops_to_hard: default plan emits Soft; with options.allow_data_loss=true, plan emits Hard; apply succeeds. - apply_schema_hard_drops_property_makes_prior_version_unreachable: Hard drop succeeds, current snapshot lacks the column, and snapshot_at_version(pre_drop).open("node:Person") FAILS (Lance prior version reclaimed by cleanup). - apply_schema_hard_drops_node_and_edge_with_flag_succeeds: both Node and Edge DropType variants are promoted to Hard with the flag; apply succeeds; current manifest entries gone. (Orphan dataset directory cleanup deferred.) Test results: - cargo test -p omnigraph-compiler --lib: 239 passed - cargo test -p omnigraph-engine --test schema_apply: 14 passed (3 new Hard tests + 11 existing soft/regression tests) - cargo test -p omnigraph-server --test openapi: 60 passed (no HTTP API surface changes in this commit; OpenAPI parity follow-up noted) v1 status: complete for CLI/embedded use. MR-694 chassis epic + MR-700 DropType/DropProperty ticket can close after this lands. Known follow-ups (separate small PRs): - HTTP parity: extend SchemaApplyRequest with allow_data_loss field, thread through server handler, regenerate openapi.json. - Orphan-dataset directory deletion for DropType Hard (currently the dataset directory persists; cleanup_old_versions doesn't remove it because the dataset wasn't modified). - MR-948 substrate alignment: swap DropProperty Soft from stage_overwrite to Dataset::drop_columns (catalog_only vs full_rewrite cost class). 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com> * fixup: use bail! from color_eyre::eyre instead of anyhow The remote-rejection branch in SchemaCommand::Apply used anyhow::anyhow! which isn't in scope; the CLI's Result type is color_eyre::eyre::Result and bail! is already imported. Caught by CI Test Workspace job on PR #100. --------- Co-authored-by: Claude Opus 4.7 <noreply@anthropic.com>
This commit is contained in:
parent
58cee158d8
commit
a6e037547f
5 changed files with 423 additions and 40 deletions
|
|
@ -316,6 +316,11 @@ enum SchemaCommand {
|
|||
schema: PathBuf,
|
||||
#[arg(long)]
|
||||
json: bool,
|
||||
/// Show the plan as it would execute with `--allow-data-loss`.
|
||||
/// Promotes every `DropMode::Soft` step to `DropMode::Hard`
|
||||
/// so the plan output reflects the destructive intent.
|
||||
#[arg(long, default_value_t = false)]
|
||||
allow_data_loss: bool,
|
||||
},
|
||||
/// Apply a supported schema migration
|
||||
Apply {
|
||||
|
|
@ -329,6 +334,17 @@ enum SchemaCommand {
|
|||
schema: PathBuf,
|
||||
#[arg(long)]
|
||||
json: bool,
|
||||
/// Allow destructive (data-loss) schema changes.
|
||||
///
|
||||
/// Without this flag, drops are "soft": the column or table
|
||||
/// is removed from the current manifest version but prior
|
||||
/// versions are retained, so `snapshot_at_version(pre_drop)`
|
||||
/// can still read the dropped data until `omnigraph cleanup`
|
||||
/// runs. With this flag, drops are "hard": `cleanup_old_versions`
|
||||
/// runs on the affected datasets immediately after the apply,
|
||||
/// making the prior data unreachable.
|
||||
#[arg(long, default_value_t = false)]
|
||||
allow_data_loss: bool,
|
||||
},
|
||||
/// Show the current accepted schema source
|
||||
#[command(alias = "get")]
|
||||
|
|
@ -1980,12 +1996,18 @@ async fn main() -> Result<()> {
|
|||
config,
|
||||
schema,
|
||||
json,
|
||||
allow_data_loss,
|
||||
} => {
|
||||
let config = load_cli_config(config.as_ref())?;
|
||||
let uri = resolve_local_uri(&config, uri, target.as_deref(), "schema plan")?;
|
||||
let schema_source = fs::read_to_string(&schema)?;
|
||||
let db = Omnigraph::open(&uri).await?;
|
||||
let plan = db.plan_schema(&schema_source).await?;
|
||||
let plan = db
|
||||
.plan_schema_with_options(
|
||||
&schema_source,
|
||||
omnigraph::db::SchemaApplyOptions { allow_data_loss },
|
||||
)
|
||||
.await?;
|
||||
let output = SchemaPlanOutput {
|
||||
uri: &uri,
|
||||
supported: plan.supported,
|
||||
|
|
@ -2004,6 +2026,7 @@ async fn main() -> Result<()> {
|
|||
config,
|
||||
schema,
|
||||
json,
|
||||
allow_data_loss,
|
||||
} => {
|
||||
let config = load_cli_config(config.as_ref())?;
|
||||
let bearer_token =
|
||||
|
|
@ -2011,6 +2034,12 @@ async fn main() -> Result<()> {
|
|||
let uri = resolve_uri(&config, uri, target.as_deref())?;
|
||||
let schema_source = fs::read_to_string(&schema)?;
|
||||
let output = if is_remote_uri(&uri) {
|
||||
if allow_data_loss {
|
||||
bail!(
|
||||
"--allow-data-loss is not yet supported on remote (HTTP) schema apply; \
|
||||
use `omnigraph schema apply` against a local path or s3:// URI for now"
|
||||
);
|
||||
}
|
||||
remote_json::<SchemaApplyOutput>(
|
||||
&http_client,
|
||||
Method::POST,
|
||||
|
|
@ -2023,7 +2052,13 @@ async fn main() -> Result<()> {
|
|||
.await?
|
||||
} else {
|
||||
let mut db = Omnigraph::open(&uri).await?;
|
||||
schema_apply_output(&uri, db.apply_schema(&schema_source).await?)
|
||||
let result = db
|
||||
.apply_schema_with_options(
|
||||
&schema_source,
|
||||
omnigraph::db::SchemaApplyOptions { allow_data_loss },
|
||||
)
|
||||
.await?;
|
||||
schema_apply_output(&uri, result)
|
||||
};
|
||||
if json {
|
||||
print_json(&output)?;
|
||||
|
|
|
|||
|
|
@ -11,8 +11,8 @@ pub use commit_graph::GraphCommit;
|
|||
pub use graph_coordinator::{GraphCoordinator, ReadTarget, ResolvedTarget, SnapshotId};
|
||||
pub use manifest::{Snapshot, SubTableEntry, SubTableUpdate};
|
||||
pub use omnigraph::{
|
||||
CleanupPolicyOptions, MergeOutcome, Omnigraph, OpenMode, SchemaApplyResult,
|
||||
TableCleanupStats, TableOptimizeStats,
|
||||
CleanupPolicyOptions, MergeOutcome, Omnigraph, OpenMode, SchemaApplyOptions,
|
||||
SchemaApplyResult, TableCleanupStats, TableOptimizeStats,
|
||||
};
|
||||
pub(crate) use omnigraph::ensure_public_branch_ref;
|
||||
pub(crate) use run_registry::is_internal_run_branch;
|
||||
|
|
|
|||
|
|
@ -34,6 +34,7 @@ mod schema_apply;
|
|||
mod table_ops;
|
||||
|
||||
pub use optimize::{CleanupPolicyOptions, TableCleanupStats, TableOptimizeStats};
|
||||
pub use schema_apply::SchemaApplyOptions;
|
||||
|
||||
use super::commit_graph::GraphCommit;
|
||||
use super::manifest::{
|
||||
|
|
@ -308,11 +309,29 @@ impl Omnigraph {
|
|||
}
|
||||
|
||||
pub async fn plan_schema(&self, desired_schema_source: &str) -> Result<SchemaMigrationPlan> {
|
||||
schema_apply::plan_schema(self, desired_schema_source).await
|
||||
self.plan_schema_with_options(desired_schema_source, SchemaApplyOptions::default())
|
||||
.await
|
||||
}
|
||||
|
||||
pub async fn plan_schema_with_options(
|
||||
&self,
|
||||
desired_schema_source: &str,
|
||||
options: SchemaApplyOptions,
|
||||
) -> Result<SchemaMigrationPlan> {
|
||||
schema_apply::plan_schema(self, desired_schema_source, options).await
|
||||
}
|
||||
|
||||
pub async fn apply_schema(&self, desired_schema_source: &str) -> Result<SchemaApplyResult> {
|
||||
schema_apply::apply_schema(self, desired_schema_source).await
|
||||
self.apply_schema_with_options(desired_schema_source, SchemaApplyOptions::default())
|
||||
.await
|
||||
}
|
||||
|
||||
pub async fn apply_schema_with_options(
|
||||
&self,
|
||||
desired_schema_source: &str,
|
||||
options: SchemaApplyOptions,
|
||||
) -> Result<SchemaApplyResult> {
|
||||
schema_apply::apply_schema(self, desired_schema_source, options).await
|
||||
}
|
||||
|
||||
pub(crate) async fn ensure_schema_apply_idle(&self, operation: &str) -> Result<()> {
|
||||
|
|
|
|||
|
|
@ -1,22 +1,60 @@
|
|||
use super::*;
|
||||
|
||||
/// Operator-supplied options that gate schema-apply behavior.
|
||||
///
|
||||
/// Today the only knob is `allow_data_loss`, which promotes
|
||||
/// `DropMode::Soft` steps to `DropMode::Hard` (per chassis v1
|
||||
/// commit #5). Soft is the default — drops are reversible via Lance
|
||||
/// time travel until cleanup runs. Hard runs `cleanup_old_versions`
|
||||
/// on the affected datasets immediately after the manifest publish,
|
||||
/// making the prior column data unreachable.
|
||||
#[derive(Debug, Clone, Default)]
|
||||
pub struct SchemaApplyOptions {
|
||||
/// Allow destructive (data-loss) schema changes. When true, the
|
||||
/// planner promotes every `DropMode::Soft` step to
|
||||
/// `DropMode::Hard`, and the apply path runs
|
||||
/// `cleanup_old_versions` on affected datasets after the publish.
|
||||
pub allow_data_loss: bool,
|
||||
}
|
||||
|
||||
/// Promote every `Soft` drop variant in the plan to `Hard` when
|
||||
/// `allow_data_loss` is set. Idempotent on non-drop steps.
|
||||
fn promote_drops_to_hard(plan: &mut SchemaMigrationPlan, allow_data_loss: bool) {
|
||||
if !allow_data_loss {
|
||||
return;
|
||||
}
|
||||
for step in &mut plan.steps {
|
||||
match step {
|
||||
SchemaMigrationStep::DropType { mode, .. }
|
||||
| SchemaMigrationStep::DropProperty { mode, .. } => {
|
||||
*mode = DropMode::Hard;
|
||||
}
|
||||
_ => {}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub(super) async fn plan_schema(
|
||||
db: &Omnigraph,
|
||||
desired_schema_source: &str,
|
||||
options: SchemaApplyOptions,
|
||||
) -> Result<SchemaMigrationPlan> {
|
||||
db.ensure_schema_state_valid().await?;
|
||||
let accepted_ir = read_accepted_schema_ir(db.uri(), Arc::clone(&db.storage)).await?;
|
||||
let desired_ir = read_schema_ir_from_source(desired_schema_source)?;
|
||||
plan_schema_migration(&accepted_ir, &desired_ir)
|
||||
.map_err(|err| OmniError::manifest(err.to_string()))
|
||||
let mut plan = plan_schema_migration(&accepted_ir, &desired_ir)
|
||||
.map_err(|err| OmniError::manifest(err.to_string()))?;
|
||||
promote_drops_to_hard(&mut plan, options.allow_data_loss);
|
||||
Ok(plan)
|
||||
}
|
||||
|
||||
pub(super) async fn apply_schema(
|
||||
db: &Omnigraph,
|
||||
desired_schema_source: &str,
|
||||
options: SchemaApplyOptions,
|
||||
) -> Result<SchemaApplyResult> {
|
||||
acquire_schema_apply_lock(db).await?;
|
||||
let result = apply_schema_with_lock(db, desired_schema_source).await;
|
||||
let result = apply_schema_with_lock(db, desired_schema_source, options).await;
|
||||
let release_result = release_schema_apply_lock(db).await;
|
||||
match (result, release_result) {
|
||||
(Ok(result), Ok(())) => Ok(result),
|
||||
|
|
@ -29,6 +67,7 @@ pub(super) async fn apply_schema(
|
|||
pub(super) async fn apply_schema_with_lock(
|
||||
db: &Omnigraph,
|
||||
desired_schema_source: &str,
|
||||
options: SchemaApplyOptions,
|
||||
) -> Result<SchemaApplyResult> {
|
||||
db.ensure_schema_state_valid().await?;
|
||||
let branches = db.coordinator.read().await.all_branches().await?;
|
||||
|
|
@ -50,8 +89,9 @@ pub(super) async fn apply_schema_with_lock(
|
|||
|
||||
let accepted_ir = read_accepted_schema_ir(db.uri(), Arc::clone(&db.storage)).await?;
|
||||
let desired_ir = read_schema_ir_from_source(desired_schema_source)?;
|
||||
let plan = plan_schema_migration(&accepted_ir, &desired_ir)
|
||||
let mut plan = plan_schema_migration(&accepted_ir, &desired_ir)
|
||||
.map_err(|err| OmniError::manifest(err.to_string()))?;
|
||||
promote_drops_to_hard(&mut plan, options.allow_data_loss);
|
||||
if !plan.supported {
|
||||
let message = plan
|
||||
.steps
|
||||
|
|
@ -79,6 +119,12 @@ pub(super) async fn apply_schema_with_lock(
|
|||
let mut rewritten_tables = BTreeSet::new();
|
||||
let mut indexed_tables = BTreeSet::new();
|
||||
let mut dropped_tables = BTreeSet::new();
|
||||
// Hard-drop cleanup targets: (table_key, full_dataset_uri).
|
||||
// Populated for DropProperty { Hard } and DropType { Hard }; the
|
||||
// post-publish cleanup runs `cleanup_old_versions` on each
|
||||
// dataset to reclaim prior versions, making time-travel back
|
||||
// to pre-drop state unreachable.
|
||||
let mut hard_cleanup_targets: Vec<(String, String)> = Vec::new();
|
||||
let mut property_renames = HashMap::<String, HashMap<String, String>>::new();
|
||||
let mut changed_edge_tables = false;
|
||||
|
||||
|
|
@ -145,25 +191,35 @@ pub(super) async fn apply_schema_with_lock(
|
|||
mode,
|
||||
..
|
||||
} => {
|
||||
// Soft = reuse the existing stage_overwrite rewrite
|
||||
// path. batch_for_schema_apply_rewrite iterates the
|
||||
// *target* schema fields, so a property absent from
|
||||
// desired_catalog is naturally projected away. The
|
||||
// prior Lance version retains the dropped column,
|
||||
// so reads at the previous snapshot still see it
|
||||
// (time-travel reversibility). Hard mode (immediate
|
||||
// compact_files + cleanup_old_versions for actual
|
||||
// data deletion) lands in commit #5 gated by
|
||||
// --allow-data-loss.
|
||||
if !matches!(mode, DropMode::Soft) {
|
||||
return Err(OmniError::manifest_internal(
|
||||
"DropProperty { Hard } not yet implemented (commit #5)",
|
||||
));
|
||||
}
|
||||
// Both Soft and Hard route through the existing
|
||||
// stage_overwrite rewrite path. batch_for_schema_apply_rewrite
|
||||
// iterates the *target* schema fields, so a property
|
||||
// absent from desired_catalog is naturally projected
|
||||
// away in the rebuilt batch.
|
||||
//
|
||||
// The difference between Soft and Hard is what
|
||||
// happens AFTER the manifest publish:
|
||||
// * Soft: nothing — the prior dataset version
|
||||
// retains the dropped column; reads at
|
||||
// snapshot_at_version(pre_drop) still see it.
|
||||
// * Hard: run cleanup_old_versions on the dataset
|
||||
// post-publish, removing the prior version (and
|
||||
// reclaiming any fragments unique to it). After
|
||||
// cleanup, time-travel back fails.
|
||||
let table_key = schema_table_key(*type_kind, type_name);
|
||||
if table_key.starts_with("edge:") {
|
||||
changed_edge_tables = true;
|
||||
}
|
||||
if matches!(mode, DropMode::Hard) {
|
||||
let entry = snapshot.entry(&table_key).ok_or_else(|| {
|
||||
OmniError::manifest(format!(
|
||||
"missing table '{}' for hard property drop",
|
||||
table_key
|
||||
))
|
||||
})?;
|
||||
let full_uri = format!("{}/{}", db.root_uri, entry.table_path);
|
||||
hard_cleanup_targets.push((table_key.clone(), full_uri));
|
||||
}
|
||||
rewritten_tables.insert(table_key);
|
||||
}
|
||||
SchemaMigrationStep::DropType {
|
||||
|
|
@ -171,25 +227,35 @@ pub(super) async fn apply_schema_with_lock(
|
|||
name,
|
||||
mode,
|
||||
} => {
|
||||
// Soft = remove the table's entry from the current
|
||||
// __manifest version via a tombstone. The Lance
|
||||
// dataset files are retained — prior __manifest
|
||||
// versions still reference them, so Lance time
|
||||
// travel + branch-from-snapshot can read the dropped
|
||||
// table until `omnigraph cleanup` ages out the older
|
||||
// manifest versions. No per-table write happens here;
|
||||
// the tombstone is the entire change. Hard mode
|
||||
// (immediate dataset deletion via cleanup) lands in
|
||||
// commit #5 gated by --allow-data-loss.
|
||||
if !matches!(mode, DropMode::Soft) {
|
||||
return Err(OmniError::manifest_internal(
|
||||
"DropType { Hard } not yet implemented (commit #5)",
|
||||
));
|
||||
}
|
||||
// Both Soft and Hard tombstone the table's entry in
|
||||
// the current __manifest version (no per-table write).
|
||||
//
|
||||
// The difference is what happens after publish:
|
||||
// * Soft: dataset files retained; prior __manifest
|
||||
// versions still reference them; Lance time
|
||||
// travel + branch-from-snapshot can read the
|
||||
// dropped table.
|
||||
// * Hard: run cleanup_old_versions on the orphan
|
||||
// dataset post-publish. Prior dataset versions
|
||||
// (and their fragments) are reclaimed. The dataset
|
||||
// directory itself persists until a future
|
||||
// orphan-cleanup pass — operators who need the
|
||||
// directory gone too should run `omnigraph cleanup`
|
||||
// and (for now) remove the directory out-of-band.
|
||||
let table_key = schema_table_key(*type_kind, name);
|
||||
if table_key.starts_with("edge:") {
|
||||
changed_edge_tables = true;
|
||||
}
|
||||
if matches!(mode, DropMode::Hard) {
|
||||
let entry = snapshot.entry(&table_key).ok_or_else(|| {
|
||||
OmniError::manifest(format!(
|
||||
"missing table '{}' for hard type drop",
|
||||
table_key
|
||||
))
|
||||
})?;
|
||||
let full_uri = format!("{}/{}", db.root_uri, entry.table_path);
|
||||
hard_cleanup_targets.push((table_key.clone(), full_uri));
|
||||
}
|
||||
dropped_tables.insert(table_key);
|
||||
}
|
||||
step @ SchemaMigrationStep::UnsupportedChange { .. } => {
|
||||
|
|
@ -597,6 +663,25 @@ pub(super) async fn apply_schema_with_lock(
|
|||
}
|
||||
}
|
||||
|
||||
// Hard-drop cleanup: run cleanup_old_versions on each dataset
|
||||
// that had a Hard mode drop step. Best-effort — the schema apply
|
||||
// is already durable. If cleanup fails, the prior data fragments
|
||||
// remain on disk as orphans (reclaimable via `omnigraph cleanup`).
|
||||
// We do NOT fail the apply on cleanup error; the manifest change
|
||||
// is the load-bearing operation.
|
||||
for (table_key, full_uri) in &hard_cleanup_targets {
|
||||
match cleanup_dataset_old_versions(db, full_uri).await {
|
||||
Ok(()) => {}
|
||||
Err(err) => {
|
||||
tracing::warn!(
|
||||
error = %err,
|
||||
table_key = table_key.as_str(),
|
||||
"hard-drop cleanup_old_versions failed; rerun `omnigraph cleanup` to reclaim",
|
||||
);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Ok(SchemaApplyResult {
|
||||
supported: true,
|
||||
applied: true,
|
||||
|
|
@ -605,6 +690,36 @@ pub(super) async fn apply_schema_with_lock(
|
|||
})
|
||||
}
|
||||
|
||||
/// Run `cleanup_old_versions` on a dataset URI with `before_timestamp = now`.
|
||||
/// Removes every version older than the current, making time-travel back
|
||||
/// to those versions unreachable. Used by Hard mode drops to enforce
|
||||
/// "data is gone" semantics post-apply.
|
||||
///
|
||||
/// The dataset itself isn't deleted — for DropType { Hard }, the
|
||||
/// dataset directory persists with only its current version (or, if
|
||||
/// no current version was written, its pre-drop version). A future
|
||||
/// orphan-cleanup pass should remove the directory entirely.
|
||||
async fn cleanup_dataset_old_versions(db: &Omnigraph, full_uri: &str) -> Result<()> {
|
||||
use chrono::Utc;
|
||||
use lance::dataset::cleanup::CleanupPolicy;
|
||||
let ds = lance::Dataset::open(full_uri)
|
||||
.await
|
||||
.map_err(|e| OmniError::Lance(e.to_string()))?;
|
||||
let policy = CleanupPolicy {
|
||||
before_timestamp: Some(Utc::now()),
|
||||
before_version: None,
|
||||
delete_unverified: false,
|
||||
error_if_tagged_old_versions: false,
|
||||
clean_referenced_branches: false,
|
||||
delete_rate_limit: None,
|
||||
};
|
||||
let _removed = lance::dataset::cleanup::cleanup_old_versions(&ds, policy)
|
||||
.await
|
||||
.map_err(|e| OmniError::Lance(e.to_string()))?;
|
||||
let _ = db;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub(super) async fn ensure_schema_apply_idle(db: &Omnigraph, operation: &str) -> Result<()> {
|
||||
db.refresh_coordinator_only().await?;
|
||||
ensure_schema_apply_not_locked(db, operation).await
|
||||
|
|
|
|||
|
|
@ -522,3 +522,217 @@ edge WorksAt: Human -> Company
|
|||
"old node:Person table key should be unmapped after rename"
|
||||
);
|
||||
}
|
||||
|
||||
// ─── Hard-mode drops (chassis v1 commit #5 — --allow-data-loss) ──────────────
|
||||
//
|
||||
// Hard mode promotes every `DropMode::Soft` step to `DropMode::Hard` and runs
|
||||
// `cleanup_old_versions` on affected datasets immediately after the manifest
|
||||
// publish. For DropProperty Hard, this removes the prior dataset version
|
||||
// (where the column lived), making `snapshot_at_version(pre_drop)` unable to
|
||||
// open the dataset at that version. For DropType Hard, the dataset is
|
||||
// untouched by the schema apply itself (no per-table write), so
|
||||
// cleanup_old_versions is currently a no-op for it — the dataset directory
|
||||
// persists. Full orphan-dataset deletion is a separate follow-up.
|
||||
|
||||
#[tokio::test]
|
||||
async fn apply_schema_with_allow_data_loss_promotes_drops_to_hard() {
|
||||
let dir = tempfile::tempdir().unwrap();
|
||||
let mut db = init_and_load(&dir).await;
|
||||
|
||||
let desired = TEST_SCHEMA.replace(" age: I32?\n", "");
|
||||
|
||||
// Default plan (no flag) → Soft.
|
||||
let plan_soft = db.plan_schema(&desired).await.unwrap();
|
||||
assert!(plan_soft.steps.iter().any(|step| matches!(
|
||||
step,
|
||||
SchemaMigrationStep::DropProperty {
|
||||
mode: omnigraph_compiler::DropMode::Soft,
|
||||
..
|
||||
}
|
||||
)));
|
||||
|
||||
// With --allow-data-loss → Hard.
|
||||
let plan_hard = db
|
||||
.plan_schema_with_options(
|
||||
&desired,
|
||||
omnigraph::db::SchemaApplyOptions {
|
||||
allow_data_loss: true,
|
||||
},
|
||||
)
|
||||
.await
|
||||
.unwrap();
|
||||
assert!(plan_hard.supported);
|
||||
assert!(
|
||||
plan_hard.steps.iter().any(|step| matches!(
|
||||
step,
|
||||
SchemaMigrationStep::DropProperty {
|
||||
mode: omnigraph_compiler::DropMode::Hard,
|
||||
..
|
||||
}
|
||||
)),
|
||||
"with --allow-data-loss, DropProperty should be promoted to Hard: {plan_hard:?}",
|
||||
);
|
||||
// Negative: no remaining Soft drops in the promoted plan.
|
||||
assert!(
|
||||
!plan_hard.steps.iter().any(|step| matches!(
|
||||
step,
|
||||
SchemaMigrationStep::DropProperty {
|
||||
mode: omnigraph_compiler::DropMode::Soft,
|
||||
..
|
||||
} | SchemaMigrationStep::DropType {
|
||||
mode: omnigraph_compiler::DropMode::Soft,
|
||||
..
|
||||
}
|
||||
)),
|
||||
"promoted plan should have no Soft drops left: {plan_hard:?}",
|
||||
);
|
||||
|
||||
// Apply with flag succeeds.
|
||||
let result = db
|
||||
.apply_schema_with_options(
|
||||
&desired,
|
||||
omnigraph::db::SchemaApplyOptions {
|
||||
allow_data_loss: true,
|
||||
},
|
||||
)
|
||||
.await
|
||||
.unwrap();
|
||||
assert!(result.applied);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn apply_schema_hard_drops_property_makes_prior_version_unreachable() {
|
||||
let dir = tempfile::tempdir().unwrap();
|
||||
let mut db = init_and_load(&dir).await;
|
||||
let before_version = db
|
||||
.snapshot_of(ReadTarget::branch("main"))
|
||||
.await
|
||||
.unwrap()
|
||||
.version();
|
||||
|
||||
// Hard drop the `age` column. Soft drop would leave the prior
|
||||
// dataset version intact; Hard drop runs cleanup_old_versions on
|
||||
// the dataset post-apply, removing the prior version.
|
||||
let desired = TEST_SCHEMA.replace(" age: I32?\n", "");
|
||||
let result = db
|
||||
.apply_schema_with_options(
|
||||
&desired,
|
||||
omnigraph::db::SchemaApplyOptions {
|
||||
allow_data_loss: true,
|
||||
},
|
||||
)
|
||||
.await
|
||||
.unwrap();
|
||||
assert!(result.applied);
|
||||
|
||||
// Current snapshot: column gone from the dataset schema.
|
||||
let current_snapshot = db.snapshot_of(ReadTarget::branch("main")).await.unwrap();
|
||||
let current_ds = current_snapshot.open("node:Person").await.unwrap();
|
||||
let current_fields = current_ds
|
||||
.schema()
|
||||
.fields
|
||||
.iter()
|
||||
.map(|f| f.name.clone())
|
||||
.collect::<Vec<_>>();
|
||||
assert!(
|
||||
!current_fields.iter().any(|f| f == "age"),
|
||||
"current Person schema must not include 'age' after hard drop; got {current_fields:?}",
|
||||
);
|
||||
|
||||
// Time travel: at the pre-drop manifest version, the entry points
|
||||
// at the OLD dataset version which has been cleaned up. Opening
|
||||
// the dataset at that snapshot should fail (Lance can't load the
|
||||
// dropped version). This is the Hard-mode contract — the prior
|
||||
// data is unreachable.
|
||||
let pre_drop = db.snapshot_at_version(before_version).await.unwrap();
|
||||
let open_result = pre_drop.open("node:Person").await;
|
||||
assert!(
|
||||
open_result.is_err(),
|
||||
"after hard drop + cleanup, pre-drop snapshot.open() must fail (prior version was reclaimed); got {open_result:?}",
|
||||
);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn apply_schema_hard_drops_node_and_edge_with_flag_succeeds() {
|
||||
let dir = tempfile::tempdir().unwrap();
|
||||
let mut db = init_and_load(&dir).await;
|
||||
let before_version = db
|
||||
.snapshot_of(ReadTarget::branch("main"))
|
||||
.await
|
||||
.unwrap()
|
||||
.version();
|
||||
|
||||
let desired = r#"
|
||||
node Person {
|
||||
name: String @key
|
||||
age: I32?
|
||||
}
|
||||
|
||||
edge Knows: Person -> Person {
|
||||
since: Date?
|
||||
}
|
||||
"#;
|
||||
|
||||
let plan = db
|
||||
.plan_schema_with_options(
|
||||
desired,
|
||||
omnigraph::db::SchemaApplyOptions {
|
||||
allow_data_loss: true,
|
||||
},
|
||||
)
|
||||
.await
|
||||
.unwrap();
|
||||
assert!(plan.supported);
|
||||
assert!(
|
||||
plan.steps.iter().any(|step| matches!(
|
||||
step,
|
||||
SchemaMigrationStep::DropType {
|
||||
type_kind: SchemaTypeKind::Node,
|
||||
mode: omnigraph_compiler::DropMode::Hard,
|
||||
..
|
||||
}
|
||||
)),
|
||||
"with --allow-data-loss, DropType {{ Node }} should be Hard: {plan:?}",
|
||||
);
|
||||
assert!(
|
||||
plan.steps.iter().any(|step| matches!(
|
||||
step,
|
||||
SchemaMigrationStep::DropType {
|
||||
type_kind: SchemaTypeKind::Edge,
|
||||
mode: omnigraph_compiler::DropMode::Hard,
|
||||
..
|
||||
}
|
||||
)),
|
||||
"with --allow-data-loss, DropType {{ Edge }} should be Hard: {plan:?}",
|
||||
);
|
||||
|
||||
let result = db
|
||||
.apply_schema_with_options(
|
||||
desired,
|
||||
omnigraph::db::SchemaApplyOptions {
|
||||
allow_data_loss: true,
|
||||
},
|
||||
)
|
||||
.await
|
||||
.unwrap();
|
||||
assert!(result.applied);
|
||||
|
||||
let after_version = db
|
||||
.snapshot_of(ReadTarget::branch("main"))
|
||||
.await
|
||||
.unwrap()
|
||||
.version();
|
||||
assert!(after_version > before_version);
|
||||
|
||||
// Current manifest: both dropped entries gone.
|
||||
let current = db.snapshot_of(ReadTarget::branch("main")).await.unwrap();
|
||||
assert!(current.entry("node:Company").is_none());
|
||||
assert!(current.entry("edge:WorksAt").is_none());
|
||||
|
||||
// NOTE: DropType Hard's cleanup of the orphan dataset directory
|
||||
// is a known follow-up (the manifest entry is tombstoned and the
|
||||
// dataset's prior versions are cleaned, but the directory itself
|
||||
// persists until an orphan-cleanup pass is implemented). For the
|
||||
// current contract, the data is *unreachable* via omnigraph
|
||||
// (no manifest entry), which is the user-facing guarantee.
|
||||
}
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue