recovery: register added tables + tombstones in SchemaApply roll-forward

Cursor flagged that SchemaApply sidecars only captured `Update` pins
(via `snapshot.entry()?` in schema_apply.rs:166), so recovery's
`roll_forward_all` only published `ManifestChange::Update` for the
rewritten/indexed tables. Added types (`added_tables`) and tombstones
(`renamed_tables` sources) were silently dropped during recovery.

Reproducer: in `schema_apply_phase_b_failure_recovered_on_next_open`,
the v2 schema added a `Tag` node type. Pre-fix, `node:Tag` ended up as
an orphan dataset on disk while the manifest never received a
`RegisterTable` entry — the live `_schema.pg` declared a type the
manifest didn't know about, and `count_rows(node:Tag)` panicked with
`no manifest entry for node:Tag`. The existing test passed only
because it never queried Tag.

Fix:
1. Extend `RecoverySidecar` with `additional_registrations` and
   `tombstones` fields (optional, serde-default for backward compat
   with existing on-disk sidecars). Both are SchemaApply-only.
2. Populate them in `apply_schema_with_lock` from the migration plan's
   upfront diff (`added_tables` + `renamed_tables` keys for
   registrations; `renamed_tables` values for tombstones, version-
   pinned at `source_entry.table_version + 1`).
3. Update `roll_forward_all` to:
   - emit `RegisterTable` + `Update` for each `additional_registrations`
     entry (read the dataset's current Lance HEAD for the version
     metadata + row_count)
   - emit `Tombstone` for each `tombstones` entry
   - filter against `snapshot` so previously-published registrations /
     tombstones are skipped (handles the post-Phase-C-success-but-
     sidecar-not-yet-deleted case — without filtering, the publisher's
     CAS pre-check would error with `expected=0, actual=N` on the
     redundant Register)
4. Extend the audit-row outcomes to include published registrations.

Test changes:
- `schema_apply_phase_b_failure_recovered_on_next_open` now asserts
  `count_rows(node:Tag) == 0` (no panic), proving the new manifest
  entry exists.
- `schema_apply_recovers_pre_commit_crash` renamed to
  `schema_apply_pre_commit_crash_rolls_forward_via_sidecar` and
  rewritten — pre-fix it expected pre-commit crashes to roll BACK
  (delete staging, keep V1, leave Company as orphan); the sidecar
  protocol's "complete the writer's intent" semantics now roll
  FORWARD (rename staging -> final, register Company atomically). The
  new assertions verify schema = V2 and `node:Company` is queryable.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
Ragnor Comerford 2026-05-05 22:15:50 +02:00
parent 3ea7a1fd50
commit fb0f024652
No known key found for this signature in database
4 changed files with 232 additions and 12 deletions

View file

@ -35,6 +35,7 @@ use publisher::{GraphNamespacePublisher, ManifestBatchPublisher};
pub(crate) use recovery::{
delete_sidecar, has_schema_apply_sidecar, new_sidecar, recover_manifest_drift, write_sidecar,
RecoveryMode, RecoverySidecar, RecoverySidecarHandle, SidecarKind, SidecarTablePin,
SidecarTableRegistration, SidecarTombstone,
};
use repo::{init_manifest_repo, open_manifest_repo, snapshot_state_at};
pub use state::SubTableEntry;

View file

@ -50,7 +50,7 @@ use crate::storage::StorageAdapter;
use super::Snapshot;
use super::publisher::{GraphNamespacePublisher, ManifestBatchPublisher};
use super::{ManifestChange, SubTableUpdate};
use super::{ManifestChange, SubTableUpdate, TableRegistration, TableTombstone};
/// System actor identifier recorded on every recovery commit. Operators
/// distinguish recovery commits from user commits in `omnigraph commit list`
@ -132,6 +132,39 @@ pub(crate) struct SidecarTablePin {
pub table_branch: Option<String>,
}
/// New-table registration captured by SchemaApply sidecars so recovery
/// can publish a `ManifestChange::RegisterTable` for tables that the
/// writer was about to create. Without this, added tables exist as
/// orphan datasets on disk after recovery while the live `_schema.pg`
/// declares types the manifest doesn't know about — `snapshot.entry()`
/// returns None when the engine tries to read them.
///
/// `roll_forward_all` skips a registration whose `table_key` already
/// has a snapshot entry; otherwise it opens the dataset and publishes a
/// `RegisterTable` + `Update` pair with an expected prior version of 0.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub(crate) struct SidecarTableRegistration {
/// Stable identifier (`node:Tag`, `edge:WorksAt`, etc.).
pub table_key: String,
/// Repo-relative path the manifest will register
/// (e.g. `nodes/{fnv1a64-hex}`); recovery joins this with `root_uri`
/// (as `{root_uri}/{table_path}`) to open the dataset at its Lance
/// HEAD when constructing the accompanying `Update`.
pub table_path: String,
/// Lance branch ref the dataset lives on (None for main / default).
/// Recovery only checks out a branch when this is `Some` and not
/// `"main"`. The key is omitted from the serialized sidecar when the
/// value is `None`, and a missing key deserializes to `None`.
#[serde(default, skip_serializing_if = "Option::is_none")]
pub table_branch: Option<String>,
}
/// Tombstone metadata captured by SchemaApply sidecars so recovery can
/// publish a `ManifestChange::Tombstone` for tables the writer was
/// about to mark removed. Without this, tombstoned types stay visible
/// in the manifest snapshot after recovery even though the live
/// schema no longer declares them.
///
/// `roll_forward_all` skips a tombstone whose `table_key` no longer has
/// a snapshot entry, treating it as already published.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub(crate) struct SidecarTombstone {
/// Stable identifier of the table to tombstone.
pub table_key: String,
/// Version recorded on the emitted tombstone. `apply_schema_with_lock`
/// pins it at the source entry's `table_version + 1`, so it is one
/// past the version at which the table was last active. Recovery
/// derives the publisher's CAS-expected (pre-tombstone) version as
/// `tombstone_version - 1`.
pub tombstone_version: u64,
}
/// In-memory representation of the on-disk JSON sidecar.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub(crate) struct RecoverySidecar {
@ -152,6 +185,21 @@ pub(crate) struct RecoverySidecar {
/// kinds) carry `None` and recovery falls back to `append_commit`.
#[serde(default, skip_serializing_if = "Option::is_none")]
pub merge_source_commit_id: Option<String>,
/// SchemaApply-only: tables the writer was about to register
/// (added types + renamed targets). Recovery emits a
/// `RegisterTable` + `Update` pair per entry so the manifest
/// catches up to the live schema's declared type set.
/// Backward-compat: empty / absent for older sidecars and
/// non-SchemaApply writers.
#[serde(default, skip_serializing_if = "Vec::is_empty")]
pub additional_registrations: Vec<SidecarTableRegistration>,
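/// SchemaApply-only: tables the writer was about to tombstone.
/// Recovery emits a `ManifestChange::Tombstone` per entry.
/// NOTE(review): `apply_schema_with_lock` currently fills this from
/// `renamed_tables` sources only; removed types are not captured —
/// confirm whether they need a tombstone here too.
/// Backward-compat: empty / absent for older sidecars and
/// non-SchemaApply writers.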
#[serde(default, skip_serializing_if = "Vec::is_empty")]
pub tombstones: Vec<SidecarTombstone>,
}
/// Opaque handle returned by [`write_sidecar`] so the caller can delete
@ -692,12 +740,15 @@ async fn process_sidecar(
Phase C did not land)"
);
let (new_manifest_version, published_versions) =
roll_forward_all(root_uri, sidecar).await?;
roll_forward_all(root_uri, sidecar, snapshot).await?;
// `to_version` records the ACTUAL Lance HEAD published for
// each table (not pin.post_commit_pin, which is a lower bound
// for loose-match writers like SchemaApply / EnsureIndices /
// BranchMerge that run multiple commit_staged calls per table).
let outcomes: Vec<TableOutcome> = sidecar
// SchemaApply additional_registrations are also included so
// operators reading the audit row see the full publish set,
// not just the pinned subset.
let mut outcomes: Vec<TableOutcome> = sidecar
.tables
.iter()
.map(|pin| TableOutcome {
@ -709,6 +760,13 @@ async fn process_sidecar(
.unwrap_or(pin.post_commit_pin),
})
.collect();
for reg in &sidecar.additional_registrations {
outcomes.push(TableOutcome {
table_key: reg.table_key.clone(),
from_version: 0,
to_version: published_versions.get(&reg.table_key).copied().unwrap_or(0),
});
}
record_audit(
root_uri,
sidecar,
@ -850,10 +908,14 @@ async fn record_audit_recovery_rollforward(
async fn roll_forward_all(
root_uri: &str,
sidecar: &RecoverySidecar,
snapshot: &Snapshot,
) -> Result<(u64, HashMap<String, u64>)> {
let mut updates: Vec<ManifestChange> = Vec::with_capacity(sidecar.tables.len());
let mut expected: HashMap<String, u64> = HashMap::with_capacity(sidecar.tables.len());
let mut published_versions: HashMap<String, u64> = HashMap::with_capacity(sidecar.tables.len());
let total_changes =
sidecar.tables.len() + sidecar.additional_registrations.len() + sidecar.tombstones.len();
let mut updates: Vec<ManifestChange> = Vec::with_capacity(total_changes);
let mut expected: HashMap<String, u64> = HashMap::with_capacity(total_changes);
let mut published_versions: HashMap<String, u64> =
HashMap::with_capacity(sidecar.tables.len() + sidecar.additional_registrations.len());
for pin in &sidecar.tables {
// Open the dataset at its CURRENT Lance HEAD on the pin's branch
@ -897,6 +959,88 @@ async fn roll_forward_all(
published_versions.insert(pin.table_key.clone(), head_version);
}
// SchemaApply-only: register added tables (and renamed targets) and
// emit accompanying Update entries so recovery's manifest commit
// matches what the writer would have published. Without this, added
// tables exist as orphan datasets on disk but never receive a
// manifest entry, leaving the live schema and manifest mismatched.
//
// Filtered against `snapshot`: when the manifest already has a live
// entry for `reg.table_key`, a previous recovery (or the writer
// itself, before crashing in Phase D) has already published the
// registration — skip it to avoid the publisher's ExpectedVersionMismatch
// (expected=0, actual=current_version) on the redundant Register.
for reg in &sidecar.additional_registrations {
if snapshot.entry(&reg.table_key).is_some() {
// Already registered — record the current version in
// published_versions so the audit row's `to_version` reflects
// reality, but emit no manifest change.
if let Some(entry) = snapshot.entry(&reg.table_key) {
published_versions.insert(reg.table_key.clone(), entry.table_version);
}
continue;
}
let dataset_uri = format!("{}/{}", root_uri.trim_end_matches('/'), reg.table_path);
let head_ds = Dataset::open(&dataset_uri)
.await
.map_err(|e| OmniError::Lance(e.to_string()))?;
let head_ds = match reg.table_branch.as_deref() {
Some(b) if b != "main" => head_ds
.checkout_branch(b)
.await
.map_err(|e| OmniError::Lance(e.to_string()))?,
_ => head_ds,
};
let head_version = head_ds.version().version;
let row_count = head_ds
.count_rows(None)
.await
.map_err(|e| OmniError::Lance(e.to_string()))? as u64;
let version_metadata = super::metadata::TableVersionMetadata::from_dataset(
root_uri,
&reg.table_path,
&head_ds,
)?;
updates.push(ManifestChange::RegisterTable(TableRegistration {
table_key: reg.table_key.clone(),
table_path: reg.table_path.clone(),
}));
updates.push(ManifestChange::Update(SubTableUpdate {
table_key: reg.table_key.clone(),
table_version: head_version,
table_branch: reg.table_branch.clone(),
row_count,
version_metadata,
}));
// No prior manifest entry expected for an added table.
expected.insert(reg.table_key.clone(), 0);
published_versions.insert(reg.table_key.clone(), head_version);
}
// SchemaApply-only: tombstone removed types (and renamed sources).
//
// Filtered against `snapshot`: when the manifest no longer has an
// entry for `tomb.table_key`, the tombstone has already landed in
// a prior recovery / the writer's Phase C — skip emit so the
// publisher doesn't error on a redundant tombstone.
for tomb in &sidecar.tombstones {
if snapshot.entry(&tomb.table_key).is_none() {
continue;
}
updates.push(ManifestChange::Tombstone(TableTombstone {
table_key: tomb.table_key.clone(),
tombstone_version: tomb.tombstone_version,
}));
// Tombstone CAS pre-check expects the table to be at
// `tombstone_version - 1` (the pre-tombstone version, since
// schema_apply sets `tombstone_version = source.table_version + 1`).
expected.insert(
tomb.table_key.clone(),
tomb.tombstone_version.saturating_sub(1),
);
}
let publisher = GraphNamespacePublisher::new(root_uri, sidecar.branch.as_deref());
let new_dataset = publisher.publish(&updates, &expected).await?;
Ok((new_dataset.version().version, published_versions))
@ -1038,6 +1182,8 @@ pub(crate) fn new_sidecar(
writer_kind,
tables,
merge_source_commit_id: None,
additional_registrations: Vec::new(),
tombstones: Vec::new(),
}
}

View file

@ -174,7 +174,45 @@ pub(super) async fn apply_schema_with_lock(
})
})
.collect();
let recovery_handle = if recovery_pins.is_empty() {
// Capture additional registrations + tombstones for the sidecar so
// recovery can publish them alongside the per-table updates. Without
// this, an added type's dataset is created in Phase B but the
// manifest never gains an entry for it after roll-forward — the
// live `_schema.pg` declares a type the manifest doesn't know about
// and reads through the engine fail with "no manifest entry for X".
let mut sidecar_registrations: Vec<crate::db::manifest::SidecarTableRegistration> = Vec::new();
for table_key in &added_tables {
sidecar_registrations.push(crate::db::manifest::SidecarTableRegistration {
table_key: table_key.clone(),
table_path: table_path_for_table_key(table_key)?,
table_branch: None,
});
}
for target_table_key in renamed_tables.keys() {
sidecar_registrations.push(crate::db::manifest::SidecarTableRegistration {
table_key: target_table_key.clone(),
table_path: table_path_for_table_key(target_table_key)?,
table_branch: None,
});
}
let mut sidecar_tombstones: Vec<crate::db::manifest::SidecarTombstone> = Vec::new();
for source_table_key in renamed_tables.values() {
let source_entry = snapshot.entry(source_table_key).ok_or_else(|| {
OmniError::manifest(format!(
"missing source table '{}' for schema rename when building recovery sidecar",
source_table_key
))
})?;
sidecar_tombstones.push(crate::db::manifest::SidecarTombstone {
table_key: source_table_key.clone(),
tombstone_version: source_entry.table_version.saturating_add(1),
});
}
let recovery_handle = if recovery_pins.is_empty()
&& sidecar_registrations.is_empty()
&& sidecar_tombstones.is_empty()
{
None
} else {
// `branch=None` because schema_apply publishes against main —
@ -184,12 +222,14 @@ pub(super) async fn apply_schema_with_lock(
// the coordinator's active branch, which is the pre-lock branch).
// If the lock release fires before recovery, the lock branch is
// gone — the sidecar must not reference it.
let sidecar = crate::db::manifest::new_sidecar(
let mut sidecar = crate::db::manifest::new_sidecar(
crate::db::manifest::SidecarKind::SchemaApply,
None,
db.audit_actor_id.clone(),
recovery_pins,
);
sidecar.additional_registrations = sidecar_registrations;
sidecar.tombstones = sidecar_tombstones;
Some(
crate::db::manifest::write_sidecar(db.root_uri(), db.storage_adapter(), &sidecar)
.await?,

View file

@ -94,7 +94,7 @@ async fn graph_publish_failpoint_triggers_before_commit_append() {
// state.
#[tokio::test]
async fn schema_apply_recovers_pre_commit_crash() {
async fn schema_apply_pre_commit_crash_rolls_forward_via_sidecar() {
let _scenario = FailScenario::setup();
let dir = tempfile::tempdir().unwrap();
let uri = dir.path().to_str().unwrap().to_string();
@ -111,11 +111,29 @@ async fn schema_apply_recovers_pre_commit_crash() {
);
}
// Reopen — recovery sweep should delete staging files and keep the
// original schema, since the manifest commit never happened.
// Reopen. With the sidecar protocol, a Phase B → Phase C crash
// (per-table commit_staged done; manifest publish not yet) is
// recoverable: the sidecar's `additional_registrations` carries the
// intent to register `node:Company`, schema-state recovery promotes
// the staging files, and the manifest-drift sweep publishes the
// RegisterTable + Update so the manifest catches up to the schema
// the writer already declared. This closes the corruption that
// recoveries before this protocol left behind: an orphan dataset on
// disk with no manifest entry.
let db = Omnigraph::open(&uri).await.unwrap();
assert_eq!(db.schema_source(), SCHEMA_V1);
assert_eq!(
db.schema_source(),
SCHEMA_V2_ADDED_TYPE,
"live schema must reflect the rolled-forward apply (Company added)"
);
assert_no_staging_files(dir.path());
// node:Company must be registered in the manifest (queryable);
// pre-protocol recoveries left it as an orphan dataset on disk.
let company_rows = helpers::count_rows(&db, "node:Company").await;
assert_eq!(
company_rows, 0,
"node:Company must have a manifest entry post-recovery"
);
}
#[tokio::test]
@ -1102,6 +1120,21 @@ edge WorksAt: Person -> Company
live_schema.contains("node Tag"),
"_schema.pg must reflect the NEW schema (Tag type added); got:\n{live_schema}",
);
// Catalog ↔ manifest agreement: the new `node:Tag` type the schema
// declares must have a manifest entry the engine can read against.
// Without registrations / tombstones in the sidecar, recovery's
// `roll_forward_all` only publishes Updates for rewritten tables;
// added tables (Tag) end up as orphan datasets on disk with no
// manifest entry, and the live schema declares a type the manifest
// doesn't know about.
let db = Omnigraph::open(&uri).await.unwrap();
let tag_rows = helpers::count_rows(&db, "node:Tag").await;
assert_eq!(
tag_rows, 0,
"node:Tag must have a manifest entry (with 0 rows) post-recovery; \
a panic here means recovery failed to register the added table"
);
}
#[tokio::test]