mr-668: close init re-init footgun via InitOptions preflight (green)

`Omnigraph::init` is "create a new graph"; existing graphs need
an explicit overwrite. Today's behavior — silently overwrite
schema files, then on inner failure delete them via best-effort
cleanup — is destructive against an existing graph regardless of
which branch fires.

Correct-by-design fix:

* New `InitOptions { force: bool }` struct (default `force: false`).
* New `Omnigraph::init_with_options(uri, schema, options)`. The
  old `Omnigraph::init(uri, schema)` is a thin shortcut that
  passes `InitOptions::default()`.
* `init_with_storage` runs a `storage.exists()` preflight on the
  three schema URIs BEFORE any parse, write, or coordinator call.
  Any hit → typed `OmniError::AlreadyInitialized { uri }`. The
  destructive code paths (the `write_text` overwrite and the
  best-effort cleanup) are now unreachable in strict mode against
  an existing graph.
* `force: true` skips the preflight; existing operators who
  actually mean to overwrite opt in explicitly.
* CLI: `omnigraph init --force` maps to `InitOptions { force: true }`.
* HTTP: `OmniError::AlreadyInitialized` maps to 409 via
  `ApiError::from_omni`. Not currently HTTP-reachable (POST /graphs
  was pulled), but the wiring lands here so a future runtime
  create endpoint has one canonical translation.

Closes the "init is destructive against existing state" class.
The regression test added in the previous commit
(`init_on_existing_graph_uri_does_not_destroy_existing_schema`)
turns green: the original schema files now survive a second
init attempt byte-for-byte, and the call errors cleanly with
`AlreadyInitialized`. The four existing
`init_failpoint_after_*_cleans_up_*` tests stay green — strict
mode's preflight passes on a fresh tempdir, and cleanup still
runs as before when a failpoint fires mid-write.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
Ragnor Comerford 2026-05-27 13:24:49 +02:00
parent 094e868be6
commit 67a46528ef
No known key found for this signature in database
5 changed files with 98 additions and 9 deletions

View file

@ -73,6 +73,13 @@ enum Command {
schema: PathBuf,
/// Graph URI (local path or s3://)
uri: String,
/// Overwrite existing schema artifacts at the URI. Without
/// this flag, init refuses to touch a URI that already holds
/// `_schema.pg`, `_schema.ir.json`, or `__schema_state.json`
/// — closes the re-init footgun (MR-668 follow-up). With the
/// flag, the operator opts in to destructive semantics.
#[arg(long)]
force: bool,
},
/// Load data into a graph
Load {
@ -1746,10 +1753,15 @@ async fn main() -> Result<()> {
print_embed_human(&output);
}
}
Command::Init { schema, uri } => {
Command::Init { schema, uri, force } => {
let schema_source = fs::read_to_string(&schema)?;
ensure_local_graph_parent(&uri)?;
Omnigraph::init(&uri, &schema_source).await?;
Omnigraph::init_with_options(
&uri,
&schema_source,
omnigraph::db::InitOptions { force },
)
.await?;
scaffold_config_if_missing(&uri)?;
println!("initialized {}", uri);
}

View file

@ -684,6 +684,12 @@ impl ApiError {
// engine gate fires, the bearer is valid — any failure from
// the engine is a policy outcome, not an auth one.
OmniError::Policy(message) => Self::forbidden(message),
// `Omnigraph::init` against an existing graph URI in strict
// mode. Not currently HTTP-reachable (POST /graphs was
// pulled), but mapping is wired so the variant has a
// single canonical translation when a future runtime
// create endpoint lands.
err @ OmniError::AlreadyInitialized { .. } => Self::conflict(err.to_string()),
}
}
}

View file

@ -11,7 +11,7 @@ pub use commit_graph::GraphCommit;
pub use graph_coordinator::{GraphCoordinator, ReadTarget, ResolvedTarget, SnapshotId};
pub use manifest::{Snapshot, SubTableEntry, SubTableUpdate};
pub use omnigraph::{
CleanupPolicyOptions, MergeOutcome, Omnigraph, OpenMode, SchemaApplyOptions,
CleanupPolicyOptions, InitOptions, MergeOutcome, Omnigraph, OpenMode, SchemaApplyOptions,
SchemaApplyResult, TableCleanupStats, TableOptimizeStats,
};
pub(crate) use omnigraph::ensure_public_branch_ref;

View file

@ -165,29 +165,92 @@ pub enum OpenMode {
ReadOnly,
}
/// Options for [`Omnigraph::init_with_options`].
///
/// `force` controls the safety preflight that prevents an
/// accidental re-init from overwriting an existing graph's schema
/// metadata. Default behavior (`force: false`) fails fast with
/// [`OmniError::AlreadyInitialized`] if any of `_schema.pg`,
/// `_schema.ir.json`, or `__schema_state.json` already exists at
/// the target URI. With `force: true` the preflight is skipped —
/// existing schema files are overwritten in place. Force does NOT
/// purge old Lance datasets or `__manifest/`; reclaiming those
/// still requires deleting the graph directory by hand (or via a
/// future `DELETE /graphs/{id}`).
#[derive(Debug, Clone, Copy, Default)]
pub struct InitOptions {
/// Skip the existing-graph preflight. Operators set this when
/// they actually mean to overwrite — e.g. `omnigraph init --force`.
pub force: bool,
}
impl Omnigraph {
/// Create a new graph at `uri` from schema source.
///
/// Creates `_schema.pg`, per-type Lance datasets, and `__manifest`.
/// Strict mode: errors with [`OmniError::AlreadyInitialized`] if
/// `uri` already holds any of the three schema artifacts. To
/// overwrite an existing graph deliberately, call
/// [`Self::init_with_options`] with `InitOptions { force: true }`.
pub async fn init(uri: &str, schema_source: &str) -> Result<Self> {
Self::init_with_storage(uri, schema_source, storage_for_uri(uri)?).await
Self::init_with_options(uri, schema_source, InitOptions::default()).await
}
/// Create a new graph at `uri`, with explicit init-time options.
///
/// See [`InitOptions`] for the safety contract — by default this
/// behaves identically to [`Self::init`].
pub async fn init_with_options(
uri: &str,
schema_source: &str,
options: InitOptions,
) -> Result<Self> {
Self::init_with_storage(uri, schema_source, storage_for_uri(uri)?, options).await
}
pub(crate) async fn init_with_storage(
uri: &str,
schema_source: &str,
storage: Arc<dyn StorageAdapter>,
options: InitOptions,
) -> Result<Self> {
let root = normalize_root_uri(uri)?;
// Preflight: refuse to clobber an existing graph unless the
// operator passed `force`. This runs BEFORE any parse or
// write so a misdirected `init` against an existing graph
// URI cannot reach a code path that overwrites or, on a
// later cleanup, deletes the schema files.
//
// Closes the "init is destructive against existing state"
// class: there is no longer a code path where strict-mode
// `init` can mutate a populated graph root.
if !options.force {
for candidate in [
schema_source_uri(&root),
schema_ir_uri(&root),
schema_state_uri(&root),
] {
if storage.exists(&candidate).await? {
return Err(OmniError::AlreadyInitialized {
uri: root.clone(),
});
}
}
}
let schema_ir = read_schema_ir_from_source(schema_source)?;
let mut catalog = build_catalog_from_ir(&schema_ir)?;
fixup_blob_schemas(&mut catalog);
// Run the I/O phase. On any error, best-effort-clean the schema
// artifacts that may have been written to disk before returning
// the original error. The cleanup is best-effort: a failure to
// delete is logged via `tracing::warn` but does not mask the
// original init error.
// the original error. The cleanup is safe in strict mode because
// the preflight above guarantees the three schema URIs did NOT
// exist before this call, so any file there afterward is ours
// to delete. In `force` mode the operator opted in to overwrite
// semantics, so cleanup-on-failure of force re-inits may delete
// schema files that were present pre-call — that's part of the
// force contract.
//
// Coverage gap: Lance per-type datasets and `__manifest/`
// directory created by `GraphCoordinator::init` are NOT cleaned
@ -1852,7 +1915,7 @@ edge WorksAt: Person -> Company
let uri = dir.path().to_str().unwrap();
let adapter = Arc::new(RecordingStorageAdapter::default());
Omnigraph::init_with_storage(uri, TEST_SCHEMA, adapter.clone())
Omnigraph::init_with_storage(uri, TEST_SCHEMA, adapter.clone(), InitOptions::default())
.await
.unwrap();
assert!(adapter.writes().contains(&join_uri(uri, "_schema.pg")));

View file

@ -92,6 +92,14 @@ pub enum OmniError {
/// callers can match on this variant directly.
#[error("policy: {0}")]
Policy(String),
/// `Omnigraph::init` was called against a URI that already holds
/// schema artifacts from a previous init. Strict mode (the default)
/// fails fast with this error before touching disk so an existing
/// graph's metadata cannot be overwritten or destroyed. Operators
/// who actually want to overwrite pass `InitOptions { force: true }`
/// (CLI: `omnigraph init --force`).
#[error("graph already initialized at '{uri}'; pass --force to overwrite")]
AlreadyInitialized { uri: String },
}
impl OmniError {