From f75b941a9e21935cb23c27568bea5869d0bd26f5 Mon Sep 17 00:00:00 2001
From: Andrew Altshuler
Date: Mon, 27 Apr 2026 16:21:00 +0300
Subject: [PATCH] Make schema apply atomic across crashes (#57)
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

Schema apply previously committed the manifest before writing the schema
source and IR contract files. A crash in that window left the manifest
pointing at the new schema while _schema.pg, _schema.ir.json, and
__schema_state.json still reflected the old one — a silent inconsistency
that subsequent reads hit as type errors.

Reorders the apply: write to staging filenames first, commit the manifest,
then atomically rename staging → final. On open, a recovery sweep
reconciles any leftover staging files against the manifest's table set:
pre-commit crashes get the staging files deleted, post-commit crashes get
the renames completed (idempotent — handles partial renames).
Property-only migrations where both schemas imply the same table set
return an operator-actionable error rather than guessing.

Adds rename_text + delete to StorageAdapter (atomic on local FS via
tokio::fs::rename; copy + delete on S3 — recovery is tolerant of the
non-atomic case). Failpoints test coverage at both crash boundaries plus a
partial-rename scenario.

Co-authored-by: Claude Opus 4.7 (1M context) --- crates/omnigraph/src/db/omnigraph.rs | 30 ++- .../src/db/omnigraph/schema_apply.rs | 34 ++- crates/omnigraph/src/db/schema_state.rs | 237 +++++++++++++++++- crates/omnigraph/src/storage.rs | 50 ++++ crates/omnigraph/tests/failpoints.rs | 111 +++++++- 5 files changed, 448 insertions(+), 14 deletions(-) diff --git a/crates/omnigraph/src/db/omnigraph.rs b/crates/omnigraph/src/db/omnigraph.rs index 0f251ba..302bb49 100644 --- a/crates/omnigraph/src/db/omnigraph.rs +++ b/crates/omnigraph/src/db/omnigraph.rs @@ -42,7 +42,9 @@ use super::manifest::{ }; use super::schema_state::{ SCHEMA_SOURCE_FILENAME, load_or_bootstrap_schema_contract, read_accepted_schema_ir, - validate_schema_contract, write_schema_contract, + recover_schema_state_files, schema_ir_staging_uri, schema_ir_uri, schema_source_staging_uri, + schema_source_uri, schema_state_staging_uri, schema_state_uri, validate_schema_contract, + write_schema_contract, write_schema_contract_staging, }; use super::{ ReadTarget, ResolvedTarget, RunId, SCHEMA_APPLY_LOCK_BRANCH, SnapshotId, @@ -130,11 +132,16 @@ impl Omnigraph { storage: Arc, ) -> Result { let root = normalize_root_uri(uri)?; - // Read _schema.pg - let schema_path = join_uri(&root, SCHEMA_SOURCE_FILENAME); + // Open the coordinator first so the schema-staging recovery sweep can + // compare its snapshot against any leftover staging files. Recovery + // either deletes staging (pre-commit crash) or completes the rename + // (post-commit crash) before the live schema files are read. + let coordinator = GraphCoordinator::open(&root, Arc::clone(&storage)).await?; + recover_schema_state_files(&root, Arc::clone(&storage), &coordinator.snapshot()).await?; + // Read _schema.pg (post-recovery — may have just been renamed in). 
+ let schema_path = schema_source_uri(&root); let schema_source = storage.read_text(&schema_path).await?; let current_source_ir = read_schema_ir_from_source(&schema_source)?; - let coordinator = GraphCoordinator::open(&root, Arc::clone(&storage)).await?; let branches = coordinator.branch_list().await?; let (accepted_ir, _) = load_or_bootstrap_schema_contract( &root, @@ -1562,6 +1569,8 @@ edge WorksAt: Person -> Company reads: Mutex>, writes: Mutex>, exists_checks: Mutex>, + renames: Mutex>, + deletes: Mutex>, } impl RecordingStorageAdapter { @@ -1594,6 +1603,19 @@ edge WorksAt: Person -> Company self.exists_checks.lock().unwrap().push(uri.to_string()); self.inner.exists(uri).await } + + async fn rename_text(&self, from_uri: &str, to_uri: &str) -> Result<()> { + self.renames + .lock() + .unwrap() + .push((from_uri.to_string(), to_uri.to_string())); + self.inner.rename_text(from_uri, to_uri).await + } + + async fn delete(&self, uri: &str) -> Result<()> { + self.deletes.lock().unwrap().push(uri.to_string()); + self.inner.delete(uri).await + } } #[tokio::test] diff --git a/crates/omnigraph/src/db/omnigraph/schema_apply.rs b/crates/omnigraph/src/db/omnigraph/schema_apply.rs index bda47e1..1ef5907 100644 --- a/crates/omnigraph/src/db/omnigraph/schema_apply.rs +++ b/crates/omnigraph/src/db/omnigraph/schema_apply.rs @@ -314,6 +314,22 @@ pub(super) async fn apply_schema_with_lock( ))); } + // Atomic schema apply. + // + // Write the new schema source + IR contract to staging filenames first, + // then commit the manifest, then rename staging → final. 
A crash + // between these stages is recoverable on next open via + // `recover_schema_state_files`: + // - crash before commit → manifest unchanged; staging deleted on open + // - crash after commit → manifest advanced; staging renamed on open + let staging_pg_uri = schema_source_staging_uri(&db.root_uri); + db.storage + .write_text(&staging_pg_uri, desired_schema_source) + .await?; + write_schema_contract_staging(&db.root_uri, db.storage.as_ref(), &desired_ir).await?; + + crate::failpoints::maybe_fail("schema_apply.after_staging_write")?; + let actor_id = db.current_audit_actor().map(str::to_string); let PublishedSnapshot { manifest_version, @@ -323,11 +339,23 @@ pub(super) async fn apply_schema_with_lock( .commit_changes_with_actor(&manifest_changes, actor_id.as_deref()) .await?; - let schema_path = join_uri(&db.root_uri, SCHEMA_SOURCE_FILENAME); + crate::failpoints::maybe_fail("schema_apply.after_manifest_commit")?; + db.storage - .write_text(&schema_path, desired_schema_source) + .rename_text(&staging_pg_uri, &schema_source_uri(&db.root_uri)) + .await?; + db.storage + .rename_text( + &schema_ir_staging_uri(&db.root_uri), + &schema_ir_uri(&db.root_uri), + ) + .await?; + db.storage + .rename_text( + &schema_state_staging_uri(&db.root_uri), + &schema_state_uri(&db.root_uri), + ) .await?; - write_schema_contract(&db.root_uri, db.storage.as_ref(), &desired_ir).await?; db.catalog = desired_catalog; db.schema_source = desired_schema_source.to_string(); diff --git a/crates/omnigraph/src/db/schema_state.rs b/crates/omnigraph/src/db/schema_state.rs index c62f72e..8ba24f4 100644 --- a/crates/omnigraph/src/db/schema_state.rs +++ b/crates/omnigraph/src/db/schema_state.rs @@ -1,9 +1,12 @@ +use std::collections::BTreeSet; use std::sync::Arc; use omnigraph_compiler::schema::parser::parse_schema; use omnigraph_compiler::{SchemaIR, build_schema_ir, schema_ir_hash, schema_ir_pretty_json}; use serde::{Deserialize, Serialize}; +use tracing::warn; +use 
crate::db::manifest::Snapshot; use crate::error::{OmniError, Result}; use crate::storage::{StorageAdapter, join_uri}; @@ -11,6 +14,13 @@ pub(crate) const SCHEMA_SOURCE_FILENAME: &str = "_schema.pg"; pub(crate) const SCHEMA_IR_FILENAME: &str = "_schema.ir.json"; pub(crate) const SCHEMA_STATE_FILENAME: &str = "__schema_state.json"; +// Staging filenames used by atomic schema apply. Schema apply writes to these +// first, then commits the manifest, then renames staging → final. Recovery on +// open reconciles any leftover staging files against the manifest. +pub(crate) const SCHEMA_SOURCE_STAGING_FILENAME: &str = "_schema.pg.staging"; +pub(crate) const SCHEMA_IR_STAGING_FILENAME: &str = "_schema.ir.json.staging"; +pub(crate) const SCHEMA_STATE_STAGING_FILENAME: &str = "__schema_state.json.staging"; + const SCHEMA_STATE_FORMAT_VERSION: u32 = 1; const SCHEMA_IDENTITY_VERSION: u32 = 1; @@ -87,6 +97,38 @@ pub(crate) async fn write_schema_contract( root_uri: &str, storage: &dyn StorageAdapter, schema_ir: &SchemaIR, +) -> Result { + write_schema_contract_to( + storage, + &schema_ir_uri(root_uri), + &schema_state_uri(root_uri), + schema_ir, + ) + .await +} + +/// Variant of `write_schema_contract` that writes the IR + state JSON to the +/// staging filenames. Used by atomic schema apply: staging files are written +/// before the manifest commit, then renamed to the final names afterward. 
+pub(crate) async fn write_schema_contract_staging( + root_uri: &str, + storage: &dyn StorageAdapter, + schema_ir: &SchemaIR, +) -> Result { + write_schema_contract_to( + storage, + &schema_ir_staging_uri(root_uri), + &schema_state_staging_uri(root_uri), + schema_ir, + ) + .await +} + +async fn write_schema_contract_to( + storage: &dyn StorageAdapter, + ir_uri: &str, + state_uri: &str, + schema_ir: &SchemaIR, ) -> Result { let ir_json = schema_ir_pretty_json(schema_ir) .map_err(|err| OmniError::manifest_internal(err.to_string()))?; @@ -97,12 +139,8 @@ pub(crate) async fn write_schema_contract( OmniError::manifest_internal(format!("serialize schema state error: {}", err)) })?; - storage - .write_text(&schema_ir_uri(root_uri), &ir_json) - .await?; - storage - .write_text(&schema_state_uri(root_uri), &state_json) - .await?; + storage.write_text(ir_uri, &ir_json).await?; + storage.write_text(state_uri, &state_json).await?; Ok(state) } @@ -143,6 +181,18 @@ pub(crate) fn schema_state_uri(root_uri: &str) -> String { join_uri(root_uri, SCHEMA_STATE_FILENAME) } +pub(crate) fn schema_source_staging_uri(root_uri: &str) -> String { + join_uri(root_uri, SCHEMA_SOURCE_STAGING_FILENAME) +} + +pub(crate) fn schema_ir_staging_uri(root_uri: &str) -> String { + join_uri(root_uri, SCHEMA_IR_STAGING_FILENAME) +} + +pub(crate) fn schema_state_staging_uri(root_uri: &str) -> String { + join_uri(root_uri, SCHEMA_STATE_STAGING_FILENAME) +} + enum SchemaContractRead { Present { ir: SchemaIR, state: SchemaState }, MissingAll, @@ -234,3 +284,178 @@ fn schema_lock_conflict(detail: impl Into) -> OmniError { detail.into() )) } + +/// Reconcile leftover schema staging files (`_schema.pg.staging`, +/// `_schema.ir.json.staging`, `__schema_state.json.staging`) against the +/// manifest snapshot. +/// +/// Atomic schema apply writes these staging files before committing the +/// manifest, then renames them to their final names. A crash mid-apply can +/// leave staging files behind. 
This function determines whether the crash +/// happened before or after the manifest commit and either deletes the +/// staging files (pre-commit) or completes the rename (post-commit). +/// +/// The discriminator is the manifest's set of registered table keys: it +/// matches the schema source whose state corresponds to the manifest's +/// current version. For migrations that change the table set +/// (add/remove/rename a node or edge type), this is unambiguous. For +/// property-only migrations where both schemas imply the same table set, +/// recovery cannot disambiguate from staging files alone and returns an +/// operator-actionable error rather than guessing. +pub(crate) async fn recover_schema_state_files( + root_uri: &str, + storage: Arc, + snapshot: &Snapshot, +) -> Result<()> { + let pg_staging = schema_source_staging_uri(root_uri); + let ir_staging = schema_ir_staging_uri(root_uri); + let state_staging = schema_state_staging_uri(root_uri); + + let pg_exists = storage.exists(&pg_staging).await?; + let ir_exists = storage.exists(&ir_staging).await?; + let state_exists = storage.exists(&state_staging).await?; + + if !pg_exists && !ir_exists && !state_exists { + return Ok(()); + } + + if !pg_exists { + // _schema.pg.staging is gone but at least one of the other staging + // files is still present. This is a partial-rename: the post-commit + // crash happened mid-rename (after _schema.pg was renamed in but + // before _schema.ir.json or __schema_state.json was). The live + // _schema.pg should already reflect the new schema; verify that + // and complete the remaining renames. 
+ let live_source = storage.read_text(&schema_source_uri(root_uri)).await?; + let live_ir = compile_schema_source(&live_source)?; + let live_hash = + schema_ir_hash(&live_ir).map_err(|err| schema_lock_conflict(err.to_string()))?; + if state_exists { + let state_json = storage.read_text(&state_staging).await?; + let staging_state = serde_json::from_str::(&state_json) + .map_err(|err| schema_lock_conflict(err.to_string()))?; + if staging_state.schema_ir_hash != live_hash { + return Err(schema_lock_conflict(format!( + "found schema staging files (ir/state) without _schema.pg.staging, and the live _schema.pg does not match the staging schema state hash; manual operator action required (manifest v{})", + snapshot.version() + ))); + } + } + warn!( + "completing partial schema-file rename (manifest v{})", + snapshot.version() + ); + complete_staging_rename(root_uri, storage.as_ref()).await?; + return Ok(()); + } + + let staging_source = storage.read_text(&pg_staging).await?; + let staging_ir = compile_schema_source(&staging_source)?; + + let live_source = storage.read_text(&schema_source_uri(root_uri)).await?; + let live_ir = compile_schema_source(&live_source)?; + + let staging_hash = + schema_ir_hash(&staging_ir).map_err(|err| schema_lock_conflict(err.to_string()))?; + let live_hash = + schema_ir_hash(&live_ir).map_err(|err| schema_lock_conflict(err.to_string()))?; + + if staging_hash == live_hash { + warn!( + "removing leftover schema staging files matching the live schema (no-op apply that crashed)" + ); + cleanup_staging_files(root_uri, storage.as_ref()).await?; + return Ok(()); + } + + let live_keys = expected_table_keys(&live_ir); + let staging_keys = expected_table_keys(&staging_ir); + let actual_keys: BTreeSet = snapshot + .entries() + .map(|entry| entry.table_key.clone()) + .collect(); + + if live_keys == staging_keys { + return Err(schema_lock_conflict(format!( + "found schema staging files but cannot disambiguate pre- vs post-commit crash: live and staging 
schemas imply identical table sets (likely a property-only migration). Inspect _schema.pg.staging vs _schema.pg manually and either remove the staging files (to keep the live schema) or replace _schema.pg with the staging file (to apply the new schema). Manifest version: v{}", + snapshot.version() + ))); + } + + if actual_keys == live_keys { + warn!( + "schema apply crashed before manifest commit; removing staging files and keeping live schema (manifest v{})", + snapshot.version() + ); + cleanup_staging_files(root_uri, storage.as_ref()).await?; + Ok(()) + } else if actual_keys == staging_keys { + warn!( + "schema apply crashed after manifest commit; completing schema-file rename (manifest v{})", + snapshot.version() + ); + complete_staging_rename(root_uri, storage.as_ref()).await?; + Ok(()) + } else { + Err(schema_lock_conflict(format!( + "found schema staging files but the manifest's table set ({:?}) matches neither the live schema ({:?}) nor the staging schema ({:?}); manual operator action required", + actual_keys, live_keys, staging_keys + ))) + } +} + +async fn cleanup_staging_files(root_uri: &str, storage: &dyn StorageAdapter) -> Result<()> { + storage.delete(&schema_source_staging_uri(root_uri)).await?; + storage.delete(&schema_ir_staging_uri(root_uri)).await?; + storage + .delete(&schema_state_staging_uri(root_uri)) + .await?; + Ok(()) +} + +async fn complete_staging_rename(root_uri: &str, storage: &dyn StorageAdapter) -> Result<()> { + // Each rename is independent and idempotent: if the source no longer + // exists (already renamed) we skip it. This handles partial-rename + // recovery (e.g. one file renamed before crash). 
+ rename_if_present( + storage, + &schema_source_staging_uri(root_uri), + &schema_source_uri(root_uri), + ) + .await?; + rename_if_present( + storage, + &schema_ir_staging_uri(root_uri), + &schema_ir_uri(root_uri), + ) + .await?; + rename_if_present( + storage, + &schema_state_staging_uri(root_uri), + &schema_state_uri(root_uri), + ) + .await?; + Ok(()) +} + +async fn rename_if_present( + storage: &dyn StorageAdapter, + from_uri: &str, + to_uri: &str, +) -> Result<()> { + if storage.exists(from_uri).await? { + storage.rename_text(from_uri, to_uri).await?; + } + Ok(()) +} + +fn expected_table_keys(ir: &SchemaIR) -> BTreeSet { + let mut keys = BTreeSet::new(); + for node in &ir.nodes { + keys.insert(format!("node:{}", node.name)); + } + for edge in &ir.edges { + keys.insert(format!("edge:{}", edge.name)); + } + keys +} diff --git a/crates/omnigraph/src/storage.rs b/crates/omnigraph/src/storage.rs index 73d9441..c252e0e 100644 --- a/crates/omnigraph/src/storage.rs +++ b/crates/omnigraph/src/storage.rs @@ -20,6 +20,13 @@ pub trait StorageAdapter: Debug + Send + Sync { async fn read_text(&self, uri: &str) -> Result; async fn write_text(&self, uri: &str, contents: &str) -> Result<()>; async fn exists(&self, uri: &str) -> Result; + /// Move a file from `from_uri` to `to_uri`, replacing any existing file at + /// `to_uri`. Atomic on local POSIX; on S3 implemented as copy + delete + /// (NOT atomic — callers that depend on atomicity for crash recovery must + /// tolerate "both source and destination exist after a crash"). + async fn rename_text(&self, from_uri: &str, to_uri: &str) -> Result<()>; + /// Remove a file. Returns Ok(()) if the file does not exist. 
+ async fn delete(&self, uri: &str) -> Result<()>; } #[derive(Debug, Clone, Copy, PartialEq, Eq)] @@ -59,6 +66,22 @@ impl StorageAdapter for LocalStorageAdapter { async fn exists(&self, uri: &str) -> Result { Ok(local_path_from_uri(uri)?.exists()) } + + async fn rename_text(&self, from_uri: &str, to_uri: &str) -> Result<()> { + let from = local_path_from_uri(from_uri)?; + let to = local_path_from_uri(to_uri)?; + tokio::fs::rename(&from, &to).await?; + Ok(()) + } + + async fn delete(&self, uri: &str) -> Result<()> { + let path = local_path_from_uri(uri)?; + match tokio::fs::remove_file(&path).await { + Ok(()) => Ok(()), + Err(err) if err.kind() == std::io::ErrorKind::NotFound => Ok(()), + Err(err) => Err(err.into()), + } + } } #[async_trait] @@ -104,6 +127,33 @@ impl StorageAdapter for S3StorageAdapter { Err(err) => Err(storage_backend_error("exists", uri, err)), } } + + async fn rename_text(&self, from_uri: &str, to_uri: &str) -> Result<()> { + // S3 has no atomic rename. Copy then delete; if the copy succeeds and + // the delete fails (or the process crashes between them), both + // source and destination exist with the same content. Recovery code + // must tolerate this case — see schema_state::recover_schema_state_files. + let from = self.object_path(from_uri)?; + let to = self.object_path(to_uri)?; + self.store + .copy(&from, &to) + .await + .map_err(|err| storage_backend_error("rename:copy", from_uri, err))?; + self.store + .delete(&from) + .await + .map_err(|err| storage_backend_error("rename:delete", from_uri, err))?; + Ok(()) + } + + async fn delete(&self, uri: &str) -> Result<()> { + let location = self.object_path(uri)?; + match self.store.delete(&location).await { + Ok(()) => Ok(()), + Err(object_store::Error::NotFound { .. 
}) => Ok(()), + Err(err) => Err(storage_backend_error("delete", uri, err)), + } + } } impl S3StorageAdapter { diff --git a/crates/omnigraph/tests/failpoints.rs b/crates/omnigraph/tests/failpoints.rs index c1ca555..bdc5f83 100644 --- a/crates/omnigraph/tests/failpoints.rs +++ b/crates/omnigraph/tests/failpoints.rs @@ -6,7 +6,11 @@ use fail::FailScenario; use omnigraph::db::Omnigraph; use omnigraph::failpoints::ScopedFailPoint; -use helpers::{MUTATION_QUERIES, mixed_params}; +use helpers::{MUTATION_QUERIES, mixed_params, mutate_main}; + +const SCHEMA_V1: &str = "node Person { name: String @key }\n"; +const SCHEMA_V2_ADDED_TYPE: &str = + "node Person { name: String @key }\nnode Company { name: String @key }\n"; #[tokio::test] async fn branch_create_failpoint_triggers() { @@ -45,3 +49,108 @@ async fn graph_publish_failpoint_triggers_before_commit_append() { .contains("injected failpoint triggered: graph_publish.before_commit_append") ); } + +// Atomic schema apply: schema apply writes staging files first, then commits +// the manifest, then renames staging → final. Tests below inject crashes at +// the two boundaries and assert that reopening the repo yields a consistent +// state. + +#[tokio::test] +async fn schema_apply_recovers_pre_commit_crash() { + let _scenario = FailScenario::setup(); + let dir = tempfile::tempdir().unwrap(); + let uri = dir.path().to_str().unwrap().to_string(); + + { + let mut db = Omnigraph::init(&uri, SCHEMA_V1).await.unwrap(); + let _failpoint = ScopedFailPoint::new("schema_apply.after_staging_write", "return"); + let err = db.apply_schema(SCHEMA_V2_ADDED_TYPE).await.unwrap_err(); + assert!( + err.to_string() + .contains("injected failpoint triggered: schema_apply.after_staging_write"), + "got: {}", + err + ); + } + + // Reopen — recovery sweep should delete staging files and keep the + // original schema, since the manifest commit never happened. 
+ let db = Omnigraph::open(&uri).await.unwrap(); + assert_eq!(db.schema_source(), SCHEMA_V1); + assert_no_staging_files(dir.path()); +} + +#[tokio::test] +async fn schema_apply_recovers_post_commit_crash() { + let _scenario = FailScenario::setup(); + let dir = tempfile::tempdir().unwrap(); + let uri = dir.path().to_str().unwrap().to_string(); + + { + let mut db = Omnigraph::init(&uri, SCHEMA_V1).await.unwrap(); + let _failpoint = ScopedFailPoint::new("schema_apply.after_manifest_commit", "return"); + let err = db.apply_schema(SCHEMA_V2_ADDED_TYPE).await.unwrap_err(); + assert!( + err.to_string() + .contains("injected failpoint triggered: schema_apply.after_manifest_commit"), + "got: {}", + err + ); + } + + // Reopen — manifest is at the new version, so recovery sweep should + // complete the rename and the live schema matches v2. + let db = Omnigraph::open(&uri).await.unwrap(); + assert_eq!(db.schema_source(), SCHEMA_V2_ADDED_TYPE); + assert_no_staging_files(dir.path()); +} + +#[tokio::test] +async fn schema_apply_recovers_partial_rename() { + // Construct a partial-rename state: _schema.pg has been renamed in + // (matching v2), but _schema.ir.json.staging and __schema_state.json.staging + // were never renamed. Recovery should detect that the live source matches + // the staging state's hash and complete the remaining renames. + let _scenario = FailScenario::setup(); + let dir = tempfile::tempdir().unwrap(); + let uri = dir.path().to_str().unwrap().to_string(); + + { + let mut db = Omnigraph::init(&uri, SCHEMA_V1).await.unwrap(); + db.apply_schema(SCHEMA_V2_ADDED_TYPE).await.unwrap(); + } + + // Simulate: one of the renames (the IR or state file) didn't complete by + // copying the live ir/state files back to their staging names. 
+ std::fs::copy( + dir.path().join("_schema.ir.json"), + dir.path().join("_schema.ir.json.staging"), + ) + .unwrap(); + std::fs::copy( + dir.path().join("__schema_state.json"), + dir.path().join("__schema_state.json.staging"), + ) + .unwrap(); + + // Reopen — recovery should complete the rename (overwriting final files + // with identical staging content) and remove the staging files. + let db = Omnigraph::open(&uri).await.unwrap(); + assert_eq!(db.schema_source(), SCHEMA_V2_ADDED_TYPE); + assert_no_staging_files(dir.path()); +} + +fn assert_no_staging_files(repo: &std::path::Path) { + for name in [ + "_schema.pg.staging", + "_schema.ir.json.staging", + "__schema_state.json.staging", + ] { + let path = repo.join(name); + assert!( + !path.exists(), + "staging file {} still exists after recovery", + path.display() + ); + } +}