diff --git a/crates/omnigraph/src/db/manifest.rs b/crates/omnigraph/src/db/manifest.rs index 14b5a83..f31cc4f 100644 --- a/crates/omnigraph/src/db/manifest.rs +++ b/crates/omnigraph/src/db/manifest.rs @@ -35,6 +35,7 @@ use publisher::{GraphNamespacePublisher, ManifestBatchPublisher}; pub(crate) use recovery::{ delete_sidecar, has_schema_apply_sidecar, new_sidecar, recover_manifest_drift, write_sidecar, RecoveryMode, RecoverySidecar, RecoverySidecarHandle, SidecarKind, SidecarTablePin, + SidecarTableRegistration, SidecarTombstone, }; use repo::{init_manifest_repo, open_manifest_repo, snapshot_state_at}; pub use state::SubTableEntry; diff --git a/crates/omnigraph/src/db/manifest/recovery.rs b/crates/omnigraph/src/db/manifest/recovery.rs index 7f60fce..588042c 100644 --- a/crates/omnigraph/src/db/manifest/recovery.rs +++ b/crates/omnigraph/src/db/manifest/recovery.rs @@ -50,7 +50,7 @@ use crate::storage::StorageAdapter; use super::Snapshot; use super::publisher::{GraphNamespacePublisher, ManifestBatchPublisher}; -use super::{ManifestChange, SubTableUpdate}; +use super::{ManifestChange, SubTableUpdate, TableRegistration, TableTombstone}; /// System actor identifier recorded on every recovery commit. Operators /// distinguish recovery commits from user commits in `omnigraph commit list` @@ -132,6 +132,39 @@ pub(crate) struct SidecarTablePin { pub table_branch: Option, } +/// New-table registration captured by SchemaApply sidecars so recovery +/// can publish a `ManifestChange::RegisterTable` for tables that the +/// writer was about to create. Without this, added tables exist as +/// orphan datasets on disk after recovery while the live `_schema.pg` +/// declares types the manifest doesn't know about — `snapshot.entry()` +/// returns None when the engine tries to read them. +#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] +pub(crate) struct SidecarTableRegistration { + /// Stable identifier (`node:Tag`, `edge:WorksAt`, etc.). 
+ pub table_key: String, + /// Repo-relative path the manifest will register + /// (e.g. `nodes/{fnv1a64-hex}`); recovery joins this with `root_uri` + /// to open the dataset Lance HEAD when constructing the + /// accompanying `Update`. + pub table_path: String, + /// Lance branch ref the dataset lives on (None for main / default). + #[serde(default, skip_serializing_if = "Option::is_none")] + pub table_branch: Option<String>, +} + +/// Tombstone metadata captured by SchemaApply sidecars so recovery can +/// publish a `ManifestChange::Tombstone` for tables the writer was +/// about to mark removed. Without this, tombstoned types stay visible +/// in the manifest snapshot after recovery even though the live +/// schema no longer declares them. +#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] +pub(crate) struct SidecarTombstone { + pub table_key: String, + /// One past the `table_version` the manifest recorded for this table + /// before the tombstone; recovery's CAS pre-check expects the prior value. + pub tombstone_version: u64, +} + /// In-memory representation of the on-disk JSON sidecar. #[derive(Debug, Clone, Serialize, Deserialize)] pub(crate) struct RecoverySidecar { @@ -152,6 +185,21 @@ pub(crate) struct RecoverySidecar { /// kinds) carry `None` and recovery falls back to `append_commit`. #[serde(default, skip_serializing_if = "Option::is_none")] pub merge_source_commit_id: Option, + /// SchemaApply-only: tables the writer was about to register + /// (added types + renamed targets). Recovery emits a + /// `RegisterTable` + `Update` pair per entry so the manifest + /// catches up to the live schema's declared type set. + /// Backward-compat: empty / absent for older sidecars and + /// non-SchemaApply writers. + #[serde(default, skip_serializing_if = "Vec::is_empty")] + pub additional_registrations: Vec<SidecarTableRegistration>, + /// SchemaApply-only: tables the writer was about to tombstone + /// (removed types + renamed sources). Recovery emits a + /// `ManifestChange::Tombstone` per entry. 
+ /// Backward-compat: empty / absent for older sidecars and + /// non-SchemaApply writers. + #[serde(default, skip_serializing_if = "Vec::is_empty")] + pub tombstones: Vec<SidecarTombstone>, } /// Opaque handle returned by [`write_sidecar`] so the caller can delete @@ -692,12 +740,15 @@ async fn process_sidecar( Phase C did not land)" ); let (new_manifest_version, published_versions) = - roll_forward_all(root_uri, sidecar).await?; + roll_forward_all(root_uri, sidecar, snapshot).await?; // `to_version` records the ACTUAL Lance HEAD published for // each table (not pin.post_commit_pin, which is a lower bound // for loose-match writers like SchemaApply / EnsureIndices / // BranchMerge that run multiple commit_staged calls per table). - let outcomes: Vec<TableOutcome> = sidecar + // SchemaApply additional_registrations are also included so + // operators reading the audit row see the full publish set, + // not just the pinned subset. + let mut outcomes: Vec<TableOutcome> = sidecar .tables .iter() .map(|pin| TableOutcome { @@ -709,6 +760,13 @@ async fn process_sidecar( .unwrap_or(pin.post_commit_pin), }) .collect(); + for reg in &sidecar.additional_registrations { + outcomes.push(TableOutcome { + table_key: reg.table_key.clone(), + from_version: 0, + to_version: published_versions.get(&reg.table_key).copied().unwrap_or(0), + }); + } record_audit( root_uri, sidecar, @@ -850,10 +908,14 @@ async fn record_audit_recovery_rollforward( async fn roll_forward_all( root_uri: &str, sidecar: &RecoverySidecar, + snapshot: &Snapshot, ) -> Result<(u64, HashMap<String, u64>)> { - let mut updates: Vec<ManifestChange> = Vec::with_capacity(sidecar.tables.len()); - let mut expected: HashMap<String, u64> = HashMap::with_capacity(sidecar.tables.len()); - let mut published_versions: HashMap<String, u64> = HashMap::with_capacity(sidecar.tables.len()); + let total_changes = + sidecar.tables.len() + sidecar.additional_registrations.len() + sidecar.tombstones.len(); + let mut updates: Vec<ManifestChange> = Vec::with_capacity(total_changes); + let mut expected: HashMap<String, u64> = HashMap::with_capacity(total_changes); 
+ let mut published_versions: HashMap<String, u64> = + HashMap::with_capacity(sidecar.tables.len() + sidecar.additional_registrations.len()); for pin in &sidecar.tables { // Open the dataset at its CURRENT Lance HEAD on the pin's branch @@ -897,6 +959,88 @@ async fn roll_forward_all( published_versions.insert(pin.table_key.clone(), head_version); } + // SchemaApply-only: register added tables (and renamed targets) and + // emit accompanying Update entries so recovery's manifest commit + // matches what the writer would have published. Without this, added + // tables exist as orphan datasets on disk but never receive a + // manifest entry, leaving the live schema and manifest mismatched. + // + // Filtered against `snapshot`: when the manifest already has a live + // entry for `reg.table_key`, a previous recovery (or the writer + // itself, before crashing in Phase D) has already published the + // registration — skip it to avoid the publisher's ExpectedVersionMismatch + // (expected=0, actual=current_version) on the redundant Register. + for reg in &sidecar.additional_registrations { + if snapshot.entry(&reg.table_key).is_some() { + // Already registered — record the current version in + // published_versions so the audit row's `to_version` reflects + // reality, but emit no manifest change. + if let Some(entry) = snapshot.entry(&reg.table_key) { + published_versions.insert(reg.table_key.clone(), entry.table_version); + } + continue; + } + let dataset_uri = format!("{}/{}", root_uri.trim_end_matches('/'), reg.table_path); + let head_ds = Dataset::open(&dataset_uri) + .await + .map_err(|e| OmniError::Lance(e.to_string()))?; + let head_ds = match reg.table_branch.as_deref() { + Some(b) if b != "main" => head_ds + .checkout_branch(b) + .await + .map_err(|e| OmniError::Lance(e.to_string()))?, + _ => head_ds, + }; + let head_version = head_ds.version().version; + let row_count = head_ds + .count_rows(None) + .await + .map_err(|e| OmniError::Lance(e.to_string()))? 
+ as u64; + let version_metadata = super::metadata::TableVersionMetadata::from_dataset( + root_uri, + &reg.table_path, + &head_ds, + )?; + + updates.push(ManifestChange::RegisterTable(TableRegistration { + table_key: reg.table_key.clone(), + table_path: reg.table_path.clone(), + })); + updates.push(ManifestChange::Update(SubTableUpdate { + table_key: reg.table_key.clone(), + table_version: head_version, + table_branch: reg.table_branch.clone(), + row_count, + version_metadata, + })); + // No prior manifest entry expected for an added table. + expected.insert(reg.table_key.clone(), 0); + published_versions.insert(reg.table_key.clone(), head_version); + } + + // SchemaApply-only: tombstone removed types (and renamed sources). + // + // Filtered against `snapshot`: when the manifest no longer has an + // entry for `tomb.table_key`, the tombstone has already landed in + // a prior recovery / the writer's Phase C — skip emit so the + // publisher doesn't error on a redundant tombstone. + for tomb in &sidecar.tombstones { + if snapshot.entry(&tomb.table_key).is_none() { + continue; + } + updates.push(ManifestChange::Tombstone(TableTombstone { + table_key: tomb.table_key.clone(), + tombstone_version: tomb.tombstone_version, + })); + // Tombstone CAS pre-check expects the table to be at + // `tombstone_version - 1` (the pre-tombstone version, since + // schema_apply sets `tombstone_version = source.table_version + 1`). 
+ expected.insert( + tomb.table_key.clone(), + tomb.tombstone_version.saturating_sub(1), + ); + } + let publisher = GraphNamespacePublisher::new(root_uri, sidecar.branch.as_deref()); let new_dataset = publisher.publish(&updates, &expected).await?; Ok((new_dataset.version().version, published_versions)) @@ -1038,6 +1182,8 @@ pub(crate) fn new_sidecar( writer_kind, tables, merge_source_commit_id: None, + additional_registrations: Vec::new(), + tombstones: Vec::new(), } } diff --git a/crates/omnigraph/src/db/omnigraph/schema_apply.rs b/crates/omnigraph/src/db/omnigraph/schema_apply.rs index 18b9219..e35258c 100644 --- a/crates/omnigraph/src/db/omnigraph/schema_apply.rs +++ b/crates/omnigraph/src/db/omnigraph/schema_apply.rs @@ -174,7 +174,45 @@ pub(super) async fn apply_schema_with_lock( }) }) .collect(); - let recovery_handle = if recovery_pins.is_empty() { + // Capture additional registrations + tombstones for the sidecar so + // recovery can publish them alongside the per-table updates. Without + // this, an added type's dataset is created in Phase B but the + // manifest never gains an entry for it after roll-forward — the + // live `_schema.pg` declares a type the manifest doesn't know about + // and reads through the engine fail with "no manifest entry for X". 
+ let mut sidecar_registrations: Vec<crate::db::manifest::SidecarTableRegistration> = Vec::new(); + for table_key in &added_tables { + sidecar_registrations.push(crate::db::manifest::SidecarTableRegistration { + table_key: table_key.clone(), + table_path: table_path_for_table_key(table_key)?, + table_branch: None, + }); + } + for target_table_key in renamed_tables.keys() { + sidecar_registrations.push(crate::db::manifest::SidecarTableRegistration { + table_key: target_table_key.clone(), + table_path: table_path_for_table_key(target_table_key)?, + table_branch: None, + }); + } + let mut sidecar_tombstones: Vec<crate::db::manifest::SidecarTombstone> = Vec::new(); + for source_table_key in renamed_tables.values() { + let source_entry = snapshot.entry(source_table_key).ok_or_else(|| { + OmniError::manifest(format!( + "missing source table '{}' for schema rename when building recovery sidecar", + source_table_key + )) + })?; + sidecar_tombstones.push(crate::db::manifest::SidecarTombstone { + table_key: source_table_key.clone(), + tombstone_version: source_entry.table_version.saturating_add(1), + }); + } + + let recovery_handle = if recovery_pins.is_empty() + && sidecar_registrations.is_empty() + && sidecar_tombstones.is_empty() + { None } else { // `branch=None` because schema_apply publishes against main — @@ -184,12 +222,14 @@ pub(super) async fn apply_schema_with_lock( // the coordinator's active branch, which is the pre-lock branch). // If the lock release fires before recovery, the lock branch is // gone — the sidecar must not reference it. 
- let sidecar = crate::db::manifest::new_sidecar( + let mut sidecar = crate::db::manifest::new_sidecar( crate::db::manifest::SidecarKind::SchemaApply, None, db.audit_actor_id.clone(), recovery_pins, ); + sidecar.additional_registrations = sidecar_registrations; + sidecar.tombstones = sidecar_tombstones; Some( crate::db::manifest::write_sidecar(db.root_uri(), db.storage_adapter(), &sidecar) .await?, diff --git a/crates/omnigraph/tests/failpoints.rs b/crates/omnigraph/tests/failpoints.rs index d063169..8366b9f 100644 --- a/crates/omnigraph/tests/failpoints.rs +++ b/crates/omnigraph/tests/failpoints.rs @@ -94,7 +94,7 @@ async fn graph_publish_failpoint_triggers_before_commit_append() { // state. #[tokio::test] -async fn schema_apply_recovers_pre_commit_crash() { +async fn schema_apply_pre_commit_crash_rolls_forward_via_sidecar() { let _scenario = FailScenario::setup(); let dir = tempfile::tempdir().unwrap(); let uri = dir.path().to_str().unwrap().to_string(); @@ -111,11 +111,29 @@ async fn schema_apply_recovers_pre_commit_crash() { ); } - // Reopen — recovery sweep should delete staging files and keep the - // original schema, since the manifest commit never happened. + // Reopen. With the sidecar protocol, a Phase B → Phase C crash + // (per-table commit_staged done; manifest publish not yet) is + // recoverable: the sidecar's `additional_registrations` carries the + // intent to register `node:Company`, schema-state recovery promotes + // the staging files, and the manifest-drift sweep publishes the + // RegisterTable + Update so the manifest catches up to the schema + // the writer already declared. The orphan-dataset-on-disk-with-no- + // manifest-entry corruption that pre-this-protocol recoveries left + // behind is closed. 
let db = Omnigraph::open(&uri).await.unwrap(); - assert_eq!(db.schema_source(), SCHEMA_V1); + assert_eq!( + db.schema_source(), + SCHEMA_V2_ADDED_TYPE, + "live schema must reflect the rolled-forward apply (Company added)" + ); assert_no_staging_files(dir.path()); + // node:Company must be registered in the manifest (queryable); + // pre-protocol recoveries left it as an orphan dataset on disk. + let company_rows = helpers::count_rows(&db, "node:Company").await; + assert_eq!( + company_rows, 0, + "node:Company must have a manifest entry post-recovery" + ); } #[tokio::test] @@ -1102,6 +1120,21 @@ edge WorksAt: Person -> Company live_schema.contains("node Tag"), "_schema.pg must reflect the NEW schema (Tag type added); got:\n{live_schema}", ); + + // Catalog ↔ manifest agreement: the new `node:Tag` type the schema + // declares must have a manifest entry the engine can read against. + // Without registrations / tombstones in the sidecar, recovery's + // `roll_forward_all` only publishes Updates for rewritten tables; + // added tables (Tag) end up as orphan datasets on disk with no + // manifest entry, and the live schema declares a type the manifest + // doesn't know about. + let db = Omnigraph::open(&uri).await.unwrap(); + let tag_rows = helpers::count_rows(&db, "node:Tag").await; + assert_eq!( + tag_rows, 0, + "node:Tag must have a manifest entry (with 0 rows) post-recovery; \ + a panic here means recovery failed to register the added table" + ); } #[tokio::test]