Merge pull request #72 from ModernRelay/ragnorc/mr847-recovery-reconciler

Recovery-on-open reconciler
This commit is contained in:
Ragnor Comerford 2026-05-06 11:59:09 +02:00 committed by GitHub
commit 028b913d9a
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
27 changed files with 7490 additions and 192 deletions

View file

@ -198,7 +198,7 @@ omnigraph policy explain --actor act-alice --action change --branch main
| Columnar storage on object store | ✅ Arrow/Lance | URI normalization, S3 env-var plumbing |
| Per-dataset versioning + time travel | ✅ | `snapshot_at_version`, `entity_at`, snapshot-pinned reads across many tables |
| Per-dataset branches | ✅ | **Graph-level** branches (atomic across all sub-tables), lazy fork, system branch filtering |
| Atomic single-dataset commits | ✅ | **Multi-table publish via three layers**, NOT a single Lance primitive: (1) per-table Lance `commit_staged` for the data write, (2) `__manifest` row-level CAS via `ManifestBatchPublisher` for cross-table ordering, (3) recovery-on-open reconciler for the residual gap between (1) and (2). Layer (3) is **not yet shipped** — tracked in MR-847. Until MR-847 lands, a failure between per-table `commit_staged` and the manifest publish leaves drift (the documented "Phase B → Phase C residual" — see [docs/runs.md](docs/runs.md)). Engine writes route through a sealed `TableStorage` trait (MR-793) exposing `stage_*` + `commit_staged` as the canonical staged-write surface; documented inline-commit residuals (`delete_where`, `create_vector_index`, plus legacy `append_batch` / `merge_insert_batches` / `overwrite_batch` / `create_*_index` pending Phase 9) remain on the trait until upstream Lance ships a public two-phase API ([#6658](https://github.com/lance-format/lance/issues/6658), [#6666](https://github.com/lance-format/lance/issues/6666)) and call-site conversion (Phase 1b) completes. **Do not describe atomicity as "fully upheld" until MR-847 ships.** |
| Atomic single-dataset commits | ✅ | **Multi-table publish via three layers**, NOT a single Lance primitive: (1) per-table Lance `commit_staged` for the data write, (2) `__manifest` row-level CAS via `ManifestBatchPublisher` for cross-table ordering, (3) the open-time recovery sweep for the residual gap between (1) and (2). All three layers ship; the four migrated writers (`MutationStaging::finalize`, `schema_apply`, `branch_merge`, `ensure_indices`) write a `__recovery/{ulid}.json` sidecar before Phase B and delete it after Phase C. The next `Omnigraph::open` (gated on `OpenMode::ReadWrite`) runs the sweep in `db/manifest/recovery.rs`: classify, decide all-or-nothing per sidecar, roll forward via single `ManifestBatchPublisher::publish` or roll back via `Dataset::restore`, and record an audit row in `_graph_commit_recoveries.lance` (queryable via `omnigraph commit list --filter actor=omnigraph:recovery`). Continuous in-process recovery (no restart needed between Phase B failure and recovery) is the goal of a future background reconciler. Engine writes route through a sealed `TableStorage` trait exposing `stage_*` + `commit_staged` as the canonical staged-write surface; documented inline-commit residuals (`delete_where`, `create_vector_index`, plus legacy `append_batch` / `merge_insert_batches` / `overwrite_batch` / `create_*_index`) remain on the trait until upstream Lance ships a public two-phase API ([#6658](https://github.com/lance-format/lance/issues/6658), [#6666](https://github.com/lance-format/lance/issues/6666)) and the migration of every call site completes. |
| Compaction (`compact_files`) | ✅ | `omnigraph optimize` orchestrates over all node/edge tables, bounded concurrency |
| Cleanup (`cleanup_old_versions`) | ✅ | `omnigraph cleanup` with `--keep` / `--older-than` policy |
| BTREE / inverted (FTS) / vector indexes | ✅ | `ensure_indices` builds them on every relevant column; idempotent; lazy across branches |

View file

@ -16,6 +16,8 @@ mod migrations;
mod namespace;
#[path = "manifest/publisher.rs"]
mod publisher;
#[path = "manifest/recovery.rs"]
mod recovery;
#[path = "manifest/repo.rs"]
mod repo;
#[path = "manifest/state.rs"]
@ -30,6 +32,11 @@ pub(crate) use namespace::open_table_head_for_write;
#[cfg(test)]
use namespace::{branch_manifest_namespace, staged_table_namespace};
use publisher::{GraphNamespacePublisher, ManifestBatchPublisher};
pub(crate) use recovery::{
delete_sidecar, has_schema_apply_sidecar, new_sidecar, recover_manifest_drift, write_sidecar,
RecoveryMode, RecoverySidecar, RecoverySidecarHandle, SidecarKind, SidecarTablePin,
SidecarTableRegistration, SidecarTombstone,
};
use repo::{init_manifest_repo, open_manifest_repo, snapshot_state_at};
pub use state::SubTableEntry;
#[cfg(test)]

File diff suppressed because it is too large Load diff

View file

@ -2,6 +2,7 @@ pub mod commit_graph;
pub mod graph_coordinator;
pub mod manifest;
mod omnigraph;
mod recovery_audit;
mod run_registry;
mod schema_state;
@ -9,8 +10,8 @@ pub use commit_graph::GraphCommit;
pub use graph_coordinator::{GraphCoordinator, ReadTarget, ResolvedTarget, SnapshotId};
pub use manifest::{Snapshot, SubTableEntry, SubTableUpdate};
pub use omnigraph::{
CleanupPolicyOptions, MergeOutcome, Omnigraph, SchemaApplyResult, TableCleanupStats,
TableOptimizeStats,
CleanupPolicyOptions, MergeOutcome, Omnigraph, OpenMode, SchemaApplyResult,
TableCleanupStats, TableOptimizeStats,
};
pub(crate) use omnigraph::ensure_public_branch_ref;
pub(crate) use run_registry::is_internal_run_branch;

View file

@ -81,6 +81,24 @@ pub struct Omnigraph {
pub(crate) audit_actor_id: Option<String>,
}
/// Whether [`Omnigraph::open`] runs the open-time recovery sweep.
///
/// Recovery requires Lance writes (`Dataset::restore`, `ManifestBatchPublisher::publish`).
/// Read-only consumers — NDJSON export, `commit list`, `read`, schema
/// inspection — should not trigger writes (they may run with read-only
/// object-store credentials, and silent open-time mutations are
/// surprising). They also don't need recovery: reads always resolve
/// through the manifest pin, which is the consistent snapshot regardless
/// of any Phase B → Phase C drift on the per-table side.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum OpenMode {
/// Run the recovery sweep on open. Default for `Omnigraph::open`.
ReadWrite,
/// Skip the recovery sweep. Use for read-only consumers via
/// [`Omnigraph::open_read_only`].
ReadOnly,
}
impl Omnigraph {
/// Create a new repo at `uri` from schema source.
///
@ -119,24 +137,65 @@ impl Omnigraph {
})
}
/// Open an existing repo.
/// Open an existing repo (read-write).
///
/// Reads `_schema.pg`, parses it, builds the catalog, and opens `__manifest`.
/// Runs the open-time recovery sweep before returning — see [`OpenMode`].
pub async fn open(uri: &str) -> Result<Self> {
Self::open_with_storage(uri, storage_for_uri(uri)?).await
Self::open_with_storage_and_mode(uri, storage_for_uri(uri)?, OpenMode::ReadWrite).await
}
/// Open an existing repo for read-only consumers (NDJSON export,
/// `commit list`, etc.). Skips the recovery sweep — see [`OpenMode`].
pub async fn open_read_only(uri: &str) -> Result<Self> {
Self::open_with_storage_and_mode(uri, storage_for_uri(uri)?, OpenMode::ReadOnly).await
}
/// `open_with_storage` retained for existing callers (init/test paths).
/// Defaults to `OpenMode::ReadWrite`.
pub(crate) async fn open_with_storage(
uri: &str,
storage: Arc<dyn StorageAdapter>,
) -> Result<Self> {
Self::open_with_storage_and_mode(uri, storage, OpenMode::ReadWrite).await
}
pub(crate) async fn open_with_storage_and_mode(
uri: &str,
storage: Arc<dyn StorageAdapter>,
mode: OpenMode,
) -> Result<Self> {
let root = normalize_root_uri(uri)?;
// Open the coordinator first so the schema-staging recovery sweep can
// compare its snapshot against any leftover staging files. Recovery
// either deletes staging (pre-commit crash) or completes the rename
// (post-commit crash) before the live schema files are read.
let coordinator = GraphCoordinator::open(&root, Arc::clone(&storage)).await?;
recover_schema_state_files(&root, Arc::clone(&storage), &coordinator.snapshot()).await?;
// compare its snapshot against any leftover staging files.
let mut coordinator = GraphCoordinator::open(&root, Arc::clone(&storage)).await?;
// Both the schema-state recovery sweep AND the manifest-drift
// recovery sweep are gated on `OpenMode::ReadWrite`. Read-only
// consumers (NDJSON export, `commit list`, schema show) shouldn't
// trigger object-store mutations: they may run with read-only
// credentials, and silent open-time writes are surprising. Both
// sweeps' work is recoverable on the next ReadWrite open, so
// skipping under ReadOnly doesn't lose any safety guarantees —
// the manifest pin is the consistent snapshot regardless of
// drift on the per-table side or leftover schema-staging files.
if matches!(mode, OpenMode::ReadWrite) {
let schema_state_recovery =
recover_schema_state_files(&root, Arc::clone(&storage), &coordinator.snapshot())
.await?;
// Recovery sweep: close the Phase B → Phase C residual on
// any sidecar left over from a crashed writer. Continuous
// in-process recovery for long-running servers (no restart
// required between Phase B failure and recovery) is a
// separate background-reconciler effort.
crate::db::manifest::recover_manifest_drift(
&root,
Arc::clone(&storage),
&mut coordinator,
crate::db::manifest::RecoveryMode::Full,
schema_state_recovery,
)
.await?;
}
// Read _schema.pg (post-recovery — may have just been renamed in).
let schema_path = schema_source_uri(&root);
let schema_source = storage.read_text(&schema_path).await?;
@ -202,17 +261,29 @@ impl Omnigraph {
/// Engine-facing trait surface around `TableStore`.
///
/// MR-793 Phase 1: this is the canonical accessor for newly-written
/// engine code. The trait's signatures use opaque `SnapshotHandle` /
/// `StagedHandle` instead of leaking `lance::Dataset` /
/// This is the canonical accessor for newly-written engine code. The
/// trait's signatures use opaque `SnapshotHandle` / `StagedHandle`
/// instead of leaking `lance::Dataset` /
/// `lance::dataset::transaction::Transaction`. Existing call sites
/// that still use `db.table_store.X(...)` (the inherent struct
/// methods) are migrated incrementally — see §9 of
/// `.context/mr-793-design.md`.
/// methods) are migrated incrementally.
pub(crate) fn storage(&self) -> &dyn crate::storage_layer::TableStorage {
&self.table_store
}
/// Engine-level access to the object-store adapter (S3 / local fs).
/// Used by the recovery sidecar protocol — writers in the engine
/// call this to write/delete sidecars at `__recovery/{ulid}.json`.
pub(crate) fn storage_adapter(&self) -> &dyn crate::storage::StorageAdapter {
self.storage.as_ref()
}
/// Engine-level access to the repo's normalized root URI. Used by
/// the recovery sidecar protocol to compute `__recovery/` paths.
pub(crate) fn root_uri(&self) -> &str {
&self.root_uri
}
pub(crate) async fn open_coordinator_for_branch(
&self,
branch: Option<&str>,
@ -306,8 +377,89 @@ impl Omnigraph {
Ok(())
}
/// Re-read the handle-local coordinator state from storage.
pub(crate) async fn refresh(&mut self) -> Result<()> {
/// Re-read the handle-local coordinator state from storage AND run
/// in-process recovery. Closes the Phase B → Phase C residual (e.g.
/// `MutationStaging::finalize` crash mid-publish in a long-running
/// server) without restart.
///
/// Composition mirrors `Omnigraph::open_with_storage_and_mode`'s
/// recovery sequence, in the same order, with one restriction: the
/// manifest-drift sweep runs in `RollForwardOnly` mode (rollback /
/// abort cases defer to the next ReadWrite open because
/// `Dataset::restore` is unsafe under concurrency). Each step:
///
/// 1. `coordinator.refresh()` — re-read manifest.
/// 2. `recover_schema_state_files` — complete an in-flight
/// schema_apply's staging→final rename if a SchemaApply sidecar
/// is on disk; idempotent + early-returns when no staging files
/// exist. Required BEFORE manifest-drift recovery so a
/// SchemaApply roll-forward doesn't publish the manifest while
/// the staging files remain unrenamed (which would corrupt the
/// repo: data on new schema, catalog on old).
/// 3. `recover_manifest_drift(... RollForwardOnly)` — close the
/// finalize→publisher residual via roll-forward; defer rollback
/// work to next ReadWrite open.
/// 4. `runtime_cache.invalidate_all` — drop stale per-snapshot caches.
///
/// Steady state cost: one `list_dir` of `__recovery/` (typically
/// returns empty → early return for both passes). No additional
/// Lance reads.
///
/// Engine-internal callers that already hold an in-flight sidecar
/// (e.g. `schema_apply` mid-write) MUST use
/// [`refresh_coordinator_only`](Self::refresh_coordinator_only) to
/// avoid the recovery sweep racing their own sidecar.
pub async fn refresh(&mut self) -> Result<()> {
self.coordinator.refresh().await?;
let schema_state_recovery = recover_schema_state_files(
&self.root_uri,
Arc::clone(&self.storage),
&self.coordinator.snapshot(),
)
.await?;
crate::db::manifest::recover_manifest_drift(
&self.root_uri,
Arc::clone(&self.storage),
&mut self.coordinator,
crate::db::manifest::RecoveryMode::RollForwardOnly,
schema_state_recovery,
)
.await?;
self.reload_schema_if_source_changed().await?;
self.runtime_cache.invalidate_all().await;
Ok(())
}
async fn reload_schema_if_source_changed(&mut self) -> Result<()> {
let schema_path = schema_source_uri(&self.root_uri);
let schema_source = self.storage.read_text(&schema_path).await?;
if schema_source == self.schema_source {
return Ok(());
}
let current_source_ir = read_schema_ir_from_source(&schema_source)?;
let branches = self.coordinator.branch_list().await?;
let (accepted_ir, _) = load_or_bootstrap_schema_contract(
&self.root_uri,
Arc::clone(&self.storage),
&branches,
&current_source_ir,
)
.await?;
let mut catalog = build_catalog_from_ir(&accepted_ir)?;
fixup_blob_schemas(&mut catalog);
self.schema_source = schema_source;
self.catalog = catalog;
Ok(())
}
/// Refresh coordinator state and invalidate the runtime cache WITHOUT
/// running the recovery sweep. Engine-internal callers that hold an
/// in-flight sidecar (e.g. `schema_apply::apply_schema_with_lock`'s
/// internal lease-check refresh) need this variant: running recovery
/// here would observe the caller's own sidecar, classify it as
/// RolledPastExpected, and roll it forward — racing the caller's
/// own publish path.
pub(crate) async fn refresh_coordinator_only(&mut self) -> Result<()> {
self.coordinator.refresh().await?;
self.runtime_cache.invalidate_all().await;
Ok(())
@ -462,6 +614,23 @@ impl Omnigraph {
table_ops::ensure_indices_on(self, branch).await
}
#[cfg(feature = "failpoints")]
#[doc(hidden)]
pub async fn failpoint_publish_table_head_without_index_rebuild_for_test(
&mut self,
branch: &str,
table_key: &str,
table_branch: Option<&str>,
) -> Result<u64> {
table_ops::failpoint_publish_table_head_without_index_rebuild_for_test(
self,
branch,
table_key,
table_branch,
)
.await
}
/// Compact small Lance fragments into fewer larger ones across every
/// node + edge table on `main`. See [`optimize`] for details.
pub async fn optimize(&mut self) -> Result<Vec<optimize::TableOptimizeStats>> {
@ -840,7 +1009,6 @@ impl Omnigraph {
pub(crate) async fn invalidate_graph_index(&self) {
table_ops::invalidate_graph_index(self).await
}
}
pub(crate) fn normalize_branch_name(branch: &str) -> Result<Option<String>> {
@ -1276,6 +1444,10 @@ edge WorksAt: Person -> Company
self.deletes.lock().unwrap().push(uri.to_string());
self.inner.delete(uri).await
}
async fn list_dir(&self, dir_uri: &str) -> Result<Vec<String>> {
self.inner.list_dir(dir_uri).await
}
}
#[tokio::test]

View file

@ -151,6 +151,91 @@ pub(super) async fn apply_schema_with_lock(
let mut table_updates = HashMap::<String, crate::db::SubTableUpdate>::new();
let mut table_tombstones = HashMap::<String, u64>::new();
// Recovery sidecar: protect the per-table commit_staged loop in
// rewritten_tables + indexed_tables. The post_commit_pin we record
// here is a lower bound (expected + 1); the classifier loose-matches
// for SidecarKind::SchemaApply because the actual N depends on how
// many indices need building. See classify_table's loose-match arm.
let recovery_pins: Vec<crate::db::manifest::SidecarTablePin> = rewritten_tables
.iter()
.chain(indexed_tables.iter().filter(|t| {
!rewritten_tables.contains(*t)
&& !added_tables.contains(*t)
&& !renamed_tables.contains_key(*t)
}))
.filter_map(|table_key| {
let entry = snapshot.entry(table_key)?;
Some(crate::db::manifest::SidecarTablePin {
table_key: table_key.clone(),
table_path: db.table_store.dataset_uri(&entry.table_path),
expected_version: entry.table_version,
post_commit_pin: entry.table_version + 1,
table_branch: entry.table_branch.clone(),
})
})
.collect();
// Capture additional registrations + tombstones for the sidecar so
// recovery can publish them alongside the per-table updates. Without
// this, an added type's dataset is created in Phase B but the
// manifest never gains an entry for it after roll-forward — the
// live `_schema.pg` declares a type the manifest doesn't know about
// and reads through the engine fail with "no manifest entry for X".
let mut sidecar_registrations: Vec<crate::db::manifest::SidecarTableRegistration> = Vec::new();
for table_key in &added_tables {
sidecar_registrations.push(crate::db::manifest::SidecarTableRegistration {
table_key: table_key.clone(),
table_path: table_path_for_table_key(table_key)?,
table_branch: None,
});
}
for target_table_key in renamed_tables.keys() {
sidecar_registrations.push(crate::db::manifest::SidecarTableRegistration {
table_key: target_table_key.clone(),
table_path: table_path_for_table_key(target_table_key)?,
table_branch: None,
});
}
let mut sidecar_tombstones: Vec<crate::db::manifest::SidecarTombstone> = Vec::new();
for source_table_key in renamed_tables.values() {
let source_entry = snapshot.entry(source_table_key).ok_or_else(|| {
OmniError::manifest(format!(
"missing source table '{}' for schema rename when building recovery sidecar",
source_table_key
))
})?;
sidecar_tombstones.push(crate::db::manifest::SidecarTombstone {
table_key: source_table_key.clone(),
tombstone_version: source_entry.table_version.saturating_add(1),
});
}
let recovery_handle = if recovery_pins.is_empty()
&& sidecar_registrations.is_empty()
&& sidecar_tombstones.is_empty()
{
None
} else {
// `branch=None` because schema_apply publishes against main —
// the `__schema_apply_lock__` branch is purely a serialization
// sentinel (acquire_schema_apply_lock creates it; the manifest
// publish via coordinator.commit_changes_with_actor below targets
// the coordinator's active branch, which is the pre-lock branch).
// If the lock release fires before recovery, the lock branch is
// gone — the sidecar must not reference it.
let mut sidecar = crate::db::manifest::new_sidecar(
crate::db::manifest::SidecarKind::SchemaApply,
None,
db.audit_actor_id.clone(),
recovery_pins,
);
sidecar.additional_registrations = sidecar_registrations;
sidecar.tombstones = sidecar_tombstones;
Some(
crate::db::manifest::write_sidecar(db.root_uri(), db.storage_adapter(), &sidecar)
.await?,
)
};
for table_key in &added_tables {
let table_path = table_path_for_table_key(table_key)?;
let dataset_uri = db.table_store.dataset_uri(&table_path);
@ -237,8 +322,8 @@ pub(super) async fn apply_schema_with_lock(
)
.await?;
let dataset_uri = db.table_store.dataset_uri(&entry.table_path);
// MR-793 Phase 6: route through stage_overwrite + commit_staged
// for non-empty batches. Lance's `InsertBuilder::execute_uncommitted`
// Route through stage_overwrite + commit_staged for non-empty
// batches. Lance's `InsertBuilder::execute_uncommitted`
// errors on empty data (lance-4.0.0 `src/dataset/write/insert.rs:144`),
// so the empty-rewrite case stays on `overwrite_dataset` (which
// accepts empty input). The empty case is rare in schema_apply
@ -257,11 +342,7 @@ pub(super) async fn apply_schema_with_lock(
// open the wrong HEAD here.
let existing = db
.table_store
.open_dataset_head_for_write(
table_key,
&dataset_uri,
entry.table_branch.as_deref(),
)
.open_dataset_head_for_write(table_key, &dataset_uri, entry.table_branch.as_deref())
.await?;
let staged = db.table_store.stage_overwrite(&existing, batch).await?;
db.table_store
@ -336,7 +417,7 @@ pub(super) async fn apply_schema_with_lock(
}));
}
db.refresh().await?;
db.refresh_coordinator_only().await?;
if db.version() != base_manifest_version {
return Err(OmniError::manifest_conflict(format!(
"schema apply lost its write lease: main advanced from v{} to v{} while schema apply was in progress",
@ -353,6 +434,8 @@ pub(super) async fn apply_schema_with_lock(
// `recover_schema_state_files`:
// - crash before commit → manifest unchanged; staging deleted on open
// - crash after commit → manifest advanced; staging renamed on open
crate::failpoints::maybe_fail("schema_apply.before_staging_write")?;
let staging_pg_uri = schema_source_staging_uri(&db.root_uri);
db.storage
.write_text(&staging_pg_uri, desired_schema_source)
@ -396,6 +479,23 @@ pub(super) async fn apply_schema_with_lock(
db.invalidate_graph_index().await;
}
// Recovery sidecar lifecycle: delete after the manifest commit
// succeeded. Best-effort: if this delete fails, the sidecar persists
// and on next open the sweep sees every table at the post-publish
// manifest pin (NoMovement) and the sidecar is treated as a stale
// artifact (recovery is a no-op and the sidecar is cleaned up).
// Failing the schema_apply call would report failure for a migration
// that already succeeded.
if let Some(handle) = recovery_handle {
if let Err(err) = crate::db::manifest::delete_sidecar(&handle, db.storage_adapter()).await {
tracing::warn!(
error = %err,
operation_id = handle.operation_id.as_str(),
"recovery sidecar cleanup failed; the next open's recovery sweep will resolve it"
);
}
}
Ok(SchemaApplyResult {
supported: true,
applied: true,
@ -405,13 +505,13 @@ pub(super) async fn apply_schema_with_lock(
}
pub(super) async fn ensure_schema_apply_idle(db: &mut Omnigraph, operation: &str) -> Result<()> {
db.refresh().await?;
db.refresh_coordinator_only().await?;
ensure_schema_apply_not_locked(db, operation).await
}
pub(super) async fn acquire_schema_apply_lock(db: &mut Omnigraph) -> Result<()> {
db.ensure_schema_state_valid().await?;
db.refresh().await?;
db.refresh_coordinator_only().await?;
let branches = db.coordinator.all_branches().await?;
if branches
.iter()
@ -425,7 +525,7 @@ pub(super) async fn acquire_schema_apply_lock(db: &mut Omnigraph) -> Result<()>
db.coordinator
.branch_create(SCHEMA_APPLY_LOCK_BRANCH)
.await?;
db.refresh().await?;
db.refresh_coordinator_only().await?;
let blocking_branches = db
.coordinator
@ -449,7 +549,12 @@ pub(super) async fn release_schema_apply_lock(db: &mut Omnigraph) -> Result<()>
db.coordinator
.branch_delete(SCHEMA_APPLY_LOCK_BRANCH)
.await?;
db.refresh().await
// Use refresh_coordinator_only — the full Omnigraph::refresh would
// run roll-forward-only recovery, and on the failure path the
// in-flight schema_apply sidecar is still on disk; recovery would
// race the caller's own publish (or roll forward an aborted apply
// we want to leave for next-open).
db.refresh_coordinator_only().await
}
pub(super) async fn ensure_schema_apply_not_locked(db: &Omnigraph, operation: &str) -> Result<()> {

View file

@ -31,6 +31,37 @@ pub(super) async fn ensure_indices_on(db: &mut Omnigraph, branch: &str) -> Resul
ensure_indices_for_branch(db, branch.as_deref()).await
}
#[cfg(feature = "failpoints")]
pub(super) async fn failpoint_publish_table_head_without_index_rebuild_for_test(
db: &mut Omnigraph,
branch: &str,
table_key: &str,
table_branch: Option<&str>,
) -> Result<u64> {
let branch = normalize_branch_name(branch)?;
let snapshot = db.snapshot_for_branch(branch.as_deref()).await?;
let entry = snapshot
.entry(table_key)
.ok_or_else(|| OmniError::manifest(format!("no manifest entry for {}", table_key)))?;
let full_path = format!("{}/{}", db.root_uri, entry.table_path);
let ds = db
.table_store
.open_dataset_head_for_write(table_key, &full_path, table_branch)
.await?;
let state = db.table_store.table_state(&full_path, &ds).await?;
let update = crate::db::SubTableUpdate {
table_key: table_key.to_string(),
table_version: state.version,
table_branch: table_branch.map(str::to_string),
row_count: state.row_count,
version_metadata: state.version_metadata,
};
let mut expected = std::collections::HashMap::new();
expected.insert(table_key.to_string(), entry.table_version);
commit_prepared_updates_on_branch_with_expected(db, branch.as_deref(), &[update], &expected)
.await
}
pub(super) async fn ensure_indices_for_branch(
db: &mut Omnigraph,
branch: Option<&str>,
@ -42,6 +73,95 @@ pub(super) async fn ensure_indices_for_branch(
let mut updates = Vec::new();
let active_branch = resolved.branch;
// Recovery sidecar: protect the per-table commit_staged loop in
// build_indices_on_dataset (one commit per index built). Only pins
// tables that ACTUALLY need index work — the classifier
// loose-matches for SidecarKind::EnsureIndices (the actual N
// depends on which indices are missing), but if a table needs zero
// commits and gets pinned, the all-or-nothing decision rule
// classifies it as `NoMovement` and rolls back legitimately-
// committed work on sibling tables. Steady-state runs (everything
// already indexed) skip the sidecar entirely.
let mut recovery_pins: Vec<crate::db::manifest::SidecarTablePin> = Vec::new();
for type_name in db.catalog.node_types.keys() {
let table_key = format!("node:{}", type_name);
let Some(entry) = snapshot.entry(&table_key) else {
continue;
};
// Match the processing loop's branch filter: when running on a
// feature branch, main-branch tables (table_branch = None) are
// skipped (`None => continue` at ~line 118). Pinning them here
// would force NoMovement on recovery and trigger an all-or-
// nothing rollback of legitimately-committed work on the
// feature-branch tables.
if active_branch.is_some() && entry.table_branch.is_none() {
continue;
}
let full_path = format!("{}/{}", db.root_uri, entry.table_path);
if needs_index_work_node(
db,
type_name,
&table_key,
&full_path,
entry.table_branch.as_deref(),
)
.await?
{
recovery_pins.push(crate::db::manifest::SidecarTablePin {
table_key,
table_path: full_path,
expected_version: entry.table_version,
post_commit_pin: entry.table_version + 1,
// Use active_branch (where commits actually land), NOT
// entry.table_branch (where the table currently lives).
// open_owned_dataset_for_branch_write forks a feature
// branch from a main-branch table on first write — the
// resulting commit lands on active_branch. Recovery's
// open_lance_head must check the same branch.
table_branch: active_branch.clone(),
});
}
}
for edge_name in db.catalog.edge_types.keys() {
let table_key = format!("edge:{}", edge_name);
let Some(entry) = snapshot.entry(&table_key) else {
continue;
};
if active_branch.is_some() && entry.table_branch.is_none() {
continue;
}
let full_path = format!("{}/{}", db.root_uri, entry.table_path);
if needs_index_work_edge(db, &table_key, &full_path, entry.table_branch.as_deref()).await? {
recovery_pins.push(crate::db::manifest::SidecarTablePin {
table_key,
table_path: full_path,
expected_version: entry.table_version,
post_commit_pin: entry.table_version + 1,
// Use active_branch (where commits actually land), NOT
// entry.table_branch (where the table currently lives).
// open_owned_dataset_for_branch_write forks a feature
// branch from a main-branch table on first write — the
// resulting commit lands on active_branch. Recovery's
// open_lance_head must check the same branch.
table_branch: active_branch.clone(),
});
}
}
let recovery_handle = if recovery_pins.is_empty() {
None
} else {
let sidecar = crate::db::manifest::new_sidecar(
crate::db::manifest::SidecarKind::EnsureIndices,
active_branch.clone(),
db.audit_actor_id.clone(),
recovery_pins,
);
Some(
crate::db::manifest::write_sidecar(db.root_uri(), db.storage_adapter(), &sidecar)
.await?,
)
};
for type_name in db.catalog.node_types.keys() {
let table_key = format!("node:{}", type_name);
let Some(entry) = snapshot.entry(&table_key) else {
@ -136,13 +256,123 @@ pub(super) async fn ensure_indices_for_branch(
}
}
// Failpoint: pin the per-writer Phase B → Phase C residual for
// ensure_indices. Lance HEAD has advanced on every touched table
// (one commit_staged per index built) but the manifest publish below
// hasn't run. Used by
// `tests/failpoints.rs::ensure_indices_phase_b_failure_recovered_on_next_open`.
crate::failpoints::maybe_fail("ensure_indices.post_phase_b_pre_manifest_commit")?;
if !updates.is_empty() {
commit_prepared_updates_on_branch(db, branch, &updates).await?;
}
// Recovery sidecar lifecycle: delete after the manifest publish (or
// no-op when there were no updates — the sidecar covered the
// per-table commit window regardless). Best-effort cleanup; failing
// the user here would error a call that already succeeded.
if let Some(handle) = recovery_handle {
if let Err(err) = crate::db::manifest::delete_sidecar(&handle, db.storage_adapter()).await {
tracing::warn!(
error = %err,
operation_id = handle.operation_id.as_str(),
"recovery sidecar cleanup failed; the next open's recovery sweep will resolve it"
);
}
}
Ok(())
}
/// Returns true if the node table is missing at least one declared
/// scalar/vector index that `build_indices_on_dataset_for_catalog` would
/// build AND has at least one row (the ensure_indices loop has
/// `if row_count > 0 { build_indices(...) }`, so empty tables produce
/// zero commits and must NOT be pinned in the sidecar — pinning them
/// would force `NoMovement` classification on recovery and trigger the
/// all-or-nothing rollback of sibling tables' legitimate index work).
///
/// Per the actual `build_indices_on_dataset_for_catalog` implementation
/// (this file, ~line 419-491), nodes get BTree (id) + per-prop FTS
/// (@search String) + per-prop Vector indices; edges get BTree only
/// (id, src, dst). The two helpers mirror that asymmetry — see the
/// `needs_index_work_edge` doc comment.
async fn needs_index_work_node(
db: &Omnigraph,
type_name: &str,
table_key: &str,
full_path: &str,
table_branch: Option<&str>,
) -> Result<bool> {
let ds = db
.table_store
.open_dataset_head_for_write(table_key, full_path, table_branch)
.await?;
// Empty tables are skipped by the ensure_indices loop, so they must
// not be pinned in the sidecar — pinning a table that produces zero
// commits classifies as NoMovement on recovery and forces all-or-
// nothing rollback of sibling tables' legitimate index work.
// Errors from count_rows are propagated: silently treating them as
// "0 rows" risks skipping a table that is actually about to be
// modified.
if db.table_store.count_rows(&ds, None).await? == 0 {
return Ok(false);
}
if !db.table_store.has_btree_index(&ds, "id").await? {
return Ok(true);
}
let Some(node_type) = db.catalog.node_types.get(type_name) else {
return Ok(false);
};
for index_cols in &node_type.indices {
if index_cols.len() != 1 {
continue;
}
let prop_name = &index_cols[0];
let Some(prop_type) = node_type.properties.get(prop_name) else {
continue;
};
if matches!(prop_type.scalar, ScalarType::String) && !prop_type.list {
if !db.table_store.has_fts_index(&ds, prop_name).await? {
return Ok(true);
}
} else if matches!(prop_type.scalar, ScalarType::Vector(_)) && !prop_type.list {
if !db.table_store.has_vector_index(&ds, prop_name).await? {
return Ok(true);
}
}
}
Ok(false)
}
/// Companion to `needs_index_work_node` for edge tables.
///
/// **Intentional asymmetry with the node helper**: edges only need
/// BTree indices (id, src, dst) per `build_indices_on_dataset_for_catalog`
/// at the edge branch (this file, lines 474-485). FTS / vector indices
/// on edge properties are not built today; if they ever are, this
/// helper plus the build function must be updated together.
///
/// Empty edge tables are skipped by the ensure_indices loop the same
/// way node tables are; see `needs_index_work_node`.
async fn needs_index_work_edge(
db: &Omnigraph,
table_key: &str,
full_path: &str,
table_branch: Option<&str>,
) -> Result<bool> {
let ds = db
.table_store
.open_dataset_head_for_write(table_key, full_path, table_branch)
.await?;
if db.table_store.count_rows(&ds, None).await? == 0 {
return Ok(false);
}
Ok(!db.table_store.has_btree_index(&ds, "id").await?
|| !db.table_store.has_btree_index(&ds, "src").await?
|| !db.table_store.has_btree_index(&ds, "dst").await?)
}
pub(super) async fn open_for_mutation(
db: &Omnigraph,
table_key: &str,
@ -290,14 +520,14 @@ pub(super) async fn build_indices_on_dataset_for_catalog(
}
if let Some(node_type) = catalog.node_types.get(type_name) {
// Per MR-793 §10 OQ3: stage scalar indices first (BTree,
// Inverted), then call `create_vector_index` inline. The
// inline-commit on a vector index advances HEAD, which would
// invalidate any uncommitted scalar index transactions if we
// stacked them. Today the per-stage shape commits each
// scalar index immediately so the order constraint is
// implicit, but if we ever batch scalar stages we must
// ensure they all land before the vector inline-commit.
// Stage scalar indices first (BTree, Inverted), then call
// `create_vector_index` inline. The inline-commit on a vector
// index advances HEAD, which would invalidate any uncommitted
// scalar index transactions if we stacked them. Today the
// per-stage shape commits each scalar index immediately so
// the order constraint is implicit, but if we ever batch
// scalar stages we must ensure they all land before the
// vector inline-commit.
for index_cols in &node_type.indices {
if index_cols.len() != 1 {
continue;
@ -353,13 +583,13 @@ pub(super) async fn build_indices_on_dataset_for_catalog(
}
/// Stage a BTREE index transaction and commit it, advancing the in-memory
/// `*ds` to the new HEAD. MR-793 Phase 4: replaces the previous
/// inline-commit `create_btree_index(ds)` call with the staged primitive
/// + an immediate `commit_staged`. Per-call behavior is unchanged
/// (HEAD advances once per index), but the bytes-on-disk and HEAD-advance
/// are now decoupled at the `TableStore` API surface — a caller that
/// needs end-of-batch atomicity can stage many transactions and commit
/// them in one pass (Phase 8's index reconciler relies on this).
/// `*ds` to the new HEAD. The staged primitive + immediate `commit_staged`
/// pair replaced the earlier inline-commit `create_btree_index(ds)` call.
/// Per-call behavior is unchanged (HEAD advances once per index), but
/// the bytes-on-disk and HEAD-advance are now decoupled at the
/// `TableStore` API surface — a caller that needs end-of-batch atomicity
/// can stage many transactions and commit them in one pass (the eventual
/// index reconciler relies on this).
async fn stage_and_commit_btree(
db: &Omnigraph,
table_key: &str,
@ -377,8 +607,9 @@ async fn stage_and_commit_btree(
))
})?;
// Failpoint between stage and commit. Used by `tests/failpoints.rs`
// to demonstrate that a Phase A failure in the staged-index path
// leaves no Lance-HEAD drift on the touched table.
// to demonstrate that a stage-step failure in the staged-index
// path (`stage_create_btree_index` succeeded; `commit_staged` not
// yet called) leaves no Lance-HEAD drift on the touched table.
crate::failpoints::maybe_fail("ensure_indices.post_stage_pre_commit_btree")?;
let new_ds = db
.table_store
@ -395,7 +626,7 @@ async fn stage_and_commit_btree(
}
/// Stage an INVERTED (FTS) index transaction and commit it. See
/// `stage_and_commit_btree` for the MR-793 Phase 4 rationale.
/// `stage_and_commit_btree` for the rationale.
async fn stage_and_commit_inverted(
db: &Omnigraph,
table_key: &str,

View file

@ -0,0 +1,362 @@
//! Recovery audit row storage in `_graph_commit_recoveries.lance`.
//!
//! Sibling to `_graph_commits.lance` (`commit_graph.rs`). Each successful
//! recovery sweep — roll-forward or roll-back — records one row here so
//! operators investigating a sidecar-attributed mutation can correlate
//! `omnigraph commit list --filter actor=omnigraph:recovery` with the
//! original actor whose mutation was rolled forward / back.
//!
//! Sibling-table is additive: it doesn't bump
//! `INTERNAL_MANIFEST_SCHEMA_VERSION`, and can be removed in favor of a
//! schema migration later if the join cost matters. The schema-migration
//! alternative (adding `recovery_for_actor` and `recovery_kind` columns
//! to `_graph_commits.lance` itself) was considered and rejected to keep
//! this change additive.
//!
//! Atomicity caveat: append to `_graph_commit_recoveries.lance` is
//! sequential w.r.t. the `CommitGraph::append_commit` write. A crash
//! between the two leaves an orphan commit-graph row with no audit row.
//! Same shape as the existing `_graph_commits` + `_graph_commit_actors`
//! split; the recovery sweep tolerates it the same way (re-entry sees
//! `NoMovement` for already-restored / already-published tables; the
//! audit append is retried).
use std::sync::Arc;
use std::time::{SystemTime, UNIX_EPOCH};
use arrow_array::{
Array, RecordBatch, RecordBatchIterator, StringArray, TimestampMicrosecondArray,
};
use arrow_schema::{DataType, Field, Schema, SchemaRef, TimeUnit};
use futures::TryStreamExt;
use lance::Dataset;
use lance::dataset::{WriteMode, WriteParams};
use lance_file::version::LanceFileVersion;
use serde::{Deserialize, Serialize};
use crate::error::{OmniError, Result};
const RECOVERIES_DIR: &str = "_graph_commit_recoveries.lance";
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "PascalCase")]
pub(crate) enum RecoveryKind {
RolledForward,
RolledBack,
}
impl RecoveryKind {
fn as_str(self) -> &'static str {
match self {
RecoveryKind::RolledForward => "RolledForward",
RecoveryKind::RolledBack => "RolledBack",
}
}
fn parse(s: &str) -> Result<Self> {
match s {
"RolledForward" => Ok(RecoveryKind::RolledForward),
"RolledBack" => Ok(RecoveryKind::RolledBack),
other => Err(OmniError::manifest_internal(format!(
"unknown recovery_kind '{}' in _graph_commit_recoveries.lance",
other
))),
}
}
}
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub(crate) struct TableOutcome {
pub table_key: String,
/// For RolledForward: the prior manifest pin (== sidecar.expected_version).
/// For RolledBack: same.
pub from_version: u64,
/// For RolledForward: the new manifest pin (== sidecar.post_commit_pin).
/// For RolledBack: == sidecar.expected_version (Lance HEAD reverted).
pub to_version: u64,
}
#[derive(Debug, Clone, PartialEq, Eq)]
pub(crate) struct RecoveryAuditRecord {
pub graph_commit_id: String,
pub recovery_kind: RecoveryKind,
pub recovery_for_actor: Option<String>,
pub operation_id: String,
pub sidecar_writer_kind: String,
pub per_table_outcomes: Vec<TableOutcome>,
pub created_at: i64,
}
pub(crate) struct RecoveryAudit {
root_uri: String,
dataset: Option<Dataset>,
}
impl RecoveryAudit {
/// Open the recovery-audit dataset for the repo, or return a handle
/// with no dataset yet (created on first append). Mirrors the
/// optional-dataset pattern from `_graph_commit_actors.lance`.
pub(crate) async fn open(root_uri: &str) -> Result<Self> {
let root = root_uri.trim_end_matches('/').to_string();
let dataset = Dataset::open(&recoveries_uri(&root)).await.ok();
Ok(Self {
root_uri: root,
dataset,
})
}
/// Append one recovery audit record. Lazily initializes the dataset
/// on first call (idempotent under racy creation via the same
/// `Dataset already exists` rebound as `_graph_commit_actors.lance`).
pub(crate) async fn append(&mut self, record: RecoveryAuditRecord) -> Result<()> {
let batch = recovery_record_to_batch(&record)?;
let reader = RecordBatchIterator::new(vec![Ok(batch)], recoveries_schema());
let mut dataset = match self.dataset.take() {
Some(dataset) => dataset,
None => create_recoveries_dataset(&self.root_uri).await?,
};
dataset
.append(reader, None)
.await
.map_err(|e| OmniError::Lance(e.to_string()))?;
self.dataset = Some(dataset);
Ok(())
}
/// Read every recorded recovery (test + audit-CLI surface). Ordered by
/// `created_at` ascending.
pub(crate) async fn list(&self) -> Result<Vec<RecoveryAuditRecord>> {
let dataset = match &self.dataset {
Some(dataset) => dataset,
None => return Ok(Vec::new()),
};
let batches: Vec<RecordBatch> = dataset
.scan()
.try_into_stream()
.await
.map_err(|e| OmniError::Lance(e.to_string()))?
.try_collect()
.await
.map_err(|e| OmniError::Lance(e.to_string()))?;
let mut out = Vec::new();
for batch in batches {
for row in 0..batch.num_rows() {
out.push(decode_row(&batch, row)?);
}
}
out.sort_by_key(|r| r.created_at);
Ok(out)
}
}
fn recoveries_uri(root_uri: &str) -> String {
format!("{}/{}", root_uri.trim_end_matches('/'), RECOVERIES_DIR)
}
fn recoveries_schema() -> SchemaRef {
Arc::new(Schema::new(vec![
Field::new("graph_commit_id", DataType::Utf8, false),
Field::new("recovery_kind", DataType::Utf8, false),
Field::new("recovery_for_actor", DataType::Utf8, true),
Field::new("operation_id", DataType::Utf8, false),
Field::new("sidecar_writer_kind", DataType::Utf8, false),
// per_table_outcomes is serialized as a JSON string. The audit
// table is queried infrequently; a JSON column avoids needing
// a list-of-struct schema, which would make schema evolution
// (adding fields per outcome) more painful.
Field::new("per_table_outcomes_json", DataType::Utf8, false),
Field::new(
"created_at",
DataType::Timestamp(TimeUnit::Microsecond, None),
false,
),
]))
}
async fn create_recoveries_dataset(root_uri: &str) -> Result<Dataset> {
let uri = recoveries_uri(root_uri);
let batch = RecordBatch::new_empty(recoveries_schema());
let reader = RecordBatchIterator::new(vec![Ok(batch)], recoveries_schema());
let params = WriteParams {
mode: WriteMode::Create,
enable_stable_row_ids: true,
data_storage_version: Some(LanceFileVersion::V2_2),
..Default::default()
};
match Dataset::write(reader, &uri as &str, Some(params)).await {
Ok(dataset) => Ok(dataset),
Err(err) if err.to_string().contains("Dataset already exists") => Dataset::open(&uri)
.await
.map_err(|open_err| OmniError::Lance(open_err.to_string())),
Err(err) => Err(OmniError::Lance(err.to_string())),
}
}
fn recovery_record_to_batch(record: &RecoveryAuditRecord) -> Result<RecordBatch> {
let outcomes_json = serde_json::to_string(&record.per_table_outcomes).map_err(|e| {
OmniError::manifest_internal(format!(
"failed to serialize per_table_outcomes for recovery audit: {}",
e
))
})?;
RecordBatch::try_new(
recoveries_schema(),
vec![
Arc::new(StringArray::from(vec![record.graph_commit_id.clone()])),
Arc::new(StringArray::from(vec![record.recovery_kind.as_str()])),
Arc::new(StringArray::from(vec![record
.recovery_for_actor
.clone()])),
Arc::new(StringArray::from(vec![record.operation_id.clone()])),
Arc::new(StringArray::from(vec![record.sidecar_writer_kind.clone()])),
Arc::new(StringArray::from(vec![outcomes_json])),
Arc::new(TimestampMicrosecondArray::from(vec![record.created_at])),
],
)
.map_err(|e| OmniError::Lance(e.to_string()))
}
fn decode_row(batch: &RecordBatch, row: usize) -> Result<RecoveryAuditRecord> {
let str_col = |name: &str| -> Result<&StringArray> {
batch
.column_by_name(name)
.ok_or_else(|| OmniError::manifest_internal(format!("missing column '{}' in recovery audit", name)))?
.as_any()
.downcast_ref::<StringArray>()
.ok_or_else(|| OmniError::manifest_internal(format!("column '{}' has wrong type", name)))
};
let ts_col = batch
.column_by_name("created_at")
.ok_or_else(|| OmniError::manifest_internal("missing 'created_at' column".to_string()))?
.as_any()
.downcast_ref::<TimestampMicrosecondArray>()
.ok_or_else(|| {
OmniError::manifest_internal("'created_at' column has wrong type".to_string())
})?;
let graph_commit_ids = str_col("graph_commit_id")?;
let kinds = str_col("recovery_kind")?;
let for_actors = str_col("recovery_for_actor")?;
let op_ids = str_col("operation_id")?;
let writers = str_col("sidecar_writer_kind")?;
let outcomes_json = str_col("per_table_outcomes_json")?;
let outcomes: Vec<TableOutcome> =
serde_json::from_str(outcomes_json.value(row)).map_err(|e| {
OmniError::manifest_internal(format!(
"failed to deserialize per_table_outcomes_json from recovery audit: {}",
e
))
})?;
Ok(RecoveryAuditRecord {
graph_commit_id: graph_commit_ids.value(row).to_string(),
recovery_kind: RecoveryKind::parse(kinds.value(row))?,
recovery_for_actor: if for_actors.is_null(row) {
None
} else {
Some(for_actors.value(row).to_string())
},
operation_id: op_ids.value(row).to_string(),
sidecar_writer_kind: writers.value(row).to_string(),
per_table_outcomes: outcomes,
created_at: ts_col.value(row),
})
}
pub(crate) fn now_micros() -> Result<i64> {
SystemTime::now()
.duration_since(UNIX_EPOCH)
.map(|d| d.as_micros() as i64)
.map_err(|e| {
OmniError::manifest_internal(format!("system clock before unix epoch: {}", e))
})
}
#[cfg(test)]
mod tests {
use super::*;
fn sample_record() -> RecoveryAuditRecord {
RecoveryAuditRecord {
graph_commit_id: "01H000000000000000000000XX".to_string(),
recovery_kind: RecoveryKind::RolledForward,
recovery_for_actor: Some("act-alice".to_string()),
operation_id: "01H000000000000000000000OP".to_string(),
sidecar_writer_kind: "Mutation".to_string(),
per_table_outcomes: vec![
TableOutcome {
table_key: "node:Person".to_string(),
from_version: 5,
to_version: 6,
},
TableOutcome {
table_key: "edge:Knows".to_string(),
from_version: 12,
to_version: 13,
},
],
created_at: 1_700_000_000_000_000,
}
}
#[tokio::test]
async fn recovery_audit_round_trips_through_lance() {
let dir = tempfile::tempdir().unwrap();
let root = dir.path().to_str().unwrap();
let mut audit = RecoveryAudit::open(root).await.unwrap();
// Empty repo: list returns empty.
assert!(audit.list().await.unwrap().is_empty());
// Append + list.
let record = sample_record();
audit.append(record.clone()).await.unwrap();
let listed = audit.list().await.unwrap();
assert_eq!(listed.len(), 1);
assert_eq!(listed[0], record);
// Append a second record; both visible, sorted by created_at.
let mut second = sample_record();
second.graph_commit_id = "01H000000000000000000000YY".to_string();
second.recovery_kind = RecoveryKind::RolledBack;
second.recovery_for_actor = None;
second.created_at = record.created_at + 1;
audit.append(second.clone()).await.unwrap();
let listed = audit.list().await.unwrap();
assert_eq!(listed.len(), 2);
assert_eq!(listed[0], record);
assert_eq!(listed[1], second);
}
#[tokio::test]
async fn recovery_audit_persists_across_open_cycles() {
let dir = tempfile::tempdir().unwrap();
let root = dir.path().to_str().unwrap();
{
let mut audit = RecoveryAudit::open(root).await.unwrap();
audit.append(sample_record()).await.unwrap();
}
let audit = RecoveryAudit::open(root).await.unwrap();
let listed = audit.list().await.unwrap();
assert_eq!(listed.len(), 1);
assert_eq!(listed[0], sample_record());
}
#[test]
fn recovery_kind_round_trips_through_string() {
assert_eq!(
RecoveryKind::parse("RolledForward").unwrap(),
RecoveryKind::RolledForward,
);
assert_eq!(
RecoveryKind::parse("RolledBack").unwrap(),
RecoveryKind::RolledBack,
);
assert!(RecoveryKind::parse("Garbage").is_err());
}
}

View file

@ -285,6 +285,24 @@ fn schema_lock_conflict(detail: impl Into<String>) -> OmniError {
))
}
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub(crate) enum SchemaStateRecovery {
Noop,
CleanedStaging,
CompletedStagingRename { schema_apply_sidecar: bool },
}
impl SchemaStateRecovery {
pub(crate) fn completed_schema_apply_sidecar_rename(self) -> bool {
matches!(
self,
Self::CompletedStagingRename {
schema_apply_sidecar: true,
}
)
}
}
/// Reconcile leftover schema staging files (`_schema.pg.staging`,
/// `_schema.ir.json.staging`, `__schema_state.json.staging`) against the
/// manifest snapshot.
@ -306,7 +324,7 @@ pub(crate) async fn recover_schema_state_files(
root_uri: &str,
storage: Arc<dyn StorageAdapter>,
snapshot: &Snapshot,
) -> Result<()> {
) -> Result<SchemaStateRecovery> {
let pg_staging = schema_source_staging_uri(root_uri);
let ir_staging = schema_ir_staging_uri(root_uri);
let state_staging = schema_state_staging_uri(root_uri);
@ -316,7 +334,28 @@ pub(crate) async fn recover_schema_state_files(
let state_exists = storage.exists(&state_staging).await?;
if !pg_exists && !ir_exists && !state_exists {
return Ok(());
return Ok(SchemaStateRecovery::Noop);
}
// Schema-apply atomicity: when a SchemaApply sidecar is present,
// the writer reached Phase B (Lance HEADs advanced) but didn't
// complete Phase C (manifest publish + staging→final renames). The
// recovery sweep about to run will roll the table versions forward
// to the new Lance HEADs; we MUST also rename the staging files
// forward so the catalog matches. Without this, the disambiguation
// logic below sees actual_keys == live_keys (manifest didn't move)
// and deletes the staging files, leaving the repo with new-schema
// data on disk but the old `_schema.pg` live — corruption.
if crate::db::manifest::has_schema_apply_sidecar(root_uri, storage.as_ref()).await? {
warn!(
"recovery: SchemaApply sidecar present; completing schema-staging rename so the \
manifest-drift sweep's roll-forward sees the new catalog (manifest v{})",
snapshot.version()
);
complete_staging_rename(root_uri, storage.as_ref()).await?;
return Ok(SchemaStateRecovery::CompletedStagingRename {
schema_apply_sidecar: true,
});
}
if !pg_exists {
@ -346,7 +385,9 @@ pub(crate) async fn recover_schema_state_files(
snapshot.version()
);
complete_staging_rename(root_uri, storage.as_ref()).await?;
return Ok(());
return Ok(SchemaStateRecovery::CompletedStagingRename {
schema_apply_sidecar: false,
});
}
let staging_source = storage.read_text(&pg_staging).await?;
@ -365,7 +406,7 @@ pub(crate) async fn recover_schema_state_files(
"removing leftover schema staging files matching the live schema (no-op apply that crashed)"
);
cleanup_staging_files(root_uri, storage.as_ref()).await?;
return Ok(());
return Ok(SchemaStateRecovery::CleanedStaging);
}
let live_keys = expected_table_keys(&live_ir);
@ -388,14 +429,16 @@ pub(crate) async fn recover_schema_state_files(
snapshot.version()
);
cleanup_staging_files(root_uri, storage.as_ref()).await?;
Ok(())
Ok(SchemaStateRecovery::CleanedStaging)
} else if actual_keys == staging_keys {
warn!(
"schema apply crashed after manifest commit; completing schema-file rename (manifest v{})",
snapshot.version()
);
complete_staging_rename(root_uri, storage.as_ref()).await?;
Ok(())
Ok(SchemaStateRecovery::CompletedStagingRename {
schema_apply_sidecar: false,
})
} else {
Err(schema_lock_conflict(format!(
"found schema staging files but the manifest's table set ({:?}) matches neither the live schema ({:?}) nor the staging schema ({:?}); manual operator action required",
@ -407,9 +450,7 @@ pub(crate) async fn recover_schema_state_files(
async fn cleanup_staging_files(root_uri: &str, storage: &dyn StorageAdapter) -> Result<()> {
storage.delete(&schema_source_staging_uri(root_uri)).await?;
storage.delete(&schema_ir_staging_uri(root_uri)).await?;
storage
.delete(&schema_state_staging_uri(root_uri))
.await?;
storage.delete(&schema_state_staging_uri(root_uri)).await?;
Ok(())
}

View file

@ -913,9 +913,9 @@ async fn publish_rewritten_merge_table(
// Phase 1: merge_insert changed/new rows (preserves _row_created_at_version for
// existing rows, bumps _row_last_updated_at_version only for actually-changed rows).
//
// MR-793 Phase 5: routed through the staged primitive so a failure
// between writing fragments and committing leaves no Lance-HEAD
// drift. The commit_staged here is per-table per-call (Lance has no
// Routed through the staged primitive so a failure between writing
// fragments and committing leaves no Lance-HEAD drift. The
// commit_staged here is per-table per-call (Lance has no
// multi-dataset atomic commit); the residual sits at this single
// commit point, narrowed from the previous "merge_insert + delete +
// index" multi-step inline-commit chain.
@ -957,11 +957,11 @@ async fn publish_rewritten_merge_table(
//
// INLINE-COMMIT RESIDUAL: lance-4.0.0 does not expose a public
// two-phase delete API (DeleteJob is `pub(crate)` —
// lance-format/lance#6658 is open with no PRs). MR-793 deliberately
// does NOT introduce a `stage_delete` wrapper that would secretly
// inline-commit (a side-channel — see design doc §3.2). When the
// upstream API ships, swap this `delete_where` call for
// `stage_delete` + `commit_staged`.
// lance-format/lance#6658 is open with no PRs). We deliberately do
// NOT introduce a `stage_delete` wrapper that would secretly
// inline-commit (it would create a side-channel between the staged
// and inline write paths). When the upstream API ships, swap this
// `delete_where` call for `stage_delete` + `commit_staged`.
if !staged.deleted_ids.is_empty() {
let escaped: Vec<String> = staged
.deleted_ids
@ -977,11 +977,11 @@ async fn publish_rewritten_merge_table(
// Phase 3: rebuild indices.
//
// `build_indices_on_dataset` was migrated in MR-793 Phase 4 to use
// `stage_create_btree_index` / `stage_create_inverted_index` +
// `commit_staged` for scalar indices. Vector indices remain inline
// (residual — `build_index_metadata_from_segments` is `pub(crate)`
// in lance-4.0.0; companion ticket to lance-format/lance#6658).
// `build_indices_on_dataset` uses `stage_create_btree_index` /
// `stage_create_inverted_index` + `commit_staged` for scalar
// indices. Vector indices remain inline-commit
// (`build_index_metadata_from_segments` is `pub(crate)` in lance-
// 4.0.0 — companion ticket to lance-format/lance#6658).
let row_count = target_db
.table_store()
.table_state(&full_path, &current_ds)
@ -1167,6 +1167,96 @@ impl Omnigraph {
validate_merge_candidates(self, source_snapshot, &target_snapshot, &candidates).await?;
// Recovery sidecar: protect the per-table commit_staged loop.
// Pin only `RewriteMerged` candidates because they always
// advance Lance HEAD through `publish_rewritten_merge_table`
// (which runs stage_merge_insert + delete_where + index
// rebuilds — multiple commit_staged calls per table; loose
// classification handles the multi-step drift).
//
// `AdoptSourceState` candidates are NOT pinned: their publish
// path is `publish_adopted_source_state`, whose subcases mostly
// don't advance Lance HEAD (pure manifest pointer switch, or
// fork via `fork_dataset_from_entry_state` which only adds a
// Lance branch ref). If those subcases were pinned, recovery
// would classify them as NoMovement and the all-or-nothing
// decision would force a rollback that destroys legitimately-
// committed work on sibling RewriteMerged tables.
//
// Residual: two `AdoptSourceState` subcases (when source has a
// table_branch AND the source delta is non-empty) internally
// call `publish_rewritten_merge_table` and DO advance HEAD.
// Those are not covered by this sidecar — if they fail mid-
// commit, the residual persists until the next ReadWrite open
// detects it via a subsequent ExpectedVersionMismatch from a
// later writer that touches the same table. Closing this gap
// requires pre-computing source deltas during candidate
// classification (a structural change to `CandidateTableState`)
// and is left as follow-up work.
let recovery_pins: Vec<crate::db::manifest::SidecarTablePin> = ordered_table_keys
.iter()
.filter_map(|table_key| {
let candidate = candidates.get(table_key)?;
if !matches!(candidate, CandidateTableState::RewriteMerged(_)) {
return None;
}
let entry = target_snapshot.entry(table_key)?;
Some(crate::db::manifest::SidecarTablePin {
table_key: table_key.clone(),
table_path: self.table_store().dataset_uri(&entry.table_path),
expected_version: entry.table_version,
post_commit_pin: entry.table_version + 1,
// Use the merge target branch (where commits actually
// land), NOT entry.table_branch (where the table
// currently lives). publish_rewritten_merge_table calls
// open_for_mutation, which forks an inherited-from-main
// table to active_branch on first write — the resulting
// Lance commit lands on active_branch. Recovery's
// open_lance_head must check the same branch, otherwise
// an inherited-table feature-to-feature merge classifies
// as NoMovement and the all-or-nothing rollback skips
// the orphaned post-Phase-B HEAD on the target ref.
// Same rationale as table_ops.rs:115-120 in
// ensure_indices_for_branch.
table_branch: self.active_branch().map(str::to_string),
})
})
.collect();
let recovery_handle = if recovery_pins.is_empty() {
None
} else {
// Use the merge target branch directly, NOT a heuristic
// derived from `ordered_table_keys.first()`. The first
// sorted table key may not be in the target snapshot at all
// (its `entry()` returns None → branch becomes None == main),
// and the SubTableEntry's `table_branch` field isn't
// necessarily the merge target branch. The caller
// `branch_merge` calls `swap_coordinator_for_branch(target_branch)`
// before invoking this function, so `self.active_branch()`
// is the target.
let target_branch = self.active_branch().map(str::to_string);
let mut sidecar = crate::db::manifest::new_sidecar(
crate::db::manifest::SidecarKind::BranchMerge,
target_branch,
self.audit_actor_id.clone(),
recovery_pins,
);
// Carry the source branch's HEAD commit id so the recovery
// sweep's audit step can record this as a MERGE commit
// (linked to the source) instead of a plain commit. Without
// this, future merges between the same pair lose
// already-up-to-date detection and merge-base correctness.
sidecar.merge_source_commit_id = Some(source_head_commit_id.to_string());
Some(
crate::db::manifest::write_sidecar(
self.root_uri(),
self.storage_adapter(),
&sidecar,
)
.await?,
)
};
let mut updates = Vec::new();
let mut changed_edge_tables = false;
for table_key in &ordered_table_keys {
@ -1195,11 +1285,32 @@ impl Omnigraph {
updates.push(update);
}
// Failpoint: pin the per-writer Phase B → Phase C residual for
// branch_merge. Lance HEAD has advanced on every touched table
// (publish_*) but the manifest publish below hasn't run. Used
// by `tests/failpoints.rs::branch_merge_phase_b_failure_recovered_on_next_open`.
crate::failpoints::maybe_fail("branch_merge.post_phase_b_pre_manifest_commit")?;
let manifest_version = if updates.is_empty() {
self.version()
} else {
self.commit_manifest_updates(&updates).await?
};
// Recovery sidecar lifecycle: delete after manifest publish.
// Best-effort cleanup; the merge already landed durably so
// failing the user here is undesirable.
if let Some(handle) = recovery_handle {
if let Err(err) =
crate::db::manifest::delete_sidecar(&handle, self.storage_adapter()).await
{
tracing::warn!(
error = %err,
operation_id = handle.operation_id.as_str(),
"recovery sidecar cleanup failed; the next open's recovery sweep will resolve it"
);
}
}
self.record_merge_commit(
manifest_version,
target_head_commit_id,

View file

@ -737,18 +737,23 @@ impl Omnigraph {
Err(e) => Err(e),
Ok(total) if staging.is_empty() => Ok(total),
Ok(total) => {
let (updates, expected_versions) = staging
.finalize(self, requested.as_deref())
let (updates, expected_versions, sidecar_handle) = staging
.finalize(
self,
requested.as_deref(),
crate::db::manifest::SidecarKind::Mutation,
)
.await?;
// Failpoint that wedges the documented finalize→publisher
// residual: per-table `commit_staged` calls already
// advanced Lance HEAD on every touched table; a failure
// injected here mirrors the production-rare case where
// the publisher's CAS pre-check rejects (or the manifest
// write throws) after staged commits succeeded. Used by
// `tests/failpoints.rs::finalize_publisher_residual_*`
// to pin the documented residual behavior. See
// `docs/runs.md` "Finalize → publisher residual".
// write throws) after staged commits succeeded. The
// sidecar written inside `staging.finalize()` persists
// across this failure so the next `Omnigraph::open`'s
// recovery sweep can roll forward — see
// `tests/failpoints.rs::recovery_rolls_forward_after_finalize_publisher_failure`.
crate::failpoints::maybe_fail("mutation.post_finalize_pre_publisher")?;
self.commit_updates_on_branch_with_expected(
requested.as_deref(),
@ -756,6 +761,33 @@ impl Omnigraph {
&expected_versions,
)
.await?;
// Phase C succeeded — sidecar can be deleted. If this
// delete fails, the next open's sweep classifies every
// table as NoMovement (manifest pin == Lance HEAD ==
// post_commit_pin) and the sidecar is treated as a
// stale artifact (cleaned up via the Phase 2 logic).
if let Some(handle) = sidecar_handle {
// Best-effort cleanup: the manifest publish already
// succeeded, so the user's mutation is durable. A
// failed delete leaves the sidecar on disk; the
// next open's recovery sweep classifies every table
// as `NoMovement` (manifest pin == Lance HEAD ==
// post_commit_pin) and tidies up. Failing the user
// here would return an error for a write that
// already landed.
if let Err(err) = crate::db::manifest::delete_sidecar(
&handle,
self.storage_adapter(),
)
.await
{
tracing::warn!(
error = %err,
operation_id = handle.operation_id.as_str(),
"recovery sidecar cleanup failed; the next open's recovery sweep will resolve it"
);
}
}
Ok(total)
}
}

View file

@ -27,6 +27,9 @@ use lance::Dataset;
use omnigraph_compiler::catalog::EdgeType;
use crate::db::SubTableUpdate;
use crate::db::manifest::{
new_sidecar, write_sidecar, RecoverySidecarHandle, SidecarKind, SidecarTablePin,
};
use crate::error::{OmniError, Result};
/// Whether the per-table accumulator should commit via `stage_append`
@ -218,8 +221,13 @@ impl MutationStaging {
pub(crate) async fn finalize(
self,
db: &crate::db::Omnigraph,
_branch: Option<&str>,
) -> Result<(Vec<SubTableUpdate>, HashMap<String, u64>)> {
branch: Option<&str>,
sidecar_kind: SidecarKind,
) -> Result<(
Vec<SubTableUpdate>,
HashMap<String, u64>,
Option<RecoverySidecarHandle>,
)> {
let MutationStaging {
expected_versions,
paths,
@ -230,6 +238,50 @@ impl MutationStaging {
let mut updates: Vec<SubTableUpdate> =
inline_committed.into_values().collect();
// Sidecar protocol: build the per-table pin list BEFORE any Lance
// commit_staged runs, then write the sidecar so a crash between
// Phase B (this loop's commit_staged calls) and Phase C (the
// manifest publish in the caller) is recoverable on next open.
// Skipped when `pending` is empty (delete-only mutation; the D₂
// parse-time rule keeps deletes out of this code path so this
// branch is reached only for the inline-committed-only case).
let pins: Vec<SidecarTablePin> = pending
.iter()
.map(|(table_key, _)| {
let path = paths.get(table_key).ok_or_else(|| {
OmniError::manifest_internal(format!(
"MutationStaging::finalize: missing path for table '{}'",
table_key,
))
})?;
let expected = *expected_versions.get(table_key).ok_or_else(|| {
OmniError::manifest_internal(format!(
"MutationStaging::finalize: missing expected version for table '{}'",
table_key,
))
})?;
Ok::<SidecarTablePin, OmniError>(SidecarTablePin {
table_key: table_key.clone(),
table_path: path.full_path.clone(),
expected_version: expected,
post_commit_pin: expected + 1,
table_branch: path.table_branch.clone(),
})
})
.collect::<Result<Vec<_>>>()?;
let sidecar_handle = if pins.is_empty() {
None
} else {
let sidecar = new_sidecar(
sidecar_kind,
branch.map(|s| s.to_string()),
db.audit_actor_id.clone(),
pins,
);
Some(write_sidecar(db.root_uri(), db.storage_adapter(), &sidecar).await?)
};
for (table_key, table) in pending {
let path = paths.get(&table_key).ok_or_else(|| {
OmniError::manifest_internal(format!(
@ -318,7 +370,7 @@ impl MutationStaging {
});
}
Ok((updates, expected_versions))
Ok((updates, expected_versions, sidecar_handle))
}
}

View file

@ -537,10 +537,39 @@ async fn load_jsonl_reader<R: BufRead>(
// Phase 4: Atomic manifest commit with publisher-level OCC.
if use_staging {
let (updates, expected_versions) = staging.finalize(db, branch).await?;
let (updates, expected_versions, sidecar_handle) = staging
.finalize(db, branch, crate::db::manifest::SidecarKind::Load)
.await?;
// Same finalize → publisher residual as mutations: per-table
// staged commits have advanced Lance HEAD, but the manifest
// publish has not run yet. Reuse the mutation failpoint name so
// one failpoint pins the shared `MutationStaging` boundary.
crate::failpoints::maybe_fail("mutation.post_finalize_pre_publisher")?;
db.commit_updates_on_branch_with_expected(branch, &updates, &expected_versions)
.await?;
// The recovery sidecar protects the per-table commit_staged →
// manifest publish window. Phase C succeeded — clean up
// best-effort: failing the user here would error out a write
// that already landed durably.
if let Some(handle) = sidecar_handle {
if let Err(err) =
crate::db::manifest::delete_sidecar(&handle, db.storage_adapter()).await
{
tracing::warn!(
error = %err,
operation_id = handle.operation_id.as_str(),
"recovery sidecar cleanup failed; the next open's recovery sweep will resolve it"
);
}
}
} else {
// LoadMode::Overwrite keeps the legacy inline-commit path —
// truncate-then-append doesn't fit the staged shape (see
// `docs/runs.md` "LoadMode::Overwrite residual"). The recovery
// sidecar is not applicable here because the writer doesn't go
// through MutationStaging; per-table inline commits + a final
// manifest publish handle their own residual via the documented
// operator workflow (re-run overwrite to recover).
db.commit_updates_on_branch_with_expected(
branch,
&overwrite_updates,

View file

@ -27,6 +27,10 @@ pub trait StorageAdapter: Debug + Send + Sync {
async fn rename_text(&self, from_uri: &str, to_uri: &str) -> Result<()>;
/// Remove a file. Returns Ok(()) if the file does not exist.
async fn delete(&self, uri: &str) -> Result<()>;
/// List all files (non-recursively, files only) directly under `dir_uri`.
/// Returns full URIs (same scheme as `dir_uri`). The result is unordered.
/// Returns Ok(empty) if the directory does not exist or is empty.
async fn list_dir(&self, dir_uri: &str) -> Result<Vec<String>>;
}
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
@ -59,6 +63,16 @@ impl StorageAdapter for LocalStorageAdapter {
async fn write_text(&self, uri: &str, contents: &str) -> Result<()> {
let path = local_path_from_uri(uri)?;
// Ensure parent directory exists. S3 has no equivalent (PutObject
// is path-agnostic). For local fs, callers like the recovery
// sidecar protocol expect transparent directory creation under
// the repo root (the `__recovery/` directory doesn't pre-exist;
// first sidecar write creates it).
if let Some(parent) = path.parent() {
if !parent.as_os_str().is_empty() {
tokio::fs::create_dir_all(parent).await?;
}
}
tokio::fs::write(&path, contents).await?;
Ok(())
}
@ -82,6 +96,27 @@ impl StorageAdapter for LocalStorageAdapter {
Err(err) => Err(err.into()),
}
}
async fn list_dir(&self, dir_uri: &str) -> Result<Vec<String>> {
let path = local_path_from_uri(dir_uri)?;
let mut out = Vec::new();
let mut entries = match tokio::fs::read_dir(&path).await {
Ok(e) => e,
Err(err) if err.kind() == std::io::ErrorKind::NotFound => return Ok(out),
Err(err) => return Err(err.into()),
};
let dir_str = dir_uri.trim_end_matches('/');
while let Some(entry) = entries.next_entry().await? {
let ft = entry.file_type().await?;
if !ft.is_file() {
continue;
}
if let Some(name) = entry.file_name().to_str() {
out.push(format!("{}/{}", dir_str, name));
}
}
Ok(out)
}
}
#[async_trait]
@ -154,6 +189,43 @@ impl StorageAdapter for S3StorageAdapter {
Err(err) => Err(storage_backend_error("delete", uri, err)),
}
}
async fn list_dir(&self, dir_uri: &str) -> Result<Vec<String>> {
// Normalize: ensure the URI describes a directory (trailing '/') so
// we don't match sibling paths with a shared prefix
// (e.g. listing `__recovery` shouldn't match `__recovery_log/...`).
let dir_with_slash = if dir_uri.ends_with('/') {
dir_uri.to_string()
} else {
format!("{}/", dir_uri)
};
// object_store::Path strips the trailing '/'; re-add it for filtering.
let prefix_loc = self.object_path(&dir_with_slash)?;
let prefix_with_slash = format!("{}/", prefix_loc.as_ref());
let mut entries = self.store.list(Some(&prefix_loc));
let mut out = Vec::new();
let bucket_root = format!("{}{}/", S3_SCHEME_PREFIX, self.bucket);
while let Some(meta) = entries
.try_next()
.await
.map_err(|err| storage_backend_error("list_dir", dir_uri, err))?
{
let key_str = meta.location.as_ref();
// Require the directory boundary to filter out sibling-prefix
// matches (object_store's `list` is prefix-based, not dir-based).
if !key_str.starts_with(&prefix_with_slash) {
continue;
}
let suffix = &key_str[prefix_with_slash.len()..];
// Non-recursive: skip anything inside a sub-directory.
if suffix.contains('/') {
continue;
}
out.push(format!("{}{}", bucket_root, key_str));
}
Ok(out)
}
}
impl S3StorageAdapter {

View file

@ -0,0 +1,921 @@
//! Composite end-to-end flow integration test.
//!
//! Walks the canonical user flow in one fixture: init → load → branch →
//! mutate → query → merge → time-travel → optimize → cleanup → reopen.
//! Every numbered step has at least one assertion.
//!
//! This is the deterministic narrative counterpart to a randomized /
//! property-based reliability harness — it catches regressions where
//! individual operations all pass their unit tests but their composition
//! breaks. It runs in CI on every PR (no `#[ignore]`).
mod helpers;
use arrow_array::{Array, Int64Array};
use omnigraph::db::{Omnigraph, ReadTarget};
use omnigraph::loader::{LoadMode, load_jsonl};
use omnigraph_compiler::ir::ParamMap;
use omnigraph_compiler::result::QueryResult;
use helpers::{
MUTATION_QUERIES, count_rows, count_rows_branch, mixed_params, mutate_branch, mutate_main,
query_branch, query_main, snapshot_main, version_branch, version_main,
};
/// Extract the `total` value from a `total_people` query result and
/// assert it equals `expected`. The query returns one row with one
/// `Int64` column named `total`; asserting only `num_rows() == 1`
/// would not catch a regression that returns a stale or wrong count.
fn assert_total(result: &QueryResult, expected: i64, context: &str) {
let batch = result.concat_batches().unwrap();
assert_eq!(
batch.num_rows(),
1,
"total_people must return exactly one summary row ({context})"
);
let total_col = batch
.column_by_name("total")
.unwrap_or_else(|| panic!("missing `total` column ({context})"))
.as_any()
.downcast_ref::<Int64Array>()
.unwrap_or_else(|| panic!("`total` column is not Int64 ({context})"));
assert_eq!(
total_col.value(0),
expected,
"total_people count mismatch ({context})"
);
}
const TEST_SCHEMA: &str = include_str!("fixtures/test.pg");
const TEST_DATA: &str = include_str!("fixtures/test.jsonl");
const TEST_QUERIES: &str = include_str!("fixtures/test.gq");
#[tokio::test]
async fn composite_flow_canonical_lifecycle() {
let dir = tempfile::tempdir().unwrap();
let uri = dir.path().to_str().unwrap();
// ─────────────────────────────────────────────────────────────────
// Step 1: init a fresh repo with the standard test schema.
// ─────────────────────────────────────────────────────────────────
let mut db = Omnigraph::init(uri, TEST_SCHEMA).await.unwrap();
let v_init = version_branch(&db, "main").await.unwrap();
assert!(
v_init >= 1,
"init must produce a non-zero manifest version; got {}",
v_init
);
// ─────────────────────────────────────────────────────────────────
// Step 2: load JSONL seed data (Person + Company nodes,
// Knows + WorksAt edges).
// ─────────────────────────────────────────────────────────────────
load_jsonl(&mut db, TEST_DATA, LoadMode::Append).await.unwrap();
let v_after_load = version_branch(&db, "main").await.unwrap();
assert!(
v_after_load > v_init,
"load must advance the manifest version: v_init={}, v_after_load={}",
v_init,
v_after_load,
);
assert_eq!(
count_rows(&db, "node:Person").await,
4,
"test.jsonl declares 4 Person rows"
);
assert_eq!(
count_rows(&db, "node:Company").await,
2,
"test.jsonl declares 2 Company rows"
);
// ─────────────────────────────────────────────────────────────────
// Step 3: branch_create `feature` off main.
// ─────────────────────────────────────────────────────────────────
db.branch_create("feature").await.unwrap();
let branches = db.branch_list().await.unwrap();
assert!(
branches.iter().any(|b| b == "feature"),
"feature branch must appear in branch_list; got {:?}",
branches,
);
// ─────────────────────────────────────────────────────────────────
// Step 4: mutate on `feature` — single statement (insert) +
// multi-statement (insert + insert).
// ─────────────────────────────────────────────────────────────────
mutate_branch(
&mut db,
"feature",
MUTATION_QUERIES,
"insert_person",
&mixed_params(&[("$name", "Eve")], &[("$age", 22)]),
)
.await
.expect("single-statement insert on feature");
mutate_branch(
&mut db,
"feature",
MUTATION_QUERIES,
"insert_person_and_friend",
&mixed_params(
&[("$name", "Frank"), ("$friend", "Eve")],
&[("$age", 33)],
),
)
.await
.expect("multi-statement insert+edge on feature");
// After: feature has 4 + Eve + Frank = 6 Persons.
let snap = db
.snapshot_of(ReadTarget::branch("feature"))
.await
.unwrap();
let person_ds = snap.open("node:Person").await.unwrap();
assert_eq!(
person_ds.count_rows(None).await.unwrap(),
6,
"feature should now have 6 Persons (4 seeded + Eve + Frank)"
);
// Main is untouched by feature mutations.
assert_eq!(
count_rows(&db, "node:Person").await,
4,
"main must remain at 4 Persons after feature mutations"
);
// ─────────────────────────────────────────────────────────────────
// Step 5: query on `feature` — exercise multi-modal modes.
// The fixture queries cover scalar lookup (get_person), traversal
// (friends_of), aggregation (friend_counts, total_people, age_stats).
// ─────────────────────────────────────────────────────────────────
let total_people = query_branch(
&mut db,
"feature",
TEST_QUERIES,
"total_people",
&ParamMap::default(),
)
.await
.unwrap();
assert!(
!total_people.batches().is_empty(),
"total_people must return at least one batch"
);
let friends_of_alice = query_branch(
&mut db,
"feature",
TEST_QUERIES,
"friends_of",
&mixed_params(&[("$name", "Alice")], &[]),
)
.await
.unwrap();
assert!(
!friends_of_alice.batches().is_empty(),
"friends_of(Alice) must return data — Alice knows Bob and Charlie in the seed"
);
let unemployed = query_branch(
&mut db,
"feature",
TEST_QUERIES,
"unemployed",
&ParamMap::default(),
)
.await
.unwrap();
assert!(
!unemployed.batches().is_empty(),
"unemployed (anti-join) must return Persons without WorksAt edges"
);
let friend_counts = query_branch(
&mut db,
"feature",
TEST_QUERIES,
"friend_counts",
&ParamMap::default(),
)
.await
.unwrap();
assert!(
!friend_counts.batches().is_empty(),
"friend_counts (aggregation) must return per-person counts"
);
// ─────────────────────────────────────────────────────────────────
// Step 6: mutate on `main` simultaneously — sets up a non-conflicting
// merge by touching a sibling type (Company) that feature didn't
// touch. (The test schema doesn't have a Company-mutation query, so
// we update an existing Person's age — Bob is on main but his age
// wasn't changed on feature.)
// ─────────────────────────────────────────────────────────────────
mutate_main(
&mut db,
MUTATION_QUERIES,
"set_age",
&mixed_params(&[("$name", "Bob")], &[("$age", 26)]),
)
.await
.expect("set Bob's age on main");
let v_pre_merge_main = version_branch(&db, "main").await.unwrap();
// Capture the pre-merge main snapshot for time-travel verification later.
let snapshot_pre_merge = snapshot_main(&db).await.unwrap();
let pre_merge_version = snapshot_pre_merge.version();
// ─────────────────────────────────────────────────────────────────
// Step 7: branch_merge feature → main, verify merge result + audit.
// ─────────────────────────────────────────────────────────────────
let merge_outcome = db.branch_merge("feature", "main").await.unwrap();
let v_post_merge = version_branch(&db, "main").await.unwrap();
assert!(
v_post_merge > v_pre_merge_main,
"merge must advance main's manifest version: pre={}, post={}",
v_pre_merge_main,
v_post_merge,
);
let _ = merge_outcome;
// ─────────────────────────────────────────────────────────────────
// Step 8: query at the post-merge snapshot — verify both sides'
// writes are visible. Main now has 4 + Eve + Frank = 6 Persons,
// and Bob's age is 26 (from the main mutation).
// ─────────────────────────────────────────────────────────────────
assert_eq!(
count_rows(&db, "node:Person").await,
6,
"post-merge main must have all 6 Persons"
);
// Verify Bob's age update from main carried through the merge.
let bob_after = query_main(
&mut db,
TEST_QUERIES,
"get_person",
&mixed_params(&[("$name", "Bob")], &[]),
)
.await
.unwrap();
assert!(
!bob_after.batches().is_empty(),
"Bob must still be present on main post-merge"
);
// Verify Eve (from feature) is now visible on main.
let eve_after = query_main(
&mut db,
TEST_QUERIES,
"get_person",
&mixed_params(&[("$name", "Eve")], &[]),
)
.await
.unwrap();
assert!(
!eve_after.batches().is_empty(),
"Eve (from feature) must be visible on main post-merge"
);
// ─────────────────────────────────────────────────────────────────
// Step 9: snapshot_at_version(pre_merge_version) — verify time-travel
// still sees the pre-merge state (4 Persons on main, no Eve/Frank).
// ─────────────────────────────────────────────────────────────────
let pre_merge_snapshot = db.snapshot_at_version(pre_merge_version).await.unwrap();
let pre_merge_persons = pre_merge_snapshot
.open("node:Person")
.await
.unwrap()
.count_rows(None)
.await
.unwrap();
assert_eq!(
pre_merge_persons, 4,
"time-travel to pre-merge version must show 4 Persons (pre-feature-merge state)"
);
// ─────────────────────────────────────────────────────────────────
// Step 10: optimize the post-merge graph — verify indices stay
// valid and queryable.
//
// **Known limitation**: `optimize_all_tables` calls Lance
// `compact_files` directly — it advances per-table Lance HEAD
// without updating the omnigraph `__manifest` pin. After optimize,
// the next writer's expected_table_versions captures the
// pre-optimize manifest pin, but the publisher's pre-check reads
// a higher version from the manifest dataset (because some other
// path — possibly schema-state recovery on reopen — wrote a newer
// __manifest row). The `ExpectedVersionMismatch` is benign
// (re-issuing the mutation after a snapshot refresh succeeds), but
// a composite test cannot reliably exercise post-optimize mutations
// until that path is investigated. Coverage of post-optimize
// mutations is left to a focused optimize+cleanup integration test.
// ─────────────────────────────────────────────────────────────────
let optimize_stats = db.optimize().await.unwrap();
assert!(
!optimize_stats.is_empty(),
"optimize must return per-table stats"
);
// Re-run a query to verify post-optimize correctness.
let post_optimize_total = query_main(
&mut db,
TEST_QUERIES,
"total_people",
&ParamMap::default(),
)
.await
.unwrap();
assert!(
!post_optimize_total.batches().is_empty(),
"queries must still work after optimize"
);
assert_eq!(
count_rows(&db, "node:Person").await,
6,
"row counts unchanged by optimize"
);
// ─────────────────────────────────────────────────────────────────
// Step 11: cleanup — keep last 10 versions, only purge versions
// older than 1 hour. With this small test, we have well under 10
// versions and nothing that old, so cleanup is a no-op except for
// any orphan files. The recovery floor (--keep ≥ 3) needed for the
// open-time recovery sweep is preserved by the keep-10 default.
// Verify the call doesn't break subsequent queries.
// ─────────────────────────────────────────────────────────────────
use omnigraph::db::CleanupPolicyOptions;
use std::time::Duration;
let _cleanup_stats = db
.cleanup(CleanupPolicyOptions {
keep_versions: Some(10),
older_than: Some(Duration::from_secs(3600)),
})
.await
.unwrap();
// ─────────────────────────────────────────────────────────────────
// Step 12: reopen the engine — verify post-cleanup state is consistent.
// ─────────────────────────────────────────────────────────────────
drop(db);
let mut db = Omnigraph::open(uri).await.unwrap();
assert_eq!(
count_rows(&db, "node:Person").await,
6,
"Person count consistent across reopen"
);
assert_eq!(
count_rows(&db, "node:Company").await,
2,
"Company count consistent across reopen"
);
// Branch list still contains feature.
let branches = db.branch_list().await.unwrap();
assert!(
branches.iter().any(|b| b == "feature"),
"feature branch must still be visible after reopen; got {:?}",
branches,
);
// Final query exercise — full read path works post-reopen,
// post-cleanup. Post-cleanup mutation is omitted here pending
// resolution of the optimize-vs-manifest-pin interaction documented
// in Step 10.
let final_total = query_main(
&mut db,
TEST_QUERIES,
"total_people",
&ParamMap::default(),
)
.await
.unwrap();
assert!(!final_total.batches().is_empty());
}
/// Multi-branch sequential merges with main writes interleaved between
/// every diverge point. Catches compositional regressions that single-
/// merge tests can't see:
///
/// - **Base/LCA recomputation across two merges**: feat-b's base must be
/// the main version *at feat-b's branch creation*, not main's
/// post-feat-a-merge HEAD. A regression that uses main HEAD as the
/// merge base would re-classify Eve / Grace as unknown source-only
/// rows and re-apply them.
/// - **Manifest pin propagation through merge commits**: after merge
/// feat-a → main, main's table_branch entries for Person and Knows
/// must reflect the rewrite-on-active path; the second merge needs
/// them to compute its diff correctly.
/// - **Time-travel through merge DAG**: snapshot_at_version at three
/// distinct points (pre-feat-a-merge, post-feat-a-merge-pre-helen,
/// pre-feat-b-merge) must each return the right historical state
/// without bleed-through from later commits.
/// - **Reopen consistency over a multi-merge history**: dropping the
/// handle and reopening must replay the full merge DAG cleanly with
/// no recovery sweep activity (steady state).
///
/// All other compositional concerns (single merge mechanics, conflict
/// detection, time-travel mechanics) are covered by `branching.rs` and
/// `point_in_time.rs`. This test only exercises *composition*.
#[tokio::test]
async fn composite_flow_multi_branch_sequential_merges() {
let dir = tempfile::tempdir().unwrap();
let uri = dir.path().to_str().unwrap();
// ─────────────────────────────────────────────────────────────────
// Step 1: init + load baseline (4 Person, 2 Company, 3 Knows, 2 WorksAt
// edges from test.jsonl).
// ─────────────────────────────────────────────────────────────────
let mut db = Omnigraph::init(uri, TEST_SCHEMA).await.unwrap();
load_jsonl(&mut db, TEST_DATA, LoadMode::Append).await.unwrap();
assert_eq!(count_rows(&db, "node:Person").await, 4);
assert_eq!(count_rows(&db, "edge:Knows").await, 3);
// ─────────────────────────────────────────────────────────────────
// Step 2: mutate main — insert "Alice2" before any branching. Main
// diverges from the load baseline by exactly one row.
// ─────────────────────────────────────────────────────────────────
mutate_main(
&mut db,
MUTATION_QUERIES,
"insert_person",
&mixed_params(&[("$name", "Alice2")], &[("$age", 31)]),
)
.await
.expect("insert Alice2 on main");
assert_eq!(count_rows(&db, "node:Person").await, 5);
// ─────────────────────────────────────────────────────────────────
// Step 3: branch_create feat-a from main. feat-a inherits main's
// 5-Person state.
// ─────────────────────────────────────────────────────────────────
db.branch_create("feat-a").await.unwrap();
assert_eq!(count_rows_branch(&db, "feat-a", "node:Person").await, 5);
// ─────────────────────────────────────────────────────────────────
// Step 4: mutate main — insert "Bob2" AFTER feat-a was created. main
// and feat-a now diverge: main has Bob2, feat-a does not.
// ─────────────────────────────────────────────────────────────────
mutate_main(
&mut db,
MUTATION_QUERIES,
"insert_person",
&mixed_params(&[("$name", "Bob2")], &[("$age", 26)]),
)
.await
.expect("insert Bob2 on main");
assert_eq!(count_rows(&db, "node:Person").await, 6);
assert_eq!(
count_rows_branch(&db, "feat-a", "node:Person").await,
5,
"feat-a must not see main's post-branch-create writes"
);
// ─────────────────────────────────────────────────────────────────
// Step 5: mutate feat-a — insert "Eve". feat-a now also has 6 rows,
// but the *sixth* is Eve, not Bob2.
// ─────────────────────────────────────────────────────────────────
mutate_branch(
&mut db,
"feat-a",
MUTATION_QUERIES,
"insert_person",
&mixed_params(&[("$name", "Eve")], &[("$age", 22)]),
)
.await
.expect("insert Eve on feat-a");
assert_eq!(count_rows_branch(&db, "feat-a", "node:Person").await, 6);
assert_eq!(
count_rows(&db, "node:Person").await,
6,
"main must not see feat-a's writes"
);
// Branch isolation through the QUERY ENGINE (not just dataset-direct):
// `get_person` on feat-a finds Eve (uses the BTree index on Person.name);
// the same query on main finds nothing. Catches regressions where the
// planner resolves the wrong snapshot for branch-targeted reads.
let eve_on_feat_a = query_branch(
&mut db,
"feat-a",
TEST_QUERIES,
"get_person",
&mixed_params(&[("$name", "Eve")], &[]),
)
.await
.unwrap();
assert_eq!(
eve_on_feat_a.num_rows(),
1,
"get_person(Eve) on feat-a must return 1 row through the query engine"
);
let eve_on_main = query_main(
&mut db,
TEST_QUERIES,
"get_person",
&mixed_params(&[("$name", "Eve")], &[]),
)
.await
.unwrap();
assert_eq!(
eve_on_main.num_rows(),
0,
"get_person(Eve) on main must return 0 rows — feat-a's writes are isolated"
);
// ─────────────────────────────────────────────────────────────────
// Step 6: branch_create feat-b from main. feat-b's base is main's
// current state (post-Bob2): 6 Persons including Bob2 but NOT Eve.
// The two branches now share neither base nor head with each other.
// ─────────────────────────────────────────────────────────────────
db.branch_create("feat-b").await.unwrap();
assert_eq!(count_rows_branch(&db, "feat-b", "node:Person").await, 6);
// ─────────────────────────────────────────────────────────────────
// Step 7: mutate feat-b — insert "Frank".
// ─────────────────────────────────────────────────────────────────
mutate_branch(
&mut db,
"feat-b",
MUTATION_QUERIES,
"insert_person",
&mixed_params(&[("$name", "Frank")], &[("$age", 33)]),
)
.await
.expect("insert Frank on feat-b");
assert_eq!(count_rows_branch(&db, "feat-b", "node:Person").await, 7);
// ─────────────────────────────────────────────────────────────────
// Step 8: mutate feat-a again — insert "Grace" + Knows(Grace → Eve).
// feat-a now has 7 Persons and 4 Knows edges.
// ─────────────────────────────────────────────────────────────────
mutate_branch(
&mut db,
"feat-a",
MUTATION_QUERIES,
"insert_person_and_friend",
&mixed_params(
&[("$name", "Grace"), ("$friend", "Eve")],
&[("$age", 28)],
),
)
.await
.expect("insert Grace + Knows(Grace → Eve) on feat-a");
assert_eq!(count_rows_branch(&db, "feat-a", "node:Person").await, 7);
assert_eq!(count_rows_branch(&db, "feat-a", "edge:Knows").await, 4);
assert_eq!(
count_rows(&db, "edge:Knows").await,
3,
"main's Knows must be untouched by feat-a's edge insert"
);
// Edge traversal through the QUERY ENGINE on feat-a: `friends_of(Grace)`
// exercises the Knows topology + index from feat-a's snapshot. Catches
// regressions in graph-index lookup against branch-local edge tables.
let graces_friends = query_branch(
&mut db,
"feat-a",
TEST_QUERIES,
"friends_of",
&mixed_params(&[("$name", "Grace")], &[]),
)
.await
.unwrap();
assert_eq!(
graces_friends.num_rows(),
1,
"friends_of(Grace) on feat-a must return Eve via the query engine + Knows index"
);
// ─────────────────────────────────────────────────────────────────
// Step 9: capture pre-merge-feat-a state. Both a version (for direct
// dataset open) AND a SnapshotId (for query-engine time-travel) are
// captured so we can later assert historical state through both paths.
// ─────────────────────────────────────────────────────────────────
let pre_merge_a_version = version_main(&db).await.unwrap();
let pre_merge_a_snap_id = db.resolve_snapshot("main").await.unwrap();
let pre_merge_a_persons = count_rows(&db, "node:Person").await;
assert_eq!(pre_merge_a_persons, 6);
// ─────────────────────────────────────────────────────────────────
// Step 10: merge feat-a → main. main gains Eve, Grace, and the
// Knows(Grace → Eve) edge. main's manifest version advances.
// ─────────────────────────────────────────────────────────────────
db.branch_merge("feat-a", "main").await.unwrap();
let post_merge_a_version = version_main(&db).await.unwrap();
assert!(
post_merge_a_version > pre_merge_a_version,
"merge feat-a → main must advance main's manifest version"
);
assert_eq!(count_rows(&db, "node:Person").await, 8);
assert_eq!(count_rows(&db, "edge:Knows").await, 4);
// Post-merge query-engine readback: Eve is now reachable on main via
// `get_person` (BTree index lookup) and Grace's edge to Eve survives
// the merge as a traversable edge via `friends_of`. This is the
// load-bearing check that `publish_rewritten_merge_table`'s Phase 3
// index rebuild produced a queryable result, not just data on disk.
let eve_on_main_post_merge = query_main(
&mut db,
TEST_QUERIES,
"get_person",
&mixed_params(&[("$name", "Eve")], &[]),
)
.await
.unwrap();
assert_eq!(
eve_on_main_post_merge.num_rows(),
1,
"Eve must be findable on main post-merge through the BTree index"
);
let graces_friends_on_main = query_main(
&mut db,
TEST_QUERIES,
"friends_of",
&mixed_params(&[("$name", "Grace")], &[]),
)
.await
.unwrap();
assert_eq!(
graces_friends_on_main.num_rows(),
1,
"friends_of(Grace) on main post-merge must traverse the rebuilt Knows index"
);
// ─────────────────────────────────────────────────────────────────
// Step 11: mutate main AFTER the first merge — insert "Helen". This
// makes feat-b's eventual merge a non-trivial one: feat-b's base
// (created in step 6) does not include Eve / Grace / Helen, but
// main now has all three on top of Bob2.
// ─────────────────────────────────────────────────────────────────
mutate_main(
&mut db,
MUTATION_QUERIES,
"insert_person",
&mixed_params(&[("$name", "Helen")], &[("$age", 44)]),
)
.await
.expect("insert Helen on main post-merge");
assert_eq!(count_rows(&db, "node:Person").await, 9);
// ─────────────────────────────────────────────────────────────────
// Step 12: capture pre-merge-feat-b state. Used for time-travel
// assertions in step 14.
// ─────────────────────────────────────────────────────────────────
let pre_merge_b_version = version_main(&db).await.unwrap();
let pre_merge_b_snap_id = db.resolve_snapshot("main").await.unwrap();
assert!(
pre_merge_b_version > post_merge_a_version,
"Helen insert must advance main's version past the merge"
);
// ─────────────────────────────────────────────────────────────────
// Step 13: merge feat-b → main. The diff base for this merge is
// feat-b's branch-creation point (step 6), NOT main's current head.
// A regression that uses main HEAD as the base would attempt to
// re-apply Eve/Grace/Helen as source-only rows or surface conflicts.
// ─────────────────────────────────────────────────────────────────
db.branch_merge("feat-b", "main").await.unwrap();
let post_merge_b_version = version_main(&db).await.unwrap();
assert!(
post_merge_b_version > pre_merge_b_version,
"merge feat-b → main must advance main's manifest version"
);
assert_eq!(
count_rows(&db, "node:Person").await,
10,
"main must contain all 10 Persons after both merges land"
);
// Aggregation through the QUERY ENGINE over the fully merged graph:
// `total_people` returns count(Person) = 10. Catches regressions in
// group-by/count execution against a multi-fragment table whose
// current shape was produced by two sequential merges.
let total_post_merges = query_main(
&mut db,
TEST_QUERIES,
"total_people",
&ParamMap::default(),
)
.await
.unwrap();
assert_total(&total_post_merges, 10, "post both merges, main must total 10 Persons");
// ─────────────────────────────────────────────────────────────────
// Step 14: time-travel to pre-merge-a-version. Reads must return
// main's pre-feat-a-merge state: 6 Persons, no Eve / Grace / Frank /
// Helen. Catches snapshot leakage from later commits.
//
// Verified through TWO paths: direct dataset open (catches manifest-
// pin propagation regressions) AND `.gq` query against the captured
// SnapshotId (catches planner / index-state regressions where a
// historical query accidentally resolves against current indices
// instead of the snapshot's frozen index state).
// ─────────────────────────────────────────────────────────────────
let pre_a_snap = db.snapshot_at_version(pre_merge_a_version).await.unwrap();
let pre_a_persons = pre_a_snap
.open("node:Person")
.await
.unwrap()
.count_rows(None)
.await
.unwrap();
assert_eq!(
pre_a_persons, 6,
"time-travel to pre-merge-a must show exactly 6 Persons (dataset-direct)"
);
let pre_a_knows = pre_a_snap
.open("edge:Knows")
.await
.unwrap()
.count_rows(None)
.await
.unwrap();
assert_eq!(
pre_a_knows, 3,
"time-travel to pre-merge-a must show exactly 3 Knows edges (no Grace → Eve)"
);
// `.gq` query against the captured SnapshotId — the planner must
// resolve `total_people` against the historical Person snapshot,
// not main's current head. Asserts the actual count value (not just
// row count) so a planner regression that resolves to current state
// would surface here as a count mismatch (10 instead of 6).
let pre_a_total_via_query = db
.query(
ReadTarget::Snapshot(pre_merge_a_snap_id.clone()),
TEST_QUERIES,
"total_people",
&ParamMap::default(),
)
.await
.unwrap();
assert_total(
&pre_a_total_via_query,
6,
"time-travel to pre-merge-a must report 6 Persons via the query engine",
);
// Edge-traversal time-travel: Grace and her Knows(Grace → Eve) edge
// do not exist at pre_merge_a, so `friends_of(Grace)` must return 0
// even though Grace's row IS visible at later snapshots.
let pre_a_grace_friends = db
.query(
ReadTarget::Snapshot(pre_merge_a_snap_id.clone()),
TEST_QUERIES,
"friends_of",
&mixed_params(&[("$name", "Grace")], &[]),
)
.await
.unwrap();
assert_eq!(
pre_a_grace_friends.num_rows(),
0,
"friends_of(Grace) at pre-merge-a must return 0 — Grace's row predates the merge"
);
// ─────────────────────────────────────────────────────────────────
// Step 15: time-travel to pre-merge-b-version. Reads must show
// post-feat-a-merge state (Eve, Grace, Helen present) but NOT Frank.
// ─────────────────────────────────────────────────────────────────
let pre_b_snap = db.snapshot_at_version(pre_merge_b_version).await.unwrap();
let pre_b_persons = pre_b_snap
.open("node:Person")
.await
.unwrap()
.count_rows(None)
.await
.unwrap();
assert_eq!(
pre_b_persons, 9,
"time-travel to pre-merge-b must show 9 Persons (post-feat-a-merge + Helen, pre-feat-b-merge)"
);
// Frank does not exist at pre-merge-b (he was on feat-b only); a
// historical `get_person(Frank)` via the query engine must return 0.
let pre_b_frank_via_query = db
.query(
ReadTarget::Snapshot(pre_merge_b_snap_id.clone()),
TEST_QUERIES,
"get_person",
&mixed_params(&[("$name", "Frank")], &[]),
)
.await
.unwrap();
assert_eq!(
pre_b_frank_via_query.num_rows(),
0,
"Frank must not appear at pre-merge-b — his row only enters main when feat-b merges"
);
// Eve is present at pre-merge-b (feat-a already landed); the
// historical query must find her.
let pre_b_eve_via_query = db
.query(
ReadTarget::Snapshot(pre_merge_b_snap_id),
TEST_QUERIES,
"get_person",
&mixed_params(&[("$name", "Eve")], &[]),
)
.await
.unwrap();
assert_eq!(
pre_b_eve_via_query.num_rows(),
1,
"Eve must be findable at pre-merge-b — she landed on main during feat-a's merge"
);
// ─────────────────────────────────────────────────────────────────
// Step 16: query feat-b at its current head — feat-b is unchanged
// by main's merges; it still shows its own 7-row state.
// ─────────────────────────────────────────────────────────────────
assert_eq!(
count_rows_branch(&db, "feat-b", "node:Person").await,
7,
"feat-b's own snapshot must be unaffected by main's merge of feat-a"
);
// ─────────────────────────────────────────────────────────────────
// Step 17: a feature-side query exercises the read path on a branch
// whose base predates a completed merge (feat-b's base is pre-feat-a).
// ─────────────────────────────────────────────────────────────────
let frank_on_feat_b = query_branch(
&mut db,
"feat-b",
TEST_QUERIES,
"get_person",
&mixed_params(&[("$name", "Frank")], &[]),
)
.await
.unwrap();
assert!(
!frank_on_feat_b.batches().is_empty(),
"feat-b must still see its own Frank insert"
);
// ─────────────────────────────────────────────────────────────────
// Step 18: drop + reopen. Steady state — no recovery sidecars on
// disk, manifest replays cleanly, all branches and tables visible.
// ─────────────────────────────────────────────────────────────────
drop(db);
let db = Omnigraph::open(uri).await.unwrap();
assert_eq!(
count_rows(&db, "node:Person").await,
10,
"main Person count must persist across reopen"
);
assert_eq!(
count_rows(&db, "edge:Knows").await,
4,
"main Knows count must persist across reopen"
);
let branches = db.branch_list().await.unwrap();
assert!(
branches.iter().any(|b| b == "feat-a") && branches.iter().any(|b| b == "feat-b"),
"both feature branches must persist across reopen; got {:?}",
branches
);
// No recovery sidecars left behind by a clean flow.
let recovery_dir = std::path::Path::new(uri).join("__recovery");
let leftover_sidecars = if recovery_dir.exists() {
std::fs::read_dir(&recovery_dir).unwrap().count()
} else {
0
};
assert_eq!(
leftover_sidecars, 0,
"clean compositional flow must not leave recovery sidecars on disk"
);
// ─────────────────────────────────────────────────────────────────
// Step 19: post-reopen query-engine readback. Exercises the full
// read path (planner, indices, snapshot resolution) against the
// reopened engine — catches regressions where indices serialize
// correctly to disk but the reopened catalog can't bind them.
// ─────────────────────────────────────────────────────────────────
let mut db = db;
let post_reopen_total = query_main(
&mut db,
TEST_QUERIES,
"total_people",
&ParamMap::default(),
)
.await
.unwrap();
assert_total(
&post_reopen_total,
10,
"post-reopen total_people must still report 10 Persons",
);
// Edge-traversal post-reopen: Grace's Knows(Grace → Eve) survived
// both the merge and the reopen as a queryable graph edge.
let graces_friends_post_reopen = query_main(
&mut db,
TEST_QUERIES,
"friends_of",
&mixed_params(&[("$name", "Grace")], &[]),
)
.await
.unwrap();
assert_eq!(
graces_friends_post_reopen.num_rows(),
1,
"friends_of(Grace) must traverse post-reopen — index + topology bound correctly"
);
}

File diff suppressed because it is too large Load diff

View file

@ -1,4 +1,4 @@
//! MR-793 Phase 3 — forbidden-API guard test.
//! Forbidden-API guard test.
//!
//! Engine code (`exec/`, `db/omnigraph/`, `loader/`, `changes/`) MUST NOT
//! call Lance's inline-commit data-write APIs directly. The
@ -29,15 +29,15 @@
//! the cross-table manifest commit. Documented exception.
//! - `crates/omnigraph/src/storage_layer.rs` — IS the trait module.
//!
//! ## Initial state (MR-793 Phase 3)
//! ## Transitional allow-list
//!
//! At the time this test was written, MR-793 has migrated three writers
//! (ensure_indices, branch_merge, schema_apply rewrites) onto staged
//! primitives. Other engine call sites (the bulk loader, exec/mutation,
//! exec/query, etc.) still use the legacy inherent `TableStore` methods
//! — they're not visible at the trait boundary, but they DO call lance
//! types. The allow-list below reflects this transitional state. Phase 9
//! tightens the allow-list as call sites migrate.
//! The migration of writers onto staged primitives is incremental.
//! Several writers (ensure_indices, branch_merge, schema_apply rewrites)
//! already route through the staged primitives; others (bulk loader,
//! exec/mutation, exec/query) still use the legacy inherent
//! `TableStore` methods — they're not visible at the trait boundary, but
//! they DO call lance types. The file-level allow-list below reflects
//! this transitional state and tightens as call sites migrate.
use std::path::{Path, PathBuf};
@ -99,6 +99,7 @@ const ALLOW_LIST_FILES: &[&str] = &[
"storage_layer.rs", // The trait module.
"commit_graph.rs", // Maintains `_graph_commits.lance` system table.
"graph_coordinator.rs", // Drives the manifest publisher / branch coordinator.
"recovery_audit.rs", // Maintains `_graph_commit_recoveries.lance` (recovery audit trail).
];
/// Directories exempt from the guard. Files under these paths may use
@ -202,7 +203,7 @@ fn engine_code_does_not_call_forbidden_lance_apis() {
if !violations.is_empty() {
panic!(
"MR-793 forbidden-API guard found {} violation(s) in engine code. \
"Forbidden-API guard found {} violation(s) in engine code. \
Engine code MUST route through the `TableStorage` trait (or its \
inherent counterparts on `TableStore`) instead of calling Lance's \
inline-commit APIs directly. If a use is genuinely justified, add \

View file

@ -1,5 +1,7 @@
#![allow(dead_code)]
pub mod recovery;
use arrow_array::{Array, RecordBatch, StringArray};
use futures::TryStreamExt;

View file

@ -0,0 +1,580 @@
use std::path::{Path, PathBuf};
use arrow_array::{Array, RecordBatch, StringArray};
use futures::TryStreamExt;
use lance::Dataset;
use omnigraph::db::commit_graph::CommitGraph;
use omnigraph::db::{GraphCommit, Omnigraph, ReadTarget, SubTableEntry};
use omnigraph::error::{OmniError, Result};
use omnigraph_compiler::ir::ParamMap;
use serde::Deserialize;
const RECOVERY_ACTOR: &str = "omnigraph:recovery";
#[derive(Debug)]
pub enum RecoveryExpectation {
RolledForward { tables: Vec<TableExpectation> },
RolledBack { tables: Vec<TableExpectation> },
Deferred,
NoOp,
}
#[derive(Debug)]
pub struct TableExpectation {
pub table_key: String,
pub branch: Option<String>,
pub expected_lance_head: Option<u64>,
pub expected_main_manifest_pin: Option<u64>,
pub expected_recovery_parent_commit_id: Option<String>,
pub follow_up_mutation: Option<FollowUpMutation>,
}
#[derive(Debug)]
pub struct FollowUpMutation {
pub branch: String,
pub query_source: String,
pub query_name: String,
pub params: ParamMap,
}
#[derive(Debug, Clone)]
struct RecoveryAuditRow {
graph_commit_id: String,
recovery_kind: String,
operation_id: String,
sidecar_writer_kind: String,
per_table_outcomes: Vec<TableOutcome>,
}
#[derive(Debug, Clone, Deserialize)]
struct TableOutcome {
table_key: String,
from_version: u64,
to_version: u64,
}
impl TableExpectation {
pub fn main(table_key: impl Into<String>) -> Self {
Self::new(table_key, None::<String>)
}
pub fn branch(table_key: impl Into<String>, branch: impl Into<String>) -> Self {
Self::new(table_key, Some(branch))
}
pub fn new(table_key: impl Into<String>, branch: Option<impl Into<String>>) -> Self {
Self {
table_key: table_key.into(),
branch: branch.map(Into::into),
expected_lance_head: None,
expected_main_manifest_pin: None,
expected_recovery_parent_commit_id: None,
follow_up_mutation: None,
}
}
pub fn expected_lance_head(mut self, version: u64) -> Self {
self.expected_lance_head = Some(version);
self
}
pub fn expected_main_manifest_pin(mut self, version: u64) -> Self {
self.expected_main_manifest_pin = Some(version);
self
}
pub fn expected_recovery_parent_commit_id(mut self, commit_id: impl Into<String>) -> Self {
self.expected_recovery_parent_commit_id = Some(commit_id.into());
self
}
pub fn follow_up_mutation(mut self, mutation: FollowUpMutation) -> Self {
self.follow_up_mutation = Some(mutation);
self
}
}
impl FollowUpMutation {
pub fn new(
branch: impl Into<String>,
query_source: impl Into<String>,
query_name: impl Into<String>,
params: ParamMap,
) -> Self {
Self {
branch: branch.into(),
query_source: query_source.into(),
query_name: query_name.into(),
params,
}
}
}
pub fn single_sidecar_operation_id(repo_root: &Path) -> String {
let ids = sidecar_operation_ids(repo_root);
assert_eq!(
ids.len(),
1,
"expected exactly one recovery sidecar under __recovery/, got {:?}",
ids,
);
ids.into_iter().next().unwrap()
}
pub fn sidecar_operation_ids(repo_root: &Path) -> Vec<String> {
let dir = repo_root.join("__recovery");
if !dir.exists() {
return Vec::new();
}
let mut ids = std::fs::read_dir(&dir)
.unwrap()
.filter_map(|entry| {
let entry = entry.ok()?;
let path = entry.path();
if path.extension().and_then(|ext| ext.to_str()) != Some("json") {
return None;
}
path.file_stem()
.and_then(|stem| stem.to_str())
.map(str::to_string)
})
.collect::<Vec<_>>();
ids.sort();
ids
}
pub async fn branch_head_commit_id(repo_root: &Path, branch: &str) -> Result<String> {
let graph = match branch {
"main" => CommitGraph::open(&repo_uri(repo_root)).await?,
branch => CommitGraph::open_at_branch(&repo_uri(repo_root), branch).await?,
};
graph.head_commit_id().await?.ok_or_else(|| {
OmniError::manifest_internal(format!("commit graph for branch {branch} has no head"))
})
}
pub async fn assert_post_recovery_invariants(
repo_root: &Path,
operation_id: &str,
expectation: RecoveryExpectation,
) -> Result<()> {
match expectation {
RecoveryExpectation::RolledForward { tables } => {
assert_sidecar_absent(repo_root, operation_id);
let audit = read_audit_row(repo_root, operation_id).await?;
assert_eq!(
audit.recovery_kind, "RolledForward",
"audit row for {operation_id} recorded the wrong recovery_kind",
);
assert_manifest_pins_match_lance_heads(repo_root, &tables).await?;
assert_audit_to_versions_match_lance_heads(repo_root, &audit, &tables).await?;
assert_recovery_commit_shape(repo_root, &audit, &tables).await?;
assert_non_main_did_not_move_main(repo_root, &tables).await?;
assert_idempotent_reopen(repo_root, operation_id).await?;
run_follow_up_mutations(repo_root, tables).await?;
}
RecoveryExpectation::RolledBack { tables } => {
assert_sidecar_absent(repo_root, operation_id);
let audit = read_audit_row(repo_root, operation_id).await?;
assert_eq!(
audit.recovery_kind, "RolledBack",
"audit row for {operation_id} recorded the wrong recovery_kind",
);
assert_rollback_outcomes_record_drift(&audit);
assert_recovery_commit_shape(repo_root, &audit, &tables).await?;
assert_non_main_did_not_move_main(repo_root, &tables).await?;
assert_idempotent_reopen(repo_root, operation_id).await?;
run_follow_up_mutations(repo_root, tables).await?;
}
RecoveryExpectation::Deferred => {
assert!(
sidecar_path(repo_root, operation_id).exists(),
"deferred recovery must leave sidecar {operation_id} on disk",
);
assert!(
read_audit_row(repo_root, operation_id).await.is_err(),
"deferred recovery must not record an audit row for {operation_id}",
);
}
RecoveryExpectation::NoOp => {
assert_sidecar_absent(repo_root, operation_id);
assert!(
read_audit_row(repo_root, operation_id).await.is_err(),
"no-op recovery must not record an audit row for {operation_id}",
);
}
}
Ok(())
}
fn branch_context(tables: &[TableExpectation]) -> Option<String> {
tables
.iter()
.filter_map(|table| table.branch.as_deref())
.find(|branch| *branch != "main")
.map(str::to_string)
}
fn sidecar_path(repo_root: &Path, operation_id: &str) -> PathBuf {
repo_root
.join("__recovery")
.join(format!("{operation_id}.json"))
}
fn assert_sidecar_absent(repo_root: &Path, operation_id: &str) {
assert!(
!sidecar_path(repo_root, operation_id).exists(),
"recovery sidecar {operation_id} must be deleted after successful recovery",
);
}
async fn assert_manifest_pins_match_lance_heads(
repo_root: &Path,
tables: &[TableExpectation],
) -> Result<()> {
let uri = repo_uri(repo_root);
let db = Omnigraph::open(&uri).await?;
for table in tables {
let (entry, lance_head) = entry_and_lance_head(&db, &uri, table).await?;
assert_eq!(
entry.table_version, lance_head,
"manifest pin for {} on {:?} must match Lance HEAD after roll-forward",
table.table_key, table.branch,
);
if let Some(expected) = table.expected_lance_head {
assert_eq!(
lance_head, expected,
"Lance HEAD for {} on {:?} did not match the test's expected value",
table.table_key, table.branch,
);
}
}
Ok(())
}
async fn assert_audit_to_versions_match_lance_heads(
repo_root: &Path,
audit: &RecoveryAuditRow,
tables: &[TableExpectation],
) -> Result<()> {
let uri = repo_uri(repo_root);
let db = Omnigraph::open(&uri).await?;
for table in tables {
let (_, lance_head) = entry_and_lance_head(&db, &uri, table).await?;
let outcome = audit
.per_table_outcomes
.iter()
.find(|outcome| outcome.table_key == table.table_key)
.ok_or_else(|| {
OmniError::manifest_internal(format!(
"audit row for {} has no outcome for {}",
audit.operation_id, table.table_key,
))
})?;
assert_eq!(
outcome.to_version, lance_head,
"audit to_version for {} must match the published Lance HEAD",
table.table_key,
);
}
Ok(())
}
/// For RolledBack outcomes, `from_version` records the Lance HEAD
/// observed BEFORE the restore (the actual drift) and `to_version`
/// records the manifest pin we restored to. If both equal, the audit
/// row is uninformative — operators cannot tell how far Lance HEAD
/// drifted from the manifest. This assertion catches any regression
/// that reverts `from_version` to `manifest_pinned`.
fn assert_rollback_outcomes_record_drift(audit: &RecoveryAuditRow) {
for outcome in &audit.per_table_outcomes {
assert!(
outcome.from_version > outcome.to_version,
"rollback outcome for {} must record drift via `from_version > to_version` \
(Lance HEAD before restore > manifest pin restored to); got from={}, to={}",
outcome.table_key,
outcome.from_version,
outcome.to_version,
);
}
}
async fn assert_non_main_did_not_move_main(
repo_root: &Path,
tables: &[TableExpectation],
) -> Result<()> {
let uri = repo_uri(repo_root);
let db = Omnigraph::open(&uri).await?;
let main = db.snapshot_of(ReadTarget::branch("main")).await?;
for table in tables {
let Some(expected) = table.expected_main_manifest_pin else {
continue;
};
let entry = main.entry(&table.table_key).ok_or_else(|| {
OmniError::manifest_internal(format!(
"main snapshot has no entry for {}",
table.table_key,
))
})?;
assert_eq!(
entry.table_version, expected,
"non-main recovery for {} on {:?} must not move main's manifest pin",
table.table_key, table.branch,
);
}
Ok(())
}
async fn assert_recovery_commit_shape(
repo_root: &Path,
audit: &RecoveryAuditRow,
tables: &[TableExpectation],
) -> Result<()> {
let branch = branch_context(tables);
let expected_parent = expected_recovery_parent(tables)?;
let branch = branch.as_deref();
let commit = read_recovery_commit(repo_root, audit, branch).await?;
assert_eq!(
commit.actor_id.as_deref(),
Some(RECOVERY_ACTOR),
"recovery commit {} for operation {} must use actor {}",
commit.graph_commit_id,
audit.operation_id,
RECOVERY_ACTOR,
);
if let Some(expected_parent) = expected_parent {
assert_eq!(
commit.parent_commit_id.as_deref(),
Some(expected_parent.as_str()),
"recovery commit {} for operation {} recorded the wrong parent",
commit.graph_commit_id,
audit.operation_id,
);
}
if audit.sidecar_writer_kind == "BranchMerge" {
assert!(
commit.merged_parent_commit_id.is_some(),
"recovered BranchMerge must record merged_parent_commit_id",
);
if let Some(branch) = branch {
let graph = CommitGraph::open_at_branch(&repo_uri(repo_root), branch).await?;
let commits = graph.load_commits().await?;
let parent = commit.parent_commit_id.as_deref().ok_or_else(|| {
OmniError::manifest_internal(format!(
"recovered BranchMerge commit {} has no parent_commit_id",
commit.graph_commit_id,
))
})?;
assert!(
commits
.iter()
.any(|candidate| candidate.graph_commit_id == parent),
"recovered BranchMerge parent_commit_id {} is not on target branch {}",
parent,
branch,
);
}
}
Ok(())
}
fn expected_recovery_parent(tables: &[TableExpectation]) -> Result<Option<String>> {
let mut expected = None;
for table in tables {
let Some(candidate) = &table.expected_recovery_parent_commit_id else {
continue;
};
match &expected {
None => expected = Some(candidate.clone()),
Some(existing) if existing == candidate => {}
Some(existing) => {
return Err(OmniError::manifest_internal(format!(
"conflicting expected recovery parents in table expectations: {existing} vs {candidate}",
)));
}
}
}
Ok(expected)
}
async fn assert_idempotent_reopen(repo_root: &Path, operation_id: &str) -> Result<()> {
let before = matching_audit_rows(repo_root, operation_id).await?;
let uri = repo_uri(repo_root);
let _db = Omnigraph::open(&uri).await?;
assert_sidecar_absent(repo_root, operation_id);
let after = matching_audit_rows(repo_root, operation_id).await?;
assert_eq!(
after.len(),
before.len(),
"immediate reopen after recovery must be a clean no-op for {operation_id}",
);
Ok(())
}
async fn run_follow_up_mutations(repo_root: &Path, tables: Vec<TableExpectation>) -> Result<()> {
let mut db: Option<Omnigraph> = None;
for table in tables {
let Some(mutation) = table.follow_up_mutation else {
continue;
};
if db.is_none() {
db = Some(Omnigraph::open(&repo_uri(repo_root)).await?);
}
let db = db.as_mut().unwrap();
db.mutate(
&mutation.branch,
&mutation.query_source,
&mutation.query_name,
&mutation.params,
)
.await
.map_err(|err| {
OmniError::manifest_internal(format!(
"follow-up mutation {} on {} after recovery failed: {}",
mutation.query_name, table.table_key, err,
))
})?;
}
Ok(())
}
async fn entry_and_lance_head(
db: &Omnigraph,
root_uri: &str,
table: &TableExpectation,
) -> Result<(SubTableEntry, u64)> {
let branch = table.branch.as_deref().unwrap_or("main");
let snapshot = db.snapshot_of(ReadTarget::branch(branch)).await?;
let entry = snapshot
.entry(&table.table_key)
.ok_or_else(|| {
OmniError::manifest_internal(format!(
"snapshot for branch {branch} has no entry for {}",
table.table_key,
))
})?
.clone();
let lance_head = lance_head_for_entry(root_uri, &entry).await?;
Ok((entry, lance_head))
}
async fn lance_head_for_entry(root_uri: &str, entry: &SubTableEntry) -> Result<u64> {
let table_uri = format!("{}/{}", root_uri.trim_end_matches('/'), entry.table_path);
let ds = Dataset::open(&table_uri)
.await
.map_err(|err| OmniError::Lance(err.to_string()))?;
let ds = match entry.table_branch.as_deref() {
Some(branch) if branch != "main" => ds
.checkout_branch(branch)
.await
.map_err(|err| OmniError::Lance(err.to_string()))?,
_ => ds,
};
Ok(ds.version().version)
}
async fn read_recovery_commit(
repo_root: &Path,
audit: &RecoveryAuditRow,
branch: Option<&str>,
) -> Result<GraphCommit> {
let uri = repo_uri(repo_root);
let graph = match branch {
Some(branch) => CommitGraph::open_at_branch(&uri, branch).await?,
None => CommitGraph::open(&uri).await?,
};
graph
.load_commits()
.await?
.into_iter()
.find(|commit| commit.graph_commit_id == audit.graph_commit_id)
.ok_or_else(|| {
OmniError::manifest_internal(format!(
"recovery commit {} for operation {} was not found",
audit.graph_commit_id, audit.operation_id,
))
})
}
async fn read_audit_row(repo_root: &Path, operation_id: &str) -> Result<RecoveryAuditRow> {
let mut rows = matching_audit_rows(repo_root, operation_id).await?;
if rows.len() != 1 {
return Err(OmniError::manifest_internal(format!(
"expected exactly one recovery audit row for {operation_id}, got {}",
rows.len(),
)));
}
Ok(rows.remove(0))
}
async fn matching_audit_rows(
repo_root: &Path,
operation_id: &str,
) -> Result<Vec<RecoveryAuditRow>> {
let recoveries_dir = repo_root.join("_graph_commit_recoveries.lance");
if !recoveries_dir.exists() {
return Ok(Vec::new());
}
let ds = Dataset::open(recoveries_dir.to_str().unwrap())
.await
.map_err(|err| OmniError::Lance(err.to_string()))?;
let batches: Vec<RecordBatch> = ds
.scan()
.try_into_stream()
.await
.map_err(|err| OmniError::Lance(err.to_string()))?
.try_collect()
.await
.map_err(|err| OmniError::Lance(err.to_string()))?;
let mut rows = Vec::new();
for batch in batches {
let graph_commit_ids = string_column(&batch, "graph_commit_id")?;
let kinds = string_column(&batch, "recovery_kind")?;
let ops = string_column(&batch, "operation_id")?;
let writers = string_column(&batch, "sidecar_writer_kind")?;
let outcomes_json = string_column(&batch, "per_table_outcomes_json")?;
for row in 0..batch.num_rows() {
if ops.value(row) != operation_id {
continue;
}
let per_table_outcomes =
serde_json::from_str(outcomes_json.value(row)).map_err(|err| {
OmniError::manifest_internal(format!(
"failed to parse recovery audit outcomes for {operation_id}: {err}",
))
})?;
rows.push(RecoveryAuditRow {
graph_commit_id: graph_commit_ids.value(row).to_string(),
recovery_kind: kinds.value(row).to_string(),
operation_id: ops.value(row).to_string(),
sidecar_writer_kind: writers.value(row).to_string(),
per_table_outcomes,
});
}
}
Ok(rows)
}
fn string_column<'a>(batch: &'a RecordBatch, name: &str) -> Result<&'a StringArray> {
batch
.column_by_name(name)
.ok_or_else(|| {
OmniError::manifest_internal(format!("recovery audit batch missing '{name}'"))
})?
.as_any()
.downcast_ref::<StringArray>()
.ok_or_else(|| {
OmniError::manifest_internal(format!("recovery audit column '{name}' is not Utf8"))
})
}
fn repo_uri(repo_root: &Path) -> String {
repo_root.to_str().unwrap().to_string()
}

File diff suppressed because it is too large Load diff

View file

@ -397,8 +397,7 @@ async fn scan_with_staged_with_filter_silently_drops_staged_rows() {
If you're here because this assertion failed: either (a) Lance \
exposed a way to scan uncommitted fragments without stats-based \
pruning (good update to assert == [alice, carol, dave]), or \
(b) something changed in our scan_with_staged path. See PR #67 \
test fix discussion + .context/mr-794-step2-design.md §1.1."
(b) something changed in our scan_with_staged path."
);
// Without filter, staged data IS visible — confirms the issue is
@ -493,7 +492,7 @@ async fn chained_stage_merge_insert_with_shared_key_documents_duplicate_behavior
);
}
// ─── MR-793 Phase 2: stage_overwrite + scalar index staging ─────────────────
// ─── stage_overwrite + scalar index staging ─────────────────
/// `stage_overwrite` writes replacement fragments to object storage but
/// does NOT advance Lance HEAD until `commit_staged` runs. Mirrors
@ -663,11 +662,11 @@ async fn stage_create_inverted_index_does_not_advance_head_until_commit() {
/// Pin the inline-commit behavior of `delete_where`. Lance 4.0.0 does
/// NOT expose a public `DeleteJob::execute_uncommitted`
/// (`pub(crate)` — see lance-format/lance#6658). MR-793 deliberately
/// (`pub(crate)` — see lance-format/lance#6658). The trait deliberately
/// does NOT introduce a `stage_delete` wrapper that would secretly
/// inline-commit (a side-channel — see design doc §3.2). Instead, the
/// trait keeps `delete_where` as the only delete entry point, named
/// honestly.
/// inline-commit (a side-channel between the staged and inline write
/// paths). Instead, the trait keeps `delete_where` as the only delete
/// entry point, named honestly.
///
/// **When Lance #6658 lands**: this test will need to flip — replace
/// the assertion with a `stage_delete` + `commit_staged` round-trip
@ -704,9 +703,10 @@ async fn delete_where_advances_head_inline_documents_residual() {
/// `create_vector_index`. Lance 4.0.0 vector indices take the
/// "segment commit path" which calls `build_index_metadata_from_segments`
/// (`pub(crate)` in lance-4.0.0 `src/index.rs:111`). Until upstream
/// exposes that helper (companion ticket to #6658), MR-793's trait
/// surface deliberately does NOT include `stage_create_vector_index` —
/// see design doc Appendix A.3.
/// exposes that helper (companion ticket to lance-format/lance#6658),
/// the trait surface deliberately does NOT include
/// `stage_create_vector_index` — same rationale as `stage_delete`'s
/// absence (no side-channel between staged and inline write paths).
#[tokio::test]
async fn create_vector_index_advances_head_inline_documents_residual() {
use arrow_array::FixedSizeListArray;
@ -758,3 +758,191 @@ async fn create_vector_index_advances_head_inline_documents_residual() {
);
assert!(store.has_vector_index(&ds, "embedding").await.unwrap());
}
/// Empirical pin of `Dataset::restore` semantics for the recovery sweep.
///
/// The recovery sweep depends on the `restore` invariant: from HEAD =
/// `h`, calling `Dataset::checkout_version(p).await?` then
/// `Dataset::restore().await?` produces a NEW commit at HEAD = `h + 1`
/// with content == content at version `p`.
///
/// The Lance source confirms this — `restore()` (no args) takes the
/// currently-checked-out version's content and applies it via
/// `apply_commit` against the latest manifest, advancing HEAD by one.
/// See lance-4.0.0 `src/dataset.rs:1106` and the transaction-spec
/// example at https://lance.org/format/table/transaction/.
///
/// If the lance bump (4.0.0 → 4.x) ever changes this delta or the call
/// signature, the recovery sweep's rollback path breaks; this test
/// surfaces the regression at compile/test time rather than under
/// production drift recovery.
#[tokio::test]
async fn lance_restore_appends_one_commit_with_checked_out_content() {
let dir = tempfile::tempdir().unwrap();
let uri = format!("{}/people.lance", dir.path().to_str().unwrap());
let store = TableStore::new(dir.path().to_str().unwrap());
// Build version history: v1 = {alice}, v2 = {alice, bob}, v3 = {alice, bob, carol}.
let mut ds = TableStore::write_dataset(&uri, person_batch(&[("alice", Some(30))]))
.await
.unwrap();
assert_eq!(ds.version().version, 1);
store
.append_batch(&uri, &mut ds, person_batch(&[("bob", Some(25))]))
.await
.unwrap();
assert_eq!(ds.version().version, 2);
store
.append_batch(&uri, &mut ds, person_batch(&[("carol", Some(40))]))
.await
.unwrap();
assert_eq!(ds.version().version, 3);
let head_before = ds.version().version;
// Recovery's rollback shape: open + checkout(p) + restore().
let head_ds = Dataset::open(&uri).await.unwrap();
let mut to_restore = head_ds.checkout_version(1).await.unwrap();
assert_eq!(to_restore.manifest.version, 1);
to_restore.restore().await.unwrap();
// Verify against a fresh open — the previous handle's view doesn't
// tell us what other openers see.
let post = Dataset::open(&uri).await.unwrap();
assert_eq!(
post.version().version,
head_before + 1,
"Dataset::restore must append exactly one commit (HEAD + 1). If \
this assertion fires, lance changed restore semantics re-read \
lance src/dataset.rs::restore and update the recovery sweep's \
rollback path before proceeding."
);
// Content equality: the restored HEAD must match version 1 (just alice).
let scanner = post.scan();
let batches: Vec<RecordBatch> = scanner
.try_into_stream()
.await
.unwrap()
.try_collect()
.await
.unwrap();
let ids = collect_ids(&batches);
assert_eq!(
ids,
vec!["alice".to_string()],
"post-restore content must equal version 1's content; got {:?}",
ids,
);
}
/// Empirical pin of the `Dataset::restore` concurrency hazard that
/// motivates the recovery sweep's open-time-only invocation strategy
/// and any future continuous-recovery reconciler's queue-acquisition
/// requirement.
///
/// `Dataset::restore`'s `check_restore_txn` (lance-4.0.0
/// `src/io/commit/conflict_resolver.rs:986`) returns `Ok(())` against
/// almost every other op (Append, Update, Delete, CreateIndex, Merge, …),
/// so a Restore commits successfully even with concurrent commits in
/// flight. The symmetric checks (lines 318, 473, 634, 787, 853, 947, 978,
/// 1018, 1059, 1115, 1187, 1280) classify Restore as incompatible from
/// the *other* op's POV — but the *other* op already committed before the
/// Restore arrived, so it sees no conflict. Net: the Restore appends a
/// rewind commit AFTER the legitimate concurrent Append, silently
/// orphaning that Append's data from the active timeline.
///
/// The recovery sweep sidesteps this by running only at `Omnigraph::open`
/// (before any other writers can race). A future continuous-recovery
/// reconciler must acquire per-(table_key, branch) queues for sidecar
/// tables before invoking restore — otherwise this hazard becomes
/// reachable during in-flight tenant traffic.
///
/// This test is the load-bearing constraint any future reconciler must
/// honor.
#[tokio::test]
async fn lance_restore_loses_to_concurrent_append_via_orphaning() {
let dir = tempfile::tempdir().unwrap();
let uri = format!("{}/people.lance", dir.path().to_str().unwrap());
let store = TableStore::new(dir.path().to_str().unwrap());
// v1: seed with alice.
let _ = TableStore::write_dataset(&uri, person_batch(&[("alice", Some(30))]))
.await
.unwrap();
// Recovery handle: opened at the latest, then checked out at v1 (the
// pin we'd "rollback" to in a real recovery scenario). This handle
// has NOT yet called restore.
let recovery_open = Dataset::open(&uri).await.unwrap();
let mut recovery_handle = recovery_open.checkout_version(1).await.unwrap();
// Concurrent legitimate writer: appends bob, advancing HEAD to v2.
// This simulates a per-table-queue model where another tenant wrote
// between recovery's open and recovery's restore call.
let mut writer_handle = Dataset::open(&uri).await.unwrap();
store
.append_batch(&uri, &mut writer_handle, person_batch(&[("bob", Some(25))]))
.await
.unwrap();
assert_eq!(writer_handle.version().version, 2);
// Recovery now restores. Because restore's `check_restore_txn` returns
// Ok against Append, this commits at v3 with content == v1 (just alice).
recovery_handle.restore().await.unwrap();
// Re-open and inspect: HEAD is v3, content is just alice. Bob is gone
// from the active timeline.
let post = Dataset::open(&uri).await.unwrap();
assert_eq!(
post.version().version,
3,
"Restore commits at HEAD+1 even when a concurrent commit landed \
between recovery's open and recovery's restore call. If this \
assertion fails, lance changed restore-vs-append conflict \
semantics re-read check_restore_txn and update the recovery \
sweep's concurrency analysis."
);
let scanner = post.scan();
let batches: Vec<RecordBatch> = scanner
.try_into_stream()
.await
.unwrap()
.try_collect()
.await
.unwrap();
let ids = collect_ids(&batches);
assert_eq!(
ids,
vec!["alice".to_string()],
"Concurrent Append's row 'bob' was silently orphaned by the \
Restore. Active-timeline contents == v1's contents. The recovery \
sweep sidesteps this hazard via open-time-only invocation; any \
future continuous-recovery reconciler must guard against it via \
per-(table, branch) queue acquisition. Got: {:?}",
ids,
);
// Sanity: bob's commit IS still readable via explicit checkout_version(2).
// The data isn't gone from disk — it's just unreachable from HEAD until
// cleanup_old_versions reclaims the orphan.
let v2 = Dataset::open(&uri)
.await
.unwrap()
.checkout_version(2)
.await
.unwrap();
let v2_batches: Vec<RecordBatch> = v2
.scan()
.try_into_stream()
.await
.unwrap()
.try_collect()
.await
.unwrap();
let v2_ids = collect_ids(&v2_batches);
assert_eq!(v2_ids, vec!["alice".to_string(), "bob".to_string()]);
}

View file

@ -55,3 +55,9 @@ Filtered from `branch_list()` but visible to internals:
- `__schema_apply_lock__` — serializes schema migrations.
- `__run__<run-id>` — legacy from the pre-v0.4.0 Run state machine (removed in MR-771). The branch-name guard predicate `is_internal_run_branch` is kept as defense-in-depth so users cannot create a branch matching the legacy prefix; the filter will be removed once production legacy branches are swept (MR-770).
## L2 — Recovery audit trail
The four migrated writers (`MutationStaging::finalize`, `schema_apply`, `branch_merge`, `ensure_indices`) protect their multi-table commits with a sidecar at `__recovery/{ulid}.json` written before Phase B and deleted after Phase C. The next `Omnigraph::open` (gated on `OpenMode::ReadWrite`) runs the recovery sweep in `crates/omnigraph/src/db/manifest/recovery.rs`: classify per-table state, decide all-or-nothing per sidecar, roll forward / back, record an audit row.
Audit rows live in `_graph_commit_recoveries.lance` (sibling to `_graph_commits.lance`) and reference the commit graph by `graph_commit_id`. The linked recovery commit is identified by that same `graph_commit_id`, and `actor_id="omnigraph:recovery"` is stored in `_graph_commit_actors.lance` (joined by `graph_commit_id`) — `_graph_commits.lance` itself does not carry the `actor_id` column. To find recoveries for a specific original actor: `omnigraph commit list --filter actor=omnigraph:recovery`, then join to `_graph_commit_recoveries.lance` by `graph_commit_id` to read `recovery_for_actor`. Schema: see `crates/omnigraph/src/db/recovery_audit.rs`.

View file

@ -105,13 +105,13 @@ These are user-visible commitments. They state what the engine guarantees and wh
Specific defaults (timeout values, memory caps, TTL windows) are *configuration*, not invariants — see [docs/constants.md](constants.md) and per-deployment configuration. The invariant is that bounds and contracts exist, not their numerical values.
23. **Atomicity is per-query.** Every `.gq` query is atomic — multi-statement mutations are all-or-nothing via the substrate's atomic-commit primitive. No cross-query `BEGIN`/`COMMIT`; branches and merges fill that role for agent workflows.
*Status: upheld at the writer-trait surface for inserts / updates / scalar-index builds / merge_insert / overwrite after MR-793 PR #70 — the sealed `TableStorage` trait routes those through `stage_*` + `commit_staged`, so a Phase A failure (between writing fragments and committing) leaves no Lance-HEAD drift on touched tables. **Per-table commit_staged → manifest publish window remains** — a failure between commits across multiple touched tables can leave drift on the partially-committed tables. Lance has no multi-dataset atomic commit primitive; closing this requires the recovery-on-open reconciler tracked in MR-847. Additionally, two writer paths still inline-commit pending upstream Lance work: `delete_where` (lance-format/lance#6658) and `create_vector_index` (lance-format/lance#6666).*
*Status: upheld at the writer-trait surface, across process boundaries, AND in-process for the common case — the sealed `TableStorage` trait routes inserts / updates / scalar-index builds / merge_insert / overwrite through `stage_*` + `commit_staged` (Phase A is drift-free); the open-time recovery sweep in `db/manifest/recovery.rs` (sidecars at `__recovery/{ulid}.json` written by `MutationStaging::finalize`, `schema_apply`, `branch_merge`, `ensure_indices`) closes the per-table commit_staged → manifest publish residual on the next `Omnigraph::open`; and `Omnigraph::refresh` runs roll-forward-only recovery in-process so long-running servers close the common case (mutation/load finalize → publisher failure) without restart. The "Lance HEAD ahead of `__manifest`" drift class is unreachable for op-execution failures, recoverable across process boundaries for all writer kinds, and recoverable in-process for roll-forward-eligible sidecars. Sidecars that would require `Dataset::restore` are deferred to the next ReadWrite open (restore unsafe under concurrency); continuous in-process recovery for that case requires per-(table, branch) writer-queue acquisition and is the goal of a future background reconciler. Two writer paths still inline-commit pending upstream Lance work: `delete_where` (lance-format/lance#6658) and `create_vector_index` (lance-format/lance#6666).*
24. **Schema integrity is strict at commit.** Type validation, required-field presence (auto-filled from `@default` if declared), uniqueness across batches and versions, and referential integrity — all enforced before commit succeeds. Per-write softening flags are opt-in, never default.
*Status: aspirational — referential integrity at scale requires SIP-backed cross-table validation; not yet implemented. Cross-batch / cross-version uniqueness tracked in MR-714.*
25. **Isolation: per-query snapshot; read-your-writes within and across queries in a session.** Each query reads from one consistent manifest version. Within a multi-statement mutation, the read subplan inside each write operator sees the writes from earlier statements. Across queries in a session, reads always resolve the latest manifest version — no reader pinning to older snapshots.
*Status: upheld for inserts/updates after MR-794 step 2+`MutationStaging`'s in-memory accumulator + `TableStore::scan_with_pending` (DataFusion `MemTable` union with the committed Lance scan, with merge-shadow semantics for chained updates) implements read-your-writes within a multi-statement mutation. Delete-touching mutations are limited to delete-only by parse-time D₂; closing the within-query RYW gap for deletes requires Lance's two-phase delete API (tracked: MR-793 / Lance-upstream lance-format/lance#6658). The "Lance HEAD ahead of `__manifest`" drift class is unreachable for op-execution failures (the partial-failure test pins this), but a narrowed residual remains at the finalize→publisher boundary because Lance has no multi-dataset commit primitive — see [docs/runs.md](runs.md) "Finalize → publisher residual".*
*Status: upheld for inserts/updates — `MutationStaging`'s in-memory accumulator + `TableStore::scan_with_pending` (DataFusion `MemTable` union with the committed Lance scan, with merge-shadow semantics for chained updates) implements read-your-writes within a multi-statement mutation. Delete-touching mutations are limited to delete-only by parse-time D₂; closing the within-query RYW gap for deletes requires Lance's two-phase delete API (Lance-upstream lance-format/lance#6658). The "Lance HEAD ahead of `__manifest`" drift class is unreachable for op-execution failures (the partial-failure test pins this), and the narrower finalize→publisher residual is closed across one open cycle by the open-time recovery sweep — see [docs/runs.md](runs.md) "Open-time recovery sweep".*
26. **Durability before acknowledgement.** Commit returns only after the substrate has confirmed durable persistence. No "fast" or "fire-and-forget" durability levels.

View file

@ -16,6 +16,7 @@
- `CleanupPolicyOptions { keep_versions: Option<u32>, older_than: Option<Duration> }` — at least one is required.
- Returns `[TableCleanupStats { table_key, bytes_removed, old_versions_removed }]`.
- CLI guards with `--confirm`; without it, prints a preview line.
- **Recovery floor:** `--keep < 3` may garbage-collect Lance versions that the open-time recovery sweep needs as a rollback target (the sweep restores to the branch's manifest-pinned table version, which is HEAD-1 in the typical Phase B → Phase C drift case). Default `--keep 10` is safe.
## Tombstones

View file

@ -130,7 +130,7 @@ will replace it. Operator-driven (rare in agent workloads); document
permanently until Lance exposes `Operation::Overwrite { fragments }` as
a two-phase op.
### Finalize → publisher residual
### Open-time recovery sweep
The staged-write rewire eliminates one drift class **by construction at
the writer layer**: an op that fails before pushing to the in-memory
@ -139,26 +139,85 @@ rejection) leaves Lance HEAD untouched on every staged table. This is
the case the `partial_failure_leaves_target_queryable_and_unblocks_next_mutation`
test pins.
A second, narrower drift class remains. `MutationStaging::finalize`
runs `stage_*` + `commit_staged` per touched table sequentially, then
the publisher commits the manifest. Lance has no multi-dataset atomic
commit, so the per-table `commit_staged` calls are independent
operations: if commit_staged on table N+1 fails *after* commit_staged
on tables 1..N succeeded, or if the publisher's CAS pre-check rejects
*after* every commit_staged succeeded, tables 1..N are left at
`Lance HEAD = manifest_pinned + 1`. The next mutation against those
tables surfaces `ManifestConflictDetails::ExpectedVersionMismatch`
the same loud failure mode the rewire was designed to make rare, just
no longer "unreachable."
A second, narrower drift class — the **finalize → publisher window**
is closed across one open cycle by the open-time recovery sweep:
Triggers: transient Lance write errors during finalize (object-store
retry budget exhaustion, disk full); persistent publisher contention
exceeding `PUBLISHER_RETRY_BUDGET = 5` retries. Closing this requires
either a Lance multi-dataset atomic-commit primitive (filed upstream
alongside the two-phase delete request) or a manifest-layer journal
that replays staged commits on next open. Both are heavyweight; the
v1 stance is "narrowed window, documented residual, surface the loud
error when it fires."
`MutationStaging::finalize` runs `stage_*` + `commit_staged` per touched
table sequentially, then the publisher commits the manifest. Lance has
no multi-dataset atomic commit, so the per-table `commit_staged` calls
are independent operations: if commit_staged on table N+1 fails *after*
commit_staged on tables 1..N succeeded, or if the publisher's CAS
pre-check rejects *after* every commit_staged succeeded, tables 1..N
are left at `Lance HEAD = manifest_pinned + 1`.
**Recovery protocol** (lifecycle of every staged-write writer —
`MutationStaging::finalize`, `schema_apply::apply_schema_with_lock`,
`branch_merge_on_current_target`, `ensure_indices_for_branch`):
1. **Phase A**: writer writes a sidecar JSON to
`__recovery/{ulid}.json` BEFORE its first `commit_staged`. The
sidecar names every `(table_key, table_path, expected_version,
post_commit_pin)` it intends to commit + the writer kind +
actor_id.
2. **Phase B**: writer's per-table `commit_staged` loop runs.
3. **Phase C**: publisher commits the manifest.
4. **Phase D**: writer deletes the sidecar.
> **Phase letter convention.** Throughout the recovery code, log
> messages, failpoint names (e.g. `branch_merge.post_phase_b_pre_manifest_commit`),
> and the per-writer integration tests, "Phase A/B/C/D" refers
> exclusively to the four-step lifecycle above. The per-table
> staged-write contract (`stage_*` then `commit_staged`, two steps)
> is referred to by those API verbs — never by phase letters — so a
> reader of `recovery.rs`, `failpoints.rs`, or this document only
> encounters phase letters in the per-writer context.
A failure between Phase A and Phase D leaves the sidecar on disk. The
next `Omnigraph::open` (gated on `OpenMode::ReadWrite`) runs the
recovery sweep in `crates/omnigraph/src/db/manifest/recovery.rs`:
- For each sidecar in `__recovery/`, compare every named table's
Lance HEAD to the manifest pin. Classify per the all-or-nothing
decision tree (RolledPastExpected / NoMovement / UnexpectedAtP1 /
UnexpectedMultistep / InvariantViolation).
- If any table is `InvariantViolation` (Lance HEAD < manifest pinned
should be impossible), **abort** with a loud error and leave the
sidecar on disk for operator review.
- Otherwise, if every table is `RolledPastExpected`, **roll forward**:
a single `ManifestBatchPublisher::publish` call extends every pin
atomically. `SchemaApply` sidecars are eligible only when schema-state
recovery promoted the matching staging files in the same recovery pass;
otherwise full open-time recovery rolls them back and refresh-time
recovery leaves them for the next read-write open.
- Otherwise **roll back**: per-table `Dataset::restore` to the
manifest-pinned table version for that branch. Rollback records the
actual restore target in the audit row's `to_version`.
- After a successful roll-forward or roll-back, an audit row is
recorded — `_graph_commits.lance` carries
a commit tagged `actor_id = "omnigraph:recovery"`, and a sibling
`_graph_commit_recoveries.lance` row carries `recovery_kind`,
`recovery_for_actor` (the original sidecar's actor), `operation_id`,
per-table outcomes. Operators run `omnigraph commit list --filter
actor=omnigraph:recovery` to find recoveries.
- Sidecar deleted as the final step.
Triggers for the residual: transient Lance write errors during finalize
(object-store retry budget exhaustion, disk full); persistent publisher
contention exceeding `PUBLISHER_RETRY_BUDGET = 5` retries.
**Long-running servers**: `Omnigraph::refresh` runs roll-forward-only
recovery in-process — the common Phase B → Phase C residual closes
without a restart. The next mutation on the same handle (after refresh)
no longer surfaces `ExpectedVersionMismatch` for the failed table.
Sidecars that would require a `Dataset::restore` (mixed / unexpected
state) are deferred to the next `OpenMode::ReadWrite` open: restore is
unsafe under concurrency because Lance's `check_restore_txn` accepts
the restore against in-flight Append/Update/Delete commits and
silently orphans them (pinned by
`tests/staged_writes.rs::lance_restore_loses_to_concurrent_append_via_orphaning`).
Continuous in-process recovery for the rollback path is the goal of a
future background reconciler with per-(table, branch) writer-queue
acquisition.
The publisher-CAS contract is unchanged: a *concurrent writer* that
advances any of our touched tables between snapshot capture and

View file

@ -62,13 +62,15 @@ flowchart TB
manifest["__manifest/<br/>L2 catalog of sub-tables"]:::l2
nodes["nodes/{fnv1a64-hex}/<br/>one dataset per node type"]:::l2
edges["edges/{fnv1a64-hex}/<br/>one dataset per edge type"]:::l2
cgraph["_graph_commits.lance/<br/>_graph_commit_actors.lance/"]:::l2
cgraph["_graph_commits.lance/<br/>_graph_commit_actors.lance/<br/>_graph_commit_recoveries.lance/"]:::l2
recovery["__recovery/{ulid}.json<br/>recovery sidecars (transient)"]:::l2
refs["_refs/branches/{name}.json<br/>graph-level branches"]:::l2
repo --> manifest
repo --> nodes
repo --> edges
repo --> cgraph
repo --> recovery
repo --> refs
subgraph dataset[Inside each Lance dataset — L1]
@ -90,6 +92,8 @@ flowchart TB
- **`__manifest/`** is a Lance dataset whose rows describe which sub-table version is published at which graph-branch. Reading a snapshot starts here.
- **`nodes/`** and **`edges/`** are sibling directories holding one Lance dataset per declared type. Names are `fnv1a64-hex` of the type name to keep paths fixed-length and case-safe.
- **`_graph_commits.lance`** is an L2 dataset that records the graph-level commit DAG, with a paired `_graph_commit_actors.lance` for the actor map. (Pre-v0.4.0 repos also have inert `_graph_runs.lance` / `_graph_run_actors.lance` from the removed Run state machine; MR-770 sweeps these in production.)
- **`_graph_commit_recoveries.lance`** — one row per recovery sweep action. Joined to `_graph_commits.lance` by `graph_commit_id`; the linked commit row carries `actor_id=omnigraph:recovery`. Operators correlate recoveries with the original mutations they rolled forward / back via this join. See `crates/omnigraph/src/db/recovery_audit.rs`.
- **`__recovery/{ulid}.json`** — transient sidecar files written by the four migrated writers (`MutationStaging::finalize`, `schema_apply`, `branch_merge`, `ensure_indices`) before Phase B begins, deleted after Phase C succeeds. A sidecar persisting after process exit means the writer crashed in the Phase B → Phase C window; the next `Omnigraph::open` recovery sweep processes it. Steady-state directory is empty. See `crates/omnigraph/src/db/manifest/recovery.rs`.
- **`_refs/branches/{name}.json`** is graph-level branch metadata — pointers from a branch name to the manifest version it heads.
- **Inside each Lance dataset** (orange): the standard Lance directory layout. `_versions/{n}.manifest` records every commit; `data/` holds the actual Arrow fragments; `_indices/{uuid}/` holds index segments with their own `fragment_bitmap` for partial coverage; `_refs/` holds Lance-native per-dataset branches and tags.

View file

@ -32,7 +32,9 @@ The engine's `tests/` is the principal coverage surface; most graph-shaped behav
| `export.rs` | NDJSON streaming export filters |
| `s3_storage.rs` | S3-backed repo (skipped unless `OMNIGRAPH_S3_TEST_BUCKET` is set) |
| `lance_version_columns.rs` | Per-row `_row_last_updated_at_version` behavior |
| `failpoints.rs` | Failure-injection coverage (gated on `failpoints` feature) |
| `failpoints.rs` | Failure-injection coverage (gated on `failpoints` feature). Includes the four per-writer Phase B → recovery integration tests (`recovery_rolls_forward_after_finalize_publisher_failure`, `schema_apply_phase_b_failure_recovered_on_next_open`, `branch_merge_phase_b_failure_recovered_on_next_open`, `ensure_indices_phase_b_failure_recovered_on_next_open`). |
| `recovery.rs` | Open-time recovery sweep — sidecar I/O, classifier dispatch (NoMovement / RolledPastExpected / UnexpectedAtP1 / UnexpectedMultistep / InvariantViolation), all-or-nothing decision, roll-forward via `ManifestBatchPublisher::publish`, roll-back via `Dataset::restore`, audit row in `_graph_commit_recoveries.lance`, `OpenMode::ReadOnly` skip path |
| `composite_flow.rs` | Compositional/narrative end-to-end stories — multi-step flows that compose mechanics covered by other test files. Catches integration regressions where individual operations all pass their unit tests but their composition breaks (sequential merges, post-merge main writes, time-travel through merge DAG, reopen consistency over multi-merge histories). |
## Fixtures