Make schema apply atomic across crashes (#57)

Schema apply previously committed the manifest before writing the schema
source and IR contract files. A crash in that window left the manifest
pointing at the new schema while _schema.pg, _schema.ir.json, and
__schema_state.json still reflected the old one — a silent inconsistency
that subsequent reads hit as type errors.

Reorders the apply: write to staging filenames first, commit the
manifest, then atomically rename staging → final. On open, a recovery
sweep reconciles any leftover staging files against the manifest's table
set: pre-commit crashes get the staging files deleted, post-commit
crashes get the renames completed (idempotent — handles partial
renames). Property-only migrations where both schemas imply the same
table set return an operator-actionable error rather than guessing.

Adds rename_text + delete to StorageAdapter (atomic on local FS via
tokio::fs::rename; copy + delete on S3 — recovery is tolerant of the
non-atomic case). Failpoints test coverage at both crash boundaries plus
a partial-rename scenario.

Co-authored-by: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
Andrew Altshuler 2026-04-27 16:21:00 +03:00 committed by GitHub
parent 372f793ad6
commit f75b941a9e
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
5 changed files with 448 additions and 14 deletions

View file

@ -42,7 +42,9 @@ use super::manifest::{
};
use super::schema_state::{
SCHEMA_SOURCE_FILENAME, load_or_bootstrap_schema_contract, read_accepted_schema_ir,
validate_schema_contract, write_schema_contract,
recover_schema_state_files, schema_ir_staging_uri, schema_ir_uri, schema_source_staging_uri,
schema_source_uri, schema_state_staging_uri, schema_state_uri, validate_schema_contract,
write_schema_contract, write_schema_contract_staging,
};
use super::{
ReadTarget, ResolvedTarget, RunId, SCHEMA_APPLY_LOCK_BRANCH, SnapshotId,
@ -130,11 +132,16 @@ impl Omnigraph {
storage: Arc<dyn StorageAdapter>,
) -> Result<Self> {
let root = normalize_root_uri(uri)?;
// Read _schema.pg
let schema_path = join_uri(&root, SCHEMA_SOURCE_FILENAME);
// Open the coordinator first so the schema-staging recovery sweep can
// compare its snapshot against any leftover staging files. Recovery
// either deletes staging (pre-commit crash) or completes the rename
// (post-commit crash) before the live schema files are read.
let coordinator = GraphCoordinator::open(&root, Arc::clone(&storage)).await?;
recover_schema_state_files(&root, Arc::clone(&storage), &coordinator.snapshot()).await?;
// Read _schema.pg (post-recovery — may have just been renamed in).
let schema_path = schema_source_uri(&root);
let schema_source = storage.read_text(&schema_path).await?;
let current_source_ir = read_schema_ir_from_source(&schema_source)?;
let coordinator = GraphCoordinator::open(&root, Arc::clone(&storage)).await?;
let branches = coordinator.branch_list().await?;
let (accepted_ir, _) = load_or_bootstrap_schema_contract(
&root,
@ -1562,6 +1569,8 @@ edge WorksAt: Person -> Company
reads: Mutex<Vec<String>>,
writes: Mutex<Vec<String>>,
exists_checks: Mutex<Vec<String>>,
renames: Mutex<Vec<(String, String)>>,
deletes: Mutex<Vec<String>>,
}
impl RecordingStorageAdapter {
@ -1594,6 +1603,19 @@ edge WorksAt: Person -> Company
self.exists_checks.lock().unwrap().push(uri.to_string());
self.inner.exists(uri).await
}
/// Records the `(from_uri, to_uri)` pair in `renames`, then forwards the
/// rename to the wrapped adapter.
async fn rename_text(&self, from_uri: &str, to_uri: &str) -> Result<()> {
    let recorded_pair = (from_uri.to_string(), to_uri.to_string());
    // The guard is a statement temporary, so it is released before the await.
    self.renames.lock().unwrap().push(recorded_pair);
    self.inner.rename_text(from_uri, to_uri).await
}
/// Records `uri` in `deletes`, then forwards the delete to the wrapped adapter.
async fn delete(&self, uri: &str) -> Result<()> {
    {
        // Scope the guard so it is dropped before the await below.
        let mut recorded = self.deletes.lock().unwrap();
        recorded.push(uri.to_string());
    }
    self.inner.delete(uri).await
}
}
#[tokio::test]

View file

@ -314,6 +314,22 @@ pub(super) async fn apply_schema_with_lock(
)));
}
// Atomic schema apply.
//
// Write the new schema source + IR contract to staging filenames first,
// then commit the manifest, then rename staging → final. A crash
// between these stages is recoverable on next open via
// `recover_schema_state_files`:
// - crash before commit → manifest unchanged; staging deleted on open
// - crash after commit → manifest advanced; staging renamed on open
let staging_pg_uri = schema_source_staging_uri(&db.root_uri);
db.storage
.write_text(&staging_pg_uri, desired_schema_source)
.await?;
write_schema_contract_staging(&db.root_uri, db.storage.as_ref(), &desired_ir).await?;
crate::failpoints::maybe_fail("schema_apply.after_staging_write")?;
let actor_id = db.current_audit_actor().map(str::to_string);
let PublishedSnapshot {
manifest_version,
@ -323,11 +339,23 @@ pub(super) async fn apply_schema_with_lock(
.commit_changes_with_actor(&manifest_changes, actor_id.as_deref())
.await?;
let schema_path = join_uri(&db.root_uri, SCHEMA_SOURCE_FILENAME);
crate::failpoints::maybe_fail("schema_apply.after_manifest_commit")?;
db.storage
.write_text(&schema_path, desired_schema_source)
.rename_text(&staging_pg_uri, &schema_source_uri(&db.root_uri))
.await?;
db.storage
.rename_text(
&schema_ir_staging_uri(&db.root_uri),
&schema_ir_uri(&db.root_uri),
)
.await?;
db.storage
.rename_text(
&schema_state_staging_uri(&db.root_uri),
&schema_state_uri(&db.root_uri),
)
.await?;
write_schema_contract(&db.root_uri, db.storage.as_ref(), &desired_ir).await?;
db.catalog = desired_catalog;
db.schema_source = desired_schema_source.to_string();

View file

@ -1,9 +1,12 @@
use std::collections::BTreeSet;
use std::sync::Arc;
use omnigraph_compiler::schema::parser::parse_schema;
use omnigraph_compiler::{SchemaIR, build_schema_ir, schema_ir_hash, schema_ir_pretty_json};
use serde::{Deserialize, Serialize};
use tracing::warn;
use crate::db::manifest::Snapshot;
use crate::error::{OmniError, Result};
use crate::storage::{StorageAdapter, join_uri};
@ -11,6 +14,13 @@ pub(crate) const SCHEMA_SOURCE_FILENAME: &str = "_schema.pg";
pub(crate) const SCHEMA_IR_FILENAME: &str = "_schema.ir.json";
pub(crate) const SCHEMA_STATE_FILENAME: &str = "__schema_state.json";
// Staging filenames used by atomic schema apply. Schema apply writes to these
// first, then commits the manifest, then renames staging → final. Recovery on
// open reconciles any leftover staging files against the manifest.
/// Staging name for the schema source; renamed to `_schema.pg` once the
/// manifest commit has succeeded.
pub(crate) const SCHEMA_SOURCE_STAGING_FILENAME: &str = "_schema.pg.staging";
/// Staging name for the schema IR JSON; renamed to `_schema.ir.json`.
pub(crate) const SCHEMA_IR_STAGING_FILENAME: &str = "_schema.ir.json.staging";
/// Staging name for the schema state JSON; renamed to `__schema_state.json`.
pub(crate) const SCHEMA_STATE_STAGING_FILENAME: &str = "__schema_state.json.staging";
const SCHEMA_STATE_FORMAT_VERSION: u32 = 1;
const SCHEMA_IDENTITY_VERSION: u32 = 1;
@ -87,6 +97,38 @@ pub(crate) async fn write_schema_contract(
root_uri: &str,
storage: &dyn StorageAdapter,
schema_ir: &SchemaIR,
) -> Result<SchemaState> {
write_schema_contract_to(
storage,
&schema_ir_uri(root_uri),
&schema_state_uri(root_uri),
schema_ir,
)
.await
}
/// Staging counterpart of `write_schema_contract`: the IR and state JSON are
/// written under the staging filenames instead of the final ones. Atomic
/// schema apply calls this before the manifest commit and renames the staged
/// files to their final names afterward.
pub(crate) async fn write_schema_contract_staging(
    root_uri: &str,
    storage: &dyn StorageAdapter,
    schema_ir: &SchemaIR,
) -> Result<SchemaState> {
    let staged_ir_uri = schema_ir_staging_uri(root_uri);
    let staged_state_uri = schema_state_staging_uri(root_uri);
    write_schema_contract_to(storage, &staged_ir_uri, &staged_state_uri, schema_ir).await
}
async fn write_schema_contract_to(
storage: &dyn StorageAdapter,
ir_uri: &str,
state_uri: &str,
schema_ir: &SchemaIR,
) -> Result<SchemaState> {
let ir_json = schema_ir_pretty_json(schema_ir)
.map_err(|err| OmniError::manifest_internal(err.to_string()))?;
@ -97,12 +139,8 @@ pub(crate) async fn write_schema_contract(
OmniError::manifest_internal(format!("serialize schema state error: {}", err))
})?;
storage
.write_text(&schema_ir_uri(root_uri), &ir_json)
.await?;
storage
.write_text(&schema_state_uri(root_uri), &state_json)
.await?;
storage.write_text(ir_uri, &ir_json).await?;
storage.write_text(state_uri, &state_json).await?;
Ok(state)
}
@ -143,6 +181,18 @@ pub(crate) fn schema_state_uri(root_uri: &str) -> String {
join_uri(root_uri, SCHEMA_STATE_FILENAME)
}
/// URI of the staged schema source file (`_schema.pg.staging`) under `root_uri`.
pub(crate) fn schema_source_staging_uri(root_uri: &str) -> String {
    let filename = SCHEMA_SOURCE_STAGING_FILENAME;
    join_uri(root_uri, filename)
}
/// URI of the staged schema IR file (`_schema.ir.json.staging`) under `root_uri`.
pub(crate) fn schema_ir_staging_uri(root_uri: &str) -> String {
    let filename = SCHEMA_IR_STAGING_FILENAME;
    join_uri(root_uri, filename)
}
/// URI of the staged schema state file (`__schema_state.json.staging`) under
/// `root_uri`.
pub(crate) fn schema_state_staging_uri(root_uri: &str) -> String {
    let filename = SCHEMA_STATE_STAGING_FILENAME;
    join_uri(root_uri, filename)
}
enum SchemaContractRead {
Present { ir: SchemaIR, state: SchemaState },
MissingAll,
@ -234,3 +284,178 @@ fn schema_lock_conflict(detail: impl Into<String>) -> OmniError {
detail.into()
))
}
/// Reconcile leftover schema staging files (`_schema.pg.staging`,
/// `_schema.ir.json.staging`, `__schema_state.json.staging`) against the
/// manifest snapshot.
///
/// Atomic schema apply writes these staging files before committing the
/// manifest, then renames them to their final names. A crash mid-apply can
/// leave staging files behind. This function determines whether the crash
/// happened before or after the manifest commit and either deletes the
/// staging files (pre-commit) or completes the rename (post-commit).
///
/// The discriminator is the manifest's set of registered table keys: it
/// matches the schema source whose state corresponds to the manifest's
/// current version. For migrations that change the table set
/// (add/remove/rename a node or edge type), this is unambiguous. For
/// property-only migrations where both schemas imply the same table set,
/// recovery cannot disambiguate from staging files alone and returns an
/// operator-actionable error rather than guessing.
pub(crate) async fn recover_schema_state_files(
root_uri: &str,
storage: Arc<dyn StorageAdapter>,
snapshot: &Snapshot,
) -> Result<()> {
let pg_staging = schema_source_staging_uri(root_uri);
let ir_staging = schema_ir_staging_uri(root_uri);
let state_staging = schema_state_staging_uri(root_uri);
let pg_exists = storage.exists(&pg_staging).await?;
let ir_exists = storage.exists(&ir_staging).await?;
let state_exists = storage.exists(&state_staging).await?;
if !pg_exists && !ir_exists && !state_exists {
return Ok(());
}
if !pg_exists {
// _schema.pg.staging is gone but at least one of the other staging
// files is still present. This is a partial-rename: the post-commit
// crash happened mid-rename (after _schema.pg was renamed in but
// before _schema.ir.json or __schema_state.json was). The live
// _schema.pg should already reflect the new schema; verify that
// and complete the remaining renames.
let live_source = storage.read_text(&schema_source_uri(root_uri)).await?;
let live_ir = compile_schema_source(&live_source)?;
let live_hash =
schema_ir_hash(&live_ir).map_err(|err| schema_lock_conflict(err.to_string()))?;
if state_exists {
let state_json = storage.read_text(&state_staging).await?;
let staging_state = serde_json::from_str::<SchemaState>(&state_json)
.map_err(|err| schema_lock_conflict(err.to_string()))?;
if staging_state.schema_ir_hash != live_hash {
return Err(schema_lock_conflict(format!(
"found schema staging files (ir/state) without _schema.pg.staging, and the live _schema.pg does not match the staging schema state hash; manual operator action required (manifest v{})",
snapshot.version()
)));
}
}
warn!(
"completing partial schema-file rename (manifest v{})",
snapshot.version()
);
complete_staging_rename(root_uri, storage.as_ref()).await?;
return Ok(());
}
let staging_source = storage.read_text(&pg_staging).await?;
let staging_ir = compile_schema_source(&staging_source)?;
let live_source = storage.read_text(&schema_source_uri(root_uri)).await?;
let live_ir = compile_schema_source(&live_source)?;
let staging_hash =
schema_ir_hash(&staging_ir).map_err(|err| schema_lock_conflict(err.to_string()))?;
let live_hash =
schema_ir_hash(&live_ir).map_err(|err| schema_lock_conflict(err.to_string()))?;
if staging_hash == live_hash {
warn!(
"removing leftover schema staging files matching the live schema (no-op apply that crashed)"
);
cleanup_staging_files(root_uri, storage.as_ref()).await?;
return Ok(());
}
let live_keys = expected_table_keys(&live_ir);
let staging_keys = expected_table_keys(&staging_ir);
let actual_keys: BTreeSet<String> = snapshot
.entries()
.map(|entry| entry.table_key.clone())
.collect();
if live_keys == staging_keys {
return Err(schema_lock_conflict(format!(
"found schema staging files but cannot disambiguate pre- vs post-commit crash: live and staging schemas imply identical table sets (likely a property-only migration). Inspect _schema.pg.staging vs _schema.pg manually and either remove the staging files (to keep the live schema) or replace _schema.pg with the staging file (to apply the new schema). Manifest version: v{}",
snapshot.version()
)));
}
if actual_keys == live_keys {
warn!(
"schema apply crashed before manifest commit; removing staging files and keeping live schema (manifest v{})",
snapshot.version()
);
cleanup_staging_files(root_uri, storage.as_ref()).await?;
Ok(())
} else if actual_keys == staging_keys {
warn!(
"schema apply crashed after manifest commit; completing schema-file rename (manifest v{})",
snapshot.version()
);
complete_staging_rename(root_uri, storage.as_ref()).await?;
Ok(())
} else {
Err(schema_lock_conflict(format!(
"found schema staging files but the manifest's table set ({:?}) matches neither the live schema ({:?}) nor the staging schema ({:?}); manual operator action required",
actual_keys, live_keys, staging_keys
)))
}
}
/// Deletes the three schema staging files under `root_uri`, in the order
/// source, IR, state. Stops at the first storage error.
async fn cleanup_staging_files(root_uri: &str, storage: &dyn StorageAdapter) -> Result<()> {
    let staged_uris = [
        schema_source_staging_uri(root_uri),
        schema_ir_staging_uri(root_uri),
        schema_state_staging_uri(root_uri),
    ];
    for staged_uri in &staged_uris {
        storage.delete(staged_uri).await?;
    }
    Ok(())
}
/// Renames each staging file to its final name, in the order source, IR,
/// state. A staging file that no longer exists is skipped, so running this
/// after a partial rename only moves what is still pending.
async fn complete_staging_rename(root_uri: &str, storage: &dyn StorageAdapter) -> Result<()> {
    let pending_moves = [
        (
            schema_source_staging_uri(root_uri),
            schema_source_uri(root_uri),
        ),
        (schema_ir_staging_uri(root_uri), schema_ir_uri(root_uri)),
        (
            schema_state_staging_uri(root_uri),
            schema_state_uri(root_uri),
        ),
    ];
    for (staged_uri, final_uri) in &pending_moves {
        rename_if_present(storage, staged_uri, final_uri).await?;
    }
    Ok(())
}
/// Renames `from_uri` to `to_uri` when `from_uri` exists; otherwise does
/// nothing and returns `Ok(())`.
async fn rename_if_present(
    storage: &dyn StorageAdapter,
    from_uri: &str,
    to_uri: &str,
) -> Result<()> {
    let source_present = storage.exists(from_uri).await?;
    if !source_present {
        return Ok(());
    }
    storage.rename_text(from_uri, to_uri).await
}
/// Builds the set of table keys implied by a schema IR: `node:<name>` for
/// every node and `edge:<name>` for every edge.
fn expected_table_keys(ir: &SchemaIR) -> BTreeSet<String> {
    let node_keys = ir.nodes.iter().map(|node| format!("node:{}", node.name));
    let edge_keys = ir.edges.iter().map(|edge| format!("edge:{}", edge.name));
    node_keys.chain(edge_keys).collect()
}

View file

@ -20,6 +20,13 @@ pub trait StorageAdapter: Debug + Send + Sync {
async fn read_text(&self, uri: &str) -> Result<String>;
async fn write_text(&self, uri: &str, contents: &str) -> Result<()>;
async fn exists(&self, uri: &str) -> Result<bool>;
/// Move a file from `from_uri` to `to_uri`, replacing any existing file at
/// `to_uri`. Atomic on local POSIX; on S3 implemented as copy + delete
/// (NOT atomic — callers that depend on atomicity for crash recovery must
/// tolerate "both source and destination exist after a crash").
async fn rename_text(&self, from_uri: &str, to_uri: &str) -> Result<()>;
/// Remove a file. Returns Ok(()) if the file does not exist.
async fn delete(&self, uri: &str) -> Result<()>;
}
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
@ -59,6 +66,22 @@ impl StorageAdapter for LocalStorageAdapter {
async fn exists(&self, uri: &str) -> Result<bool> {
Ok(local_path_from_uri(uri)?.exists())
}
/// Resolves both URIs to local paths and moves the file with
/// `tokio::fs::rename`.
async fn rename_text(&self, from_uri: &str, to_uri: &str) -> Result<()> {
    let source = local_path_from_uri(from_uri)?;
    let destination = local_path_from_uri(to_uri)?;
    tokio::fs::rename(&source, &destination)
        .await
        .map_err(Into::into)
}
/// Removes the local file behind `uri`. A file that is already missing
/// counts as success; any other I/O error is returned.
async fn delete(&self, uri: &str) -> Result<()> {
    let path = local_path_from_uri(uri)?;
    if let Err(err) = tokio::fs::remove_file(&path).await {
        if err.kind() != std::io::ErrorKind::NotFound {
            return Err(err.into());
        }
    }
    Ok(())
}
}
#[async_trait]
@ -104,6 +127,33 @@ impl StorageAdapter for S3StorageAdapter {
Err(err) => Err(storage_backend_error("exists", uri, err)),
}
}
/// Moves an object by copying it to `to_uri` and then deleting `from_uri`.
///
/// The two steps are separate store calls, so a failure or crash after the
/// copy leaves both objects in place with the same content. Callers must
/// tolerate that state; see schema_state::recover_schema_state_files.
async fn rename_text(&self, from_uri: &str, to_uri: &str) -> Result<()> {
    let source = self.object_path(from_uri)?;
    let destination = self.object_path(to_uri)?;

    if let Err(err) = self.store.copy(&source, &destination).await {
        return Err(storage_backend_error("rename:copy", from_uri, err));
    }

    match self.store.delete(&source).await {
        Ok(()) => Ok(()),
        Err(err) => Err(storage_backend_error("rename:delete", from_uri, err)),
    }
}
async fn delete(&self, uri: &str) -> Result<()> {
let location = self.object_path(uri)?;
match self.store.delete(&location).await {
Ok(()) => Ok(()),
Err(object_store::Error::NotFound { .. }) => Ok(()),
Err(err) => Err(storage_backend_error("delete", uri, err)),
}
}
}
impl S3StorageAdapter {

View file

@ -6,7 +6,11 @@ use fail::FailScenario;
use omnigraph::db::Omnigraph;
use omnigraph::failpoints::ScopedFailPoint;
use helpers::{MUTATION_QUERIES, mixed_params};
use helpers::{MUTATION_QUERIES, mixed_params, mutate_main};
const SCHEMA_V1: &str = "node Person { name: String @key }\n";
const SCHEMA_V2_ADDED_TYPE: &str =
"node Person { name: String @key }\nnode Company { name: String @key }\n";
#[tokio::test]
async fn branch_create_failpoint_triggers() {
@ -45,3 +49,108 @@ async fn graph_publish_failpoint_triggers_before_commit_append() {
.contains("injected failpoint triggered: graph_publish.before_commit_append")
);
}
// Atomic schema apply: schema apply writes staging files first, then commits
// the manifest, then renames staging → final. Tests below inject crashes at
// the two boundaries and assert that reopening the repo yields a consistent
// state.
#[tokio::test]
async fn schema_apply_recovers_pre_commit_crash() {
    let _scenario = FailScenario::setup();
    let repo_dir = tempfile::tempdir().unwrap();
    let repo_uri = repo_dir.path().to_str().unwrap().to_string();

    // Inject a failure right after the staging files are written, i.e.
    // before the manifest commit. The scope drops the failpoint and the
    // handle before the repo is reopened.
    {
        let mut graph = Omnigraph::init(&repo_uri, SCHEMA_V1).await.unwrap();
        let _failpoint = ScopedFailPoint::new("schema_apply.after_staging_write", "return");
        let failure = graph.apply_schema(SCHEMA_V2_ADDED_TYPE).await.unwrap_err();
        let message = failure.to_string();
        assert!(
            message.contains("injected failpoint triggered: schema_apply.after_staging_write"),
            "got: {}",
            message
        );
    }

    // The manifest commit never happened, so reopening must discard the
    // staging files and keep serving the original schema.
    let reopened = Omnigraph::open(&repo_uri).await.unwrap();
    assert_eq!(reopened.schema_source(), SCHEMA_V1);
    assert_no_staging_files(repo_dir.path());
}
#[tokio::test]
async fn schema_apply_recovers_post_commit_crash() {
    let _scenario = FailScenario::setup();
    let repo_dir = tempfile::tempdir().unwrap();
    let repo_uri = repo_dir.path().to_str().unwrap().to_string();

    // Inject a failure after the manifest commit but before any staging
    // file has been renamed to its final name.
    {
        let mut graph = Omnigraph::init(&repo_uri, SCHEMA_V1).await.unwrap();
        let _failpoint = ScopedFailPoint::new("schema_apply.after_manifest_commit", "return");
        let failure = graph.apply_schema(SCHEMA_V2_ADDED_TYPE).await.unwrap_err();
        let message = failure.to_string();
        assert!(
            message.contains("injected failpoint triggered: schema_apply.after_manifest_commit"),
            "got: {}",
            message
        );
    }

    // The manifest already points at the new version, so reopening must
    // finish the renames and expose the v2 schema.
    let reopened = Omnigraph::open(&repo_uri).await.unwrap();
    assert_eq!(reopened.schema_source(), SCHEMA_V2_ADDED_TYPE);
    assert_no_staging_files(repo_dir.path());
}
#[tokio::test]
async fn schema_apply_recovers_partial_rename() {
    // Partial-rename state under test: _schema.pg already carries v2, while
    // _schema.ir.json.staging and __schema_state.json.staging are still on
    // disk. Recovery should see that the live source matches the staged
    // state hash and finish the remaining renames.
    let _scenario = FailScenario::setup();
    let repo_dir = tempfile::tempdir().unwrap();
    let repo_root = repo_dir.path();
    let repo_uri = repo_root.to_str().unwrap().to_string();

    {
        let mut graph = Omnigraph::init(&repo_uri, SCHEMA_V1).await.unwrap();
        graph.apply_schema(SCHEMA_V2_ADDED_TYPE).await.unwrap();
    }

    // Fake the unfinished renames by copying the live IR/state files back
    // to their staging names.
    for (live_name, staging_name) in [
        ("_schema.ir.json", "_schema.ir.json.staging"),
        ("__schema_state.json", "__schema_state.json.staging"),
    ] {
        std::fs::copy(repo_root.join(live_name), repo_root.join(staging_name)).unwrap();
    }

    // Reopening should rename the staged files over the (identical) final
    // files, leaving no staging files behind.
    let reopened = Omnigraph::open(&repo_uri).await.unwrap();
    assert_eq!(reopened.schema_source(), SCHEMA_V2_ADDED_TYPE);
    assert_no_staging_files(repo_root);
}
/// Panics if any of the three schema staging files exists directly under
/// `repo`.
fn assert_no_staging_files(repo: &std::path::Path) {
    const STAGING_NAMES: [&str; 3] = [
        "_schema.pg.staging",
        "_schema.ir.json.staging",
        "__schema_state.json.staging",
    ];
    STAGING_NAMES
        .iter()
        .map(|name| repo.join(name))
        .for_each(|path| {
            assert!(
                !path.exists(),
                "staging file {} still exists after recovery",
                path.display()
            );
        });
}