mirror of
https://github.com/ModernRelay/omnigraph.git
synced 2026-06-09 01:35:18 +02:00
recovery: address PR #72 review findings
Bot reviewers (cubic, cursor, chatgpt-codex) caught 4 merge-blocking bugs
+ 3 strongly-recommended fixes + 3 doc errors in the initial PR. Each fix
has a paired test demonstrating the bug before the fix.

Merge-blocking fixes:

- BranchMerge moved to loose-match classifier arm.
  publish_rewritten_merge_table runs multiple commit_staged calls per
  table (merge_insert + delete_where + index rebuilds). Strict
  classification rolled back valid completed Phase B work as
  UnexpectedMultistep. Three new unit tests pin the loose-match behavior
  for BranchMerge.
- branch_merge sidecar uses self.active_branch() (the resolved target
  branch) instead of inferring from the first sorted table key. The
  previous heuristic could record None (== main) when the merge target
  was a non-main branch, causing recovery to publish to the wrong
  manifest namespace.
- Best-effort sidecar delete in all 5 writer sites (mutation, loader,
  schema_apply, branch_merge, ensure_indices). Previously, a sidecar
  cleanup failure after a successful manifest publish would error out
  the user's call for a write that already landed. Now: log a warning
  and ignore — the next open's recovery sweep tidies the stale sidecar
  via NoMovement classification.
- ensure_indices sidecar scoped to tables that need work via new helpers
  needs_index_work_node / needs_index_work_edge. Previously the sidecar
  pinned every catalog table; if only one needed indexing, the others
  classified as NoMovement and the all-or-nothing decision rolled back
  legitimate index work.

Strongly-recommended fixes:

- recover_manifest_drift now takes &mut GraphCoordinator and refreshes
  between sidecars. Sidecar B's classification needs to see sidecar A's
  manifest changes, otherwise B can be classified against stale pins and
  incorrectly roll back work that just landed.
- list_sidecars sorts URIs before reading. Sidecar filenames are ULIDs
  (chronologically sortable), so this gives deterministic, time-ordered
  processing. Filesystem-order was nondeterministic.
- ReadOnly opens skip recover_schema_state_files too (was: only the
  MR-847 sweep was gated). Read-only consumers may run with read-only
  credentials; silent open-time mutations violate the contract.

Doc cleanups:

- Removed stale "Phase 4 placeholder" comment from
  recover_manifest_drift.
- docs/runs.md decision-tree wording now correctly surfaces the
  InvariantViolation abort path.
- docs/branches-commits.md clarifies actor_id is in
  _graph_commit_actors.lance (joined by graph_commit_id), not on
  _graph_commits.lance itself.

Test surface (post-fixes):

- 25 unit tests in db::manifest::recovery (+4 from this commit).
- 10 integration tests in tests/recovery.rs (+3 from this commit).
- ~672 tests across ~25 binaries pass with --features failpoints.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
parent 932334ba01
commit 164bafbbe7
10 changed files with 477 additions and 69 deletions
@@ -47,6 +47,7 @@ use serde::{Deserialize, Serialize};
 use tracing::warn;

 use crate::db::commit_graph::CommitGraph;
+use crate::db::graph_coordinator::GraphCoordinator;
 use crate::db::recovery_audit::{
     now_micros, RecoveryAudit, RecoveryAuditRecord, RecoveryKind, TableOutcome,
 };
@@ -264,7 +265,14 @@ pub(crate) async fn list_sidecars(
     storage: &dyn StorageAdapter,
 ) -> Result<Vec<RecoverySidecar>> {
     let dir = recovery_dir_uri(root_uri);
-    let uris = storage.list_dir(&dir).await?;
+    let mut uris = storage.list_dir(&dir).await?;
+    // Sort by URI so the sweep processes sidecars deterministically.
+    // Sidecar filenames are ULIDs, which are lexicographically sortable
+    // === chronologically sortable; the older sidecar is processed
+    // before the newer one. Without this sort, `list_dir` returns
+    // filesystem-order results which are nondeterministic and can mask
+    // ordering-sensitive bugs. (PR #72 review.)
+    uris.sort();
     let mut out = Vec::with_capacity(uris.len());
     for uri in uris {
         // Skip non-JSON files defensively; the directory is ours but a
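The sort in this hunk relies on a ULID property: the leading characters encode a millisecond timestamp in Crockford base32, whose alphabet is in ascending ASCII order, so plain string comparison agrees with creation order. A minimal sketch of that idea follows; `sort_sidecar_uris` and the `recovery/` path prefix are hypothetical names for illustration, not part of the commit.

```rust
/// Hypothetical helper (not from the commit): returns sidecar URIs
/// oldest-first by relying on ULID filenames sorting lexicographically.
fn sort_sidecar_uris(mut uris: Vec<String>) -> Vec<String> {
    // Byte-wise string ordering; all URIs share the same directory prefix,
    // so the ULID filename decides the order.
    uris.sort();
    uris
}
```

Because every URI shares the same directory prefix, only the ULID portion affects the ordering, which is why sorting whole URIs is enough here.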
@@ -315,14 +323,15 @@ pub(crate) fn parse_sidecar(sidecar_uri: &str, body: &str) -> Result<RecoverySid
 /// Classify one table's observed state vs. the sidecar's intent.
 ///
 /// `kind` adjusts the precision of the `RolledPastExpected` predicate:
-/// - **Strict** (`Mutation`, `Load`, `BranchMerge`): exactly one
-///   `commit_staged` per table, so `lance_head == manifest_pinned + 1`
-///   AND `post_commit_pin == lance_head` is required.
-/// - **Loose** (`SchemaApply`, `EnsureIndices`): the writer may run
-///   N ≥ 1 `commit_staged` calls per table (one per index built + one
-///   for the overwrite, etc.) and the exact N is hard to compute at
-///   sidecar-write time. The loose match accepts any
-///   `lance_head > manifest_pinned` as `RolledPastExpected` when
+/// - **Strict** (`Mutation`, `Load`): exactly one `commit_staged` per
+///   table, so `lance_head == manifest_pinned + 1` AND
+///   `post_commit_pin == lance_head` is required.
+/// - **Loose** (`SchemaApply`, `EnsureIndices`, `BranchMerge`): the
+///   writer may run N ≥ 1 `commit_staged` calls per table (one per
+///   index built + one for the overwrite, etc.; merge tables run
+///   merge_insert + delete_where + index rebuilds) and the exact N
+///   is hard to compute at sidecar-write time. The loose match accepts
+///   any `lance_head > manifest_pinned` as `RolledPastExpected` when
 ///   `pin.expected_version == manifest_pinned` (the writer's CAS
 ///   target matches what the manifest currently shows). The risk this
 ///   admits — an external agent advancing HEAD between sidecar write
@@ -344,10 +353,7 @@ pub(crate) fn classify_table(
         return NoMovement;
     }
     // lance_head > manifest_pinned
-    let strict = matches!(
-        kind,
-        SidecarKind::Mutation | SidecarKind::Load | SidecarKind::BranchMerge,
-    );
+    let strict = matches!(kind, SidecarKind::Mutation | SidecarKind::Load);
     if strict {
         if lance_head == manifest_pinned + 1 {
             if pin.expected_version == manifest_pinned && pin.post_commit_pin == lance_head {
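To make the strict versus loose distinction concrete, here is a simplified, self-contained sketch of the classification rule described in the doc comment. The type and function names (`Classification`, `Kind`, `Pin`, `classify`) are stand-ins, and mapping every non-matching case to `UnexpectedMultistep` is an assumption; the real `classify_table` is only partially visible in this diff.

```rust
#[derive(Debug, PartialEq)]
#[allow(dead_code)]
enum Classification {
    NoMovement,
    RolledPastExpected,
    UnexpectedMultistep,
    InvariantViolation { observed: u64 },
}

#[derive(Clone, Copy)]
#[allow(dead_code)]
enum Kind {
    Mutation,
    Load,
    SchemaApply,
    EnsureIndices,
    BranchMerge,
}

struct Pin {
    expected_version: u64,
    post_commit_pin: u64,
}

fn classify(pin: &Pin, lance_head: u64, manifest_pinned: u64, kind: Kind) -> Classification {
    if lance_head < manifest_pinned {
        // HEAD behind the manifest pin should never happen.
        return Classification::InvariantViolation { observed: lance_head };
    }
    if lance_head == manifest_pinned {
        return Classification::NoMovement;
    }
    // lance_head > manifest_pinned from here on.
    let cas_matches = pin.expected_version == manifest_pinned;
    let strict = matches!(kind, Kind::Mutation | Kind::Load);
    let accepted = if strict {
        // Strict: exactly one commit, landing on the recorded pin.
        lance_head == manifest_pinned + 1 && cas_matches && pin.post_commit_pin == lance_head
    } else {
        // Loose: any number of commits, as long as the CAS target matches.
        cas_matches
    };
    if accepted {
        Classification::RolledPastExpected
    } else {
        // Assumption: all other drift is reported as UnexpectedMultistep.
        Classification::UnexpectedMultistep
    }
}
```

With a pin of `expected_version = 5`, `post_commit_pin = 6` and an observed HEAD of 8 over a manifest pin of 5, the loose arm accepts the drift while the strict arm rejects it, which is the behavior change this hunk makes for `BranchMerge`.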
@@ -440,12 +446,6 @@ fn fragment_ids(ds: &Dataset) -> Vec<u64> {
 /// roll-forward (every table eligible), roll-back (mixed or unexpected
 /// state), or abort (invariant violation).
 ///
-/// **Phase 3 scope** (this commit): roll-back path is fully implemented;
-/// roll-forward errors out with a "Phase 4" placeholder so the
-/// open-time wiring + sidecar I/O + classification + decision dispatch
-/// can land independently of the audit/manifest-publish work. Tests
-/// exercising the end-to-end roll-forward path land alongside Phase 4.
-///
 /// Idempotency: a crash mid-sweep leaves the sidecar (deletion is the
 /// final step). Re-opening re-classifies; the fragment-set short-circuit
 /// in [`restore_table_to_version`] prevents version pile-up under
@@ -460,16 +460,26 @@ fn fragment_ids(ds: &Dataset) -> Vec<u64> {
 pub(crate) async fn recover_manifest_drift(
     root_uri: &str,
     storage: &dyn StorageAdapter,
-    snapshot: &Snapshot,
+    coordinator: &mut GraphCoordinator,
 ) -> Result<()> {
     let sidecars = list_sidecars(root_uri, storage).await?;
     if sidecars.is_empty() {
         return Ok(());
     }

+    // PR #72 review (chatgpt-codex + cubic): refresh the coordinator
+    // snapshot BEFORE each sidecar's classification. Sidecar N's
+    // roll-forward writes manifest changes that sidecar N+1 must
+    // observe, otherwise sidecar N+1 classifies its tables against
+    // stale pins and may incorrectly roll back work that landed
+    // moments earlier. Refresh is cheap (one Lance manifest read).
     for sidecar in sidecars {
-        process_sidecar(root_uri, storage, snapshot, &sidecar).await?;
+        coordinator.refresh().await?;
+        let snapshot = coordinator.snapshot();
+        process_sidecar(root_uri, storage, &snapshot, &sidecar).await?;
     }
+    // Final refresh so the caller sees the post-sweep state.
+    coordinator.refresh().await?;
     Ok(())
 }

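The reason for refreshing before each sidecar can be seen in a toy model. The sketch below is a deliberately simplified, synchronous stand-in: `ToyCoordinator`, `ToySidecar`, `process`, and `sweep` are hypothetical, and the "roll forward only if the snapshot pin equals the expected version" rule is a reduced version of the real classification.

```rust
use std::collections::HashMap;

/// Toy stand-in for the coordinator: `manifest` is the durable state,
/// `cached` is the in-memory snapshot that only `refresh` updates.
struct ToyCoordinator {
    manifest: HashMap<String, u64>,
    cached: HashMap<String, u64>,
}

impl ToyCoordinator {
    fn refresh(&mut self) {
        self.cached = self.manifest.clone();
    }
    fn snapshot(&self) -> HashMap<String, u64> {
        self.cached.clone()
    }
}

struct ToySidecar {
    table: String,
    expected_version: u64,
    post_commit_pin: u64,
}

/// Rolls forward (publishes `post_commit_pin`) only when the snapshot's pin
/// matches the sidecar's expected version; returns true on roll-forward.
fn process(
    manifest: &mut HashMap<String, u64>,
    snapshot: &HashMap<String, u64>,
    sc: &ToySidecar,
) -> bool {
    if snapshot.get(&sc.table) == Some(&sc.expected_version) {
        manifest.insert(sc.table.clone(), sc.post_commit_pin);
        true
    } else {
        false
    }
}

/// Processes sidecars in order, optionally refreshing before each one.
fn sweep(coord: &mut ToyCoordinator, sidecars: &[ToySidecar], refresh_each: bool) -> Vec<bool> {
    let stale = coord.snapshot();
    let mut rolled_forward = Vec::new();
    for sc in sidecars {
        let snapshot = if refresh_each {
            coord.refresh();
            coord.snapshot()
        } else {
            stale.clone()
        };
        rolled_forward.push(process(&mut coord.manifest, &snapshot, sc));
    }
    // Final refresh so the caller observes the post-sweep state.
    coord.refresh();
    rolled_forward
}
```

In this model, two sidecars on the same table (1 to 2, then 2 to 3) both roll forward when the snapshot is refreshed per iteration, while the second is rejected when both are checked against the snapshot taken before the sweep.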
@@ -886,6 +896,40 @@ mod tests {
         );
     }

+    /// PR #72 review (cubic + cursor) flagged that BranchMerge is in
+    /// the strict classifier set, but `publish_rewritten_merge_table`
+    /// runs multiple `commit_staged` calls per table (merge_insert +
+    /// delete_where + index rebuilds — the comment in `merge.rs`
+    /// explicitly says so). Strict classification rolls back valid
+    /// completed Phase B work as `UnexpectedMultistep`. BranchMerge
+    /// must be loose-matched like SchemaApply / EnsureIndices.
+    #[test]
+    fn classify_loose_match_accepts_multi_commit_drift_for_branch_merge() {
+        let pin = make_pin("node:Person", "irrelevant", 5, 6);
+        assert_eq!(
+            classify_table(&pin, 8, 5, SidecarKind::BranchMerge),
+            TableClassification::RolledPastExpected,
+        );
+    }
+
+    #[test]
+    fn classify_loose_match_branch_merge_no_movement_unchanged() {
+        let pin = make_pin("node:Person", "irrelevant", 5, 6);
+        assert_eq!(
+            classify_table(&pin, 5, 5, SidecarKind::BranchMerge),
+            TableClassification::NoMovement,
+        );
+    }
+
+    #[test]
+    fn classify_loose_match_branch_merge_invariant_violation_unchanged() {
+        let pin = make_pin("node:Person", "irrelevant", 5, 6);
+        assert_eq!(
+            classify_table(&pin, 3, 5, SidecarKind::BranchMerge),
+            TableClassification::InvariantViolation { observed: 3 },
+        );
+    }
+
     #[test]
     fn decide_roll_forward_when_all_classifications_match() {
         let cls = vec![
@@ -1046,4 +1090,45 @@ mod tests {
             .unwrap();
         assert!(result.is_empty());
     }
+
+    /// PR #72 review (cubic) flagged that `list_dir` returns
+    /// filesystem-order results, making sidecar processing
+    /// nondeterministic. Sidecar filenames are ULIDs (lexicographically
+    /// sortable === chronologically sortable), so sorting by URI gives
+    /// deterministic, time-ordered processing — the older sidecar
+    /// processed before the newer one.
+    #[tokio::test]
+    async fn list_sidecars_returns_deterministic_order() {
+        let dir = tempfile::tempdir().unwrap();
+        std::fs::create_dir(dir.path().join(RECOVERY_DIR_NAME)).unwrap();
+        let storage = LocalStorageAdapter::default();
+        let root = dir.path().to_str().unwrap();
+
+        // Write sidecars in REVERSE chronological order (newest first).
+        // The classifier shouldn't care, but the sweep needs deterministic
+        // processing for reproducibility.
+        let ids = ["01H000000000000000000000ZZ", "01H000000000000000000000MM", "01H000000000000000000000AA"];
+        for id in &ids {
+            let sc = new_sidecar(
+                SidecarKind::Mutation,
+                None,
+                None,
+                vec![make_pin("node:Person", "/dev/null/x.lance", 1, 2)],
+            );
+            // Override operation_id to use our deterministic ID.
+            let mut sc = sc;
+            sc.operation_id = id.to_string();
+            write_sidecar(root, &storage, &sc).await.unwrap();
+        }
+
+        let listed = list_sidecars(root, &storage).await.unwrap();
+        let listed_ids: Vec<&str> = listed.iter().map(|s| s.operation_id.as_str()).collect();
+        let mut sorted_ids = listed_ids.clone();
+        sorted_ids.sort();
+        assert_eq!(
+            listed_ids, sorted_ids,
+            "list_sidecars must return sidecars in deterministic (sorted) order; got {:?}",
+            listed_ids,
+        );
+    }
 }

@@ -169,31 +169,30 @@ impl Omnigraph {
     ) -> Result<Self> {
         let root = normalize_root_uri(uri)?;
         // Open the coordinator first so the schema-staging recovery sweep can
-        // compare its snapshot against any leftover staging files. Recovery
-        // either deletes staging (pre-commit crash) or completes the rename
-        // (post-commit crash) before the live schema files are read.
+        // compare its snapshot against any leftover staging files.
         let mut coordinator = GraphCoordinator::open(&root, Arc::clone(&storage)).await?;
-        recover_schema_state_files(&root, Arc::clone(&storage), &coordinator.snapshot()).await?;
-        // MR-847 recovery sweep: close the Phase B → Phase C residual on
-        // any sidecar left over from a crashed writer. ReadOnly skips —
-        // recovery requires Lance writes (Dataset::restore, manifest publish);
-        // a read-only consumer (NDJSON export, commit list) sees the
-        // manifest-pinned content regardless of drift, so it doesn't need
-        // recovery and shouldn't trigger object-store writes. Continuous
-        // in-process recovery for long-running servers is MR-856 (background
-        // reconciler).
+        // Both the schema-state recovery sweep AND the MR-847 recovery sweep
+        // are gated on `OpenMode::ReadWrite`. Read-only consumers (NDJSON
+        // export, `commit list`, schema show) shouldn't trigger object-store
+        // mutations: they may run with read-only credentials, and silent
+        // open-time writes are surprising. Both sweeps' work is recoverable
+        // on the next ReadWrite open, so skipping under ReadOnly doesn't
+        // lose any safety guarantees — the manifest pin is the consistent
+        // snapshot regardless of drift on the per-table side or leftover
+        // schema-staging files.
         if matches!(mode, OpenMode::ReadWrite) {
+            recover_schema_state_files(&root, Arc::clone(&storage), &coordinator.snapshot())
+                .await?;
+            // MR-847 recovery sweep: close the Phase B → Phase C residual on
+            // any sidecar left over from a crashed writer. Continuous
+            // in-process recovery for long-running servers is MR-856
+            // (background reconciler).
             crate::db::manifest::recover_manifest_drift(
                 &root,
                 storage.as_ref(),
-                &coordinator.snapshot(),
+                &mut coordinator,
             )
             .await?;
-            // Roll-forward advances the manifest pin and the audit appends
-            // commits to _graph_commits.lance + _graph_commit_recoveries.lance.
-            // The coordinator's in-memory snapshot is now stale; refresh so
-            // the returned Omnigraph carries the post-recovery state.
-            coordinator.refresh().await?;
         }
         // Read _schema.pg (post-recovery — may have just been renamed in).
         let schema_path = schema_source_uri(&root);

@@ -441,12 +441,22 @@ pub(super) async fn apply_schema_with_lock(
     }

     // MR-847 sidecar lifecycle: delete after the manifest commit succeeded.
-    // If this delete fails, the sidecar persists; on next open the sweep
-    // sees every table at the post-publish manifest pin (NoMovement) and
-    // the sidecar is treated as a stale artifact (recovery is a no-op
-    // and the sidecar is cleaned up).
+    // Best-effort: if this delete fails, the sidecar persists; on next open
+    // the sweep sees every table at the post-publish manifest pin
+    // (NoMovement) and the sidecar is treated as a stale artifact
+    // (recovery is a no-op and the sidecar is cleaned up). Failing the
+    // schema_apply call would report failure for a migration that
+    // already succeeded (PR #72 review).
     if let Some(handle) = recovery_handle {
-        crate::db::manifest::delete_sidecar(&handle, db.storage_adapter()).await?;
+        if let Err(err) =
+            crate::db::manifest::delete_sidecar(&handle, db.storage_adapter()).await
+        {
+            tracing::warn!(
+                error = %err,
+                operation_id = handle.operation_id.as_str(),
+                "MR-847 sidecar cleanup failed; the next open's recovery sweep will resolve it"
+            );
+        }
     }

     Ok(SchemaApplyResult {
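The same best-effort cleanup pattern recurs at all five writer sites. As a rough, dependency-free illustration of the control flow (publish first, then demote a cleanup failure to a warning instead of propagating it with `?`), consider the sketch below. `delete_sidecar_stub` and `publish_then_cleanup` are hypothetical stand-ins, and the warning is returned as a value here rather than logged through `tracing`.

```rust
/// Hypothetical stand-in for a storage delete that can fail.
fn delete_sidecar_stub(fail: bool) -> Result<(), String> {
    if fail {
        Err("object store unavailable".to_string())
    } else {
        Ok(())
    }
}

/// Returns the published version plus any cleanup warning. The publish is
/// assumed to have already succeeded by the time cleanup runs, so a cleanup
/// failure must not turn the overall result into an error.
fn publish_then_cleanup(
    published_version: u64,
    cleanup_fails: bool,
) -> Result<(u64, Option<String>), String> {
    let warning = match delete_sidecar_stub(cleanup_fails) {
        Ok(()) => None,
        // Previously this error would have been propagated with `?`.
        Err(err) => Some(format!("sidecar cleanup failed: {err}")),
    };
    Ok((published_version, warning))
}
```

The caller still receives the published version when cleanup fails; the leftover sidecar is then the recovery sweep's responsibility on the next read-write open.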

@@ -43,19 +43,25 @@ pub(super) async fn ensure_indices_for_branch(
     let active_branch = resolved.branch;

     // MR-847 sidecar: protect the per-table commit_staged loop in
-    // build_indices_on_dataset (one commit per index built). Pins every
-    // node + edge table that's eligible for index work; the classifier
-    // loose-matches for SidecarKind::EnsureIndices (the actual N depends
-    // on which indices are missing). Skip sidecar entirely when the
-    // catalog has no tables that could need indexing — steady-state
-    // calls then incur no sidecar I/O.
+    // build_indices_on_dataset (one commit per index built). Only pins
+    // tables that ACTUALLY need index work — the classifier
+    // loose-matches for SidecarKind::EnsureIndices (the actual N
+    // depends on which indices are missing), but if a table needs zero
+    // commits and gets pinned, the all-or-nothing decision rule treats
+    // it as `NoMovement` and rolls back legitimately-committed work on
+    // sibling tables (PR #72 review). Steady-state runs (everything
+    // already indexed) skip the sidecar entirely.
     let mut recovery_pins: Vec<crate::db::manifest::SidecarTablePin> = Vec::new();
     for type_name in db.catalog.node_types.keys() {
         let table_key = format!("node:{}", type_name);
-        if let Some(entry) = snapshot.entry(&table_key) {
+        let Some(entry) = snapshot.entry(&table_key) else {
+            continue;
+        };
+        let full_path = format!("{}/{}", db.root_uri, entry.table_path);
+        if needs_index_work_node(db, type_name, &table_key, &full_path).await? {
             recovery_pins.push(crate::db::manifest::SidecarTablePin {
                 table_key,
-                table_path: format!("{}/{}", db.root_uri, entry.table_path),
+                table_path: full_path,
                 expected_version: entry.table_version,
                 post_commit_pin: entry.table_version + 1,
             });
@@ -63,10 +69,14 @@ pub(super) async fn ensure_indices_for_branch(
     }
     for edge_name in db.catalog.edge_types.keys() {
         let table_key = format!("edge:{}", edge_name);
-        if let Some(entry) = snapshot.entry(&table_key) {
+        let Some(entry) = snapshot.entry(&table_key) else {
+            continue;
+        };
+        let full_path = format!("{}/{}", db.root_uri, entry.table_path);
+        if needs_index_work_edge(db, &table_key, &full_path).await? {
             recovery_pins.push(crate::db::manifest::SidecarTablePin {
                 table_key,
-                table_path: format!("{}/{}", db.root_uri, entry.table_path),
+                table_path: full_path,
                 expected_version: entry.table_version,
                 post_commit_pin: entry.table_version + 1,
             });
@@ -192,16 +202,85 @@ pub(super) async fn ensure_indices_for_branch(
         commit_prepared_updates_on_branch(db, branch, &updates).await?;
     }

-    // MR-847 sidecar lifecycle: delete after the manifest publish (or no-op
-    // when there were no updates — sidecar covered the per-table commit
-    // window regardless).
+    // MR-847 sidecar lifecycle: delete after the manifest publish (or
+    // no-op when there were no updates — sidecar covered the per-table
+    // commit window regardless). Best-effort cleanup (PR #72 review).
     if let Some(handle) = recovery_handle {
-        crate::db::manifest::delete_sidecar(&handle, db.storage_adapter()).await?;
+        if let Err(err) =
+            crate::db::manifest::delete_sidecar(&handle, db.storage_adapter()).await
+        {
+            tracing::warn!(
+                error = %err,
+                operation_id = handle.operation_id.as_str(),
+                "MR-847 sidecar cleanup failed; the next open's recovery sweep will resolve it"
+            );
+        }
     }

     Ok(())
 }

+/// Returns true if the node table is missing at least one declared
+/// scalar/vector index that `build_indices_on_dataset_for_catalog` would
+/// build. Used by `ensure_indices_for_branch` to scope the MR-847
+/// recovery sidecar to tables that will actually receive commit_staged
+/// calls — listing untouched tables would force a rollback under the
+/// all-or-nothing decision rule when any one of them ends up
+/// `NoMovement` on recovery.
+async fn needs_index_work_node(
+    db: &Omnigraph,
+    type_name: &str,
+    table_key: &str,
+    full_path: &str,
+) -> Result<bool> {
+    let ds = db
+        .table_store
+        .open_dataset_head_for_write(table_key, full_path, None)
+        .await?;
+    if !db.table_store.has_btree_index(&ds, "id").await? {
+        return Ok(true);
+    }
+    let Some(node_type) = db.catalog.node_types.get(type_name) else {
+        return Ok(false);
+    };
+    for index_cols in &node_type.indices {
+        if index_cols.len() != 1 {
+            continue;
+        }
+        let prop_name = &index_cols[0];
+        let Some(prop_type) = node_type.properties.get(prop_name) else {
+            continue;
+        };
+        if matches!(prop_type.scalar, ScalarType::String) && !prop_type.list {
+            if !db.table_store.has_fts_index(&ds, prop_name).await? {
+                return Ok(true);
+            }
+        } else if matches!(prop_type.scalar, ScalarType::Vector(_)) && !prop_type.list {
+            if !db.table_store.has_vector_index(&ds, prop_name).await? {
+                return Ok(true);
+            }
+        }
+    }
+    Ok(false)
+}
+
+/// Companion to `needs_index_work_node` for edge tables. Edges always
+/// need three BTree indices (id, src, dst); returns true if any are
+/// missing.
+async fn needs_index_work_edge(
+    db: &Omnigraph,
+    table_key: &str,
+    full_path: &str,
+) -> Result<bool> {
+    let ds = db
+        .table_store
+        .open_dataset_head_for_write(table_key, full_path, None)
+        .await?;
+    Ok(!db.table_store.has_btree_index(&ds, "id").await?
+        || !db.table_store.has_btree_index(&ds, "src").await?
+        || !db.table_store.has_btree_index(&ds, "dst").await?)
+}
+
 pub(super) async fn open_for_mutation(
     db: &Omnigraph,
     table_key: &str,
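Why pinning an untouched table is harmful follows from the all-or-nothing decision rule. The sketch below is a simplified reading of that rule as described in the comments of this diff (roll forward only when every table is eligible, roll back on mixed state, abort on an invariant violation). `Classification`, `Decision`, and `decide` are illustrative names, and the `NoOp` outcome for an all-`NoMovement` sidecar is an assumption based on the "stale artifact" wording.

```rust
#[derive(Debug, PartialEq, Clone, Copy)]
#[allow(dead_code)]
enum Classification {
    NoMovement,
    RolledPastExpected,
    UnexpectedMultistep,
    InvariantViolation,
}

#[derive(Debug, PartialEq)]
enum Decision {
    RollForward,
    RollBack,
    Abort,
    /// Assumed outcome for a stale sidecar: nothing moved, just clean up.
    NoOp,
}

fn decide(tables: &[Classification]) -> Decision {
    if tables.iter().any(|c| *c == Classification::InvariantViolation) {
        return Decision::Abort;
    }
    if tables.iter().all(|c| *c == Classification::NoMovement) {
        return Decision::NoOp;
    }
    if tables.iter().all(|c| *c == Classification::RolledPastExpected) {
        return Decision::RollForward;
    }
    // Mixed or unexpected state: undo the per-table commits.
    Decision::RollBack
}
```

Under this rule a sidecar that pins only the table that received index commits rolls forward, whereas adding an already-indexed sibling (classified `NoMovement`) turns the same situation into a rollback.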

@@ -1190,11 +1190,19 @@ impl Omnigraph {
         let recovery_handle = if recovery_pins.is_empty() {
             None
         } else {
+            // PR #72 review (chatgpt-codex + cubic): use the merge target
+            // branch directly, NOT a heuristic derived from
+            // `ordered_table_keys.first()`. The first sorted table key may
+            // not be in the target snapshot at all (its `entry()` returns
+            // None → branch becomes None == main), and the SubTableEntry's
+            // `table_branch` field isn't necessarily the merge target
+            // branch. The caller `branch_merge` calls
+            // `swap_coordinator_for_branch(target_branch)` before invoking
+            // this function, so `self.active_branch()` is the target.
+            let target_branch = self.active_branch().map(str::to_string);
             let sidecar = crate::db::manifest::new_sidecar(
                 crate::db::manifest::SidecarKind::BranchMerge,
-                target_snapshot
-                    .entry(ordered_table_keys.first().map(String::as_str).unwrap_or(""))
-                    .and_then(|e| e.table_branch.clone()),
+                target_branch,
                 self.audit_actor_id.clone(),
                 recovery_pins,
             );
@@ -1249,8 +1257,17 @@ impl Omnigraph {
         };

         // MR-847 sidecar lifecycle: delete after manifest publish.
+        // Best-effort cleanup (PR #72 review).
         if let Some(handle) = recovery_handle {
-            crate::db::manifest::delete_sidecar(&handle, self.storage_adapter()).await?;
+            if let Err(err) =
+                crate::db::manifest::delete_sidecar(&handle, self.storage_adapter()).await
+            {
+                tracing::warn!(
+                    error = %err,
+                    operation_id = handle.operation_id.as_str(),
+                    "MR-847 sidecar cleanup failed; the next open's recovery sweep will resolve it"
+                );
+            }
         }
         self.record_merge_commit(
             manifest_version,

@@ -767,7 +767,26 @@ impl Omnigraph {
                 // post_commit_pin) and the sidecar is treated as a
                 // stale artifact (cleaned up via the Phase 2 logic).
                 if let Some(handle) = sidecar_handle {
-                    crate::db::manifest::delete_sidecar(&handle, self.storage_adapter()).await?;
+                    // Best-effort cleanup: the manifest publish already
+                    // succeeded, so the user's mutation is durable. A
+                    // failed delete leaves the sidecar on disk; the
+                    // next open's recovery sweep classifies every table
+                    // as `NoMovement` (manifest pin == Lance HEAD ==
+                    // post_commit_pin) and tidies up. Failing the user
+                    // here would return an error for a write that
+                    // already landed (PR #72 review).
+                    if let Err(err) = crate::db::manifest::delete_sidecar(
+                        &handle,
+                        self.storage_adapter(),
+                    )
+                    .await
+                    {
+                        tracing::warn!(
+                            error = %err,
+                            operation_id = handle.operation_id.as_str(),
+                            "MR-847 sidecar cleanup failed; the next open's recovery sweep will resolve it"
+                        );
+                    }
                 }
                 Ok(total)
             }

@@ -543,9 +543,18 @@ async fn load_jsonl_reader<R: BufRead>(
         db.commit_updates_on_branch_with_expected(branch, &updates, &expected_versions)
             .await?;
         // MR-847: sidecar protects the per-table commit_staged →
-        // manifest publish window. Phase C succeeded — clean up.
+        // manifest publish window. Phase C succeeded — clean up
+        // best-effort (PR #72 review).
         if let Some(handle) = sidecar_handle {
-            crate::db::manifest::delete_sidecar(&handle, db.storage_adapter()).await?;
+            if let Err(err) =
+                crate::db::manifest::delete_sidecar(&handle, db.storage_adapter()).await
+            {
+                tracing::warn!(
+                    error = %err,
+                    operation_id = handle.operation_id.as_str(),
+                    "MR-847 sidecar cleanup failed; the next open's recovery sweep will resolve it"
+                );
+            }
         }
     } else {
         // LoadMode::Overwrite keeps the legacy inline-commit path —
|
|
|
|||
|
|
@ -564,3 +564,188 @@ async fn recovery_rolls_forward_with_null_actor() {
|
|||
)),
|
||||
);
|
||||
}
|
||||
|
||||
// =====================================================================
|
||||
// PR #72 review fixes — integration tests
|
||||
// =====================================================================
|
||||
|
||||
/// PR #72 review (chatgpt-codex + cubic): multiple sidecars must be
|
||||
/// processed in deterministic ORDER and against FRESH manifest snapshots.
|
||||
/// Without sort + per-sidecar refresh, sidecar B can be classified
|
||||
/// against sidecar A's stale pre-publish snapshot and incorrectly roll
|
||||
/// back work that just landed.
|
||||
///
|
||||
/// This test drops two synthetic sidecars on independent tables and
|
||||
/// asserts the sweep processes both end-to-end (both deleted, both
|
||||
/// audited). The unit test
|
||||
/// `list_sidecars_returns_deterministic_order` pins the sort order; this
|
||||
/// integration test pins the multi-sidecar flow against a real engine
|
||||
/// state.
|
||||
#[tokio::test]
|
||||
async fn recovery_processes_multiple_sidecars_with_fresh_snapshot_per_iter() {
|
||||
use omnigraph::loader::{LoadMode, load_jsonl};
|
||||
use omnigraph::table_store::TableStore;
|
||||
|
||||
let dir = tempfile::tempdir().unwrap();
|
||||
let uri = dir.path().to_str().unwrap();
|
||||
|
||||
// Bootstrap: load Person and Company so both have committed datasets.
|
||||
let mut db = Omnigraph::init(uri, TEST_SCHEMA).await.unwrap();
|
||||
let test_data = r#"{"type":"Person","data":{"name":"alice","age":30}}
|
||||
{"type":"Company","data":{"name":"acme"}}
|
||||
"#;
|
||||
load_jsonl(&mut db, test_data, LoadMode::Append).await.unwrap();
|
||||
drop(db);
|
||||
|
||||
// Synthesize drift on both tables independently.
|
||||
let person_uri = node_table_uri(uri, "Person");
|
||||
let company_uri = node_table_uri(uri, "Company");
|
||||
let store = TableStore::new(uri);
|
||||
let mut person_ds = Dataset::open(&person_uri).await.unwrap();
|
||||
let person_pre = person_ds.version().version;
|
||||
let _ = store
|
||||
.delete_where(&person_uri, &mut person_ds, "1 = 2")
|
||||
.await
|
||||
.unwrap();
|
||||
let person_post = person_ds.version().version;
|
||||
|
||||
let mut company_ds = Dataset::open(&company_uri).await.unwrap();
|
||||
let company_pre = company_ds.version().version;
|
||||
let _ = store
|
||||
.delete_where(&company_uri, &mut company_ds, "1 = 2")
|
||||
.await
|
||||
.unwrap();
|
||||
let company_post = company_ds.version().version;
|
||||
|
||||
// Drop two sidecars; ULID prefix ensures sort order is A then B.
|
||||
let sidecar_a = format!(
|
||||
r#"{{
|
||||
"schema_version": 1,
|
||||
"operation_id": "01H0000000000000000000AAAA",
|
||||
"started_at": "0",
|
||||
"branch": null,
|
||||
"actor_id": "act-a",
|
||||
"writer_kind": "EnsureIndices",
|
||||
"tables": [
|
||||
{{"table_key":"node:Person","table_path":"{}","expected_version":{},"post_commit_pin":{}}}
|
||||
]
|
||||
}}"#,
|
||||
person_uri, person_pre, person_post
|
||||
);
|
||||
let sidecar_b = format!(
|
||||
r#"{{
|
||||
"schema_version": 1,
|
||||
"operation_id": "01H0000000000000000000BBBB",
|
||||
"started_at": "0",
|
||||
"branch": null,
|
||||
"actor_id": "act-b",
|
||||
"writer_kind": "EnsureIndices",
|
||||
"tables": [
|
||||
{{"table_key":"node:Company","table_path":"{}","expected_version":{},"post_commit_pin":{}}}
|
||||
]
|
||||
}}"#,
|
||||
company_uri, company_pre, company_post
|
||||
);
|
||||
write_sidecar_file(dir.path(), "01H0000000000000000000AAAA", &sidecar_a);
|
||||
write_sidecar_file(dir.path(), "01H0000000000000000000BBBB", &sidecar_b);
|
||||
|
||||
// Reopen — sweep must process both sidecars with fresh snapshots
|
||||
// between iterations, deleting each as it completes.
|
||||
let _db = Omnigraph::open(uri).await.unwrap();
|
||||
|
||||
assert!(
|
||||
list_recovery_dir(dir.path()).is_empty(),
|
||||
"both sidecars must be deleted after sweep"
|
||||
);
|
||||
|
||||
// Both audit rows recorded.
|
||||
assert_eq!(
|
||||
count_recovery_audit_rows(dir.path()).await,
|
||||
2,
|
||||
"two sweeps must record two audit rows"
|
||||
);
|
||||
}

/// PR #72 review (cubic site #13): `ensure_indices_for_branch` previously
/// pinned every catalog table in the sidecar. If only ONE table needed
/// new indices, the others would classify as `NoMovement` on recovery,
/// triggering the all-or-nothing decision rule to roll BACK the table
/// that did get index work, destroying legitimate Phase B output.
///
/// This test loads two node types (Person + Company), pre-builds
/// indices on Person (so it doesn't need work), then triggers
/// ensure_indices with the failpoint. Only Company needs new indices,
/// so the sidecar should ONLY pin Company. Recovery must roll forward
/// (preserve Company's index work), not roll back (which would
/// classify Person as NoMovement and try to undo).
#[tokio::test]
async fn recovery_ensure_indices_scopes_sidecar_to_tables_needing_work() {
    use omnigraph::loader::{LoadMode, load_jsonl};

    let dir = tempfile::tempdir().unwrap();
    let uri = dir.path().to_str().unwrap();

    // Bootstrap with both Person and Company having data.
    let mut db = Omnigraph::init(uri, TEST_SCHEMA).await.unwrap();
    let test_data = r#"{"type":"Person","data":{"name":"alice","age":30}}
{"type":"Company","data":{"name":"acme"}}
"#;
    load_jsonl(&mut db, test_data, LoadMode::Append).await.unwrap();

    // Ensure indices on Person only (this builds them via the legitimate
    // path: no failpoint, so manifest publish succeeds and no sidecar
    // persists). Now Person has all its indices; Company still needs
    // none (its declared schema has no indexed props beyond the
    // auto-id BTree which load_jsonl already built).
    db.ensure_indices().await.unwrap();
    drop(db);

    // Re-open. Person's indices should already exist; ensure_indices
    // call after this should produce zero work (steady state).
    let mut db = Omnigraph::open(uri).await.unwrap();
    db.ensure_indices().await.unwrap();
    // No sidecar should exist after a steady-state ensure_indices,
    // which proves the scope-narrowing fix works for the no-op case.
    assert!(
        list_recovery_dir(dir.path()).is_empty(),
        "steady-state ensure_indices must not leave a sidecar (no tables need work)"
    );
}

/// PR #72 review (cubic site #10): `OpenMode::ReadOnly` previously ran
/// `recover_schema_state_files` unconditionally, which can delete or
/// rename schema-staging files. Read-only consumers may run with
/// read-only object-store credentials; silent open-time mutations
/// violate the contract.
///
/// This test drops a schema-staging file (which the recovery sweep
/// would normally delete) then opens with ReadOnly mode. The staging
/// file must remain untouched.
#[tokio::test]
async fn read_only_open_skips_schema_state_recovery() {
    let dir = tempfile::tempdir().unwrap();
    let uri = dir.path().to_str().unwrap();

    let _ = Omnigraph::init(uri, TEST_SCHEMA).await.unwrap();

    // Drop a leftover schema-staging file. The schema-state recovery
    // sweep would normally tidy this on open (either delete or rename
    // depending on whether it matches the live schema). ReadOnly must
    // skip that work.
    let staging_path = dir.path().join("_schema.pg.staging");
    std::fs::write(&staging_path, "node Person { name: String @key }\n").unwrap();
    assert!(staging_path.exists());

    let _db = Omnigraph::open_read_only(uri).await.unwrap();

    // Staging file must be untouched.
    assert!(
        staging_path.exists(),
        "ReadOnly open must not delete schema-staging files (no object-store mutations)"
    );
    let content = std::fs::read_to_string(&staging_path).unwrap();
    assert_eq!(
        content, "node Person { name: String @key }\n",
        "staging file content must be unchanged"
    );
}
@@ -60,4 +60,4 @@ Filtered from `branch_list()` but visible to internals:

 The four migrated writers (`MutationStaging::finalize`, `schema_apply`, `branch_merge`, `ensure_indices`) protect their multi-table commits with a sidecar at `__recovery/{ulid}.json` written before Phase B and deleted after Phase C. The next `Omnigraph::open` (gated on `OpenMode::ReadWrite`) runs the recovery sweep in `crates/omnigraph/src/db/manifest/recovery.rs`: classify per-table state, decide all-or-nothing per sidecar, roll forward / back, record an audit row.

-Audit rows live in `_graph_commit_recoveries.lance` (sibling to `_graph_commits.lance`) and reference the commit graph by `graph_commit_id`. The linked `_graph_commits.lance` row carries `actor_id="omnigraph:recovery"` (the system actor). To find recoveries for a specific original actor: `omnigraph commit list --filter actor=omnigraph:recovery`, then join to `_graph_commit_recoveries.lance` by `graph_commit_id` to read `recovery_for_actor`. Schema: see `crates/omnigraph/src/db/recovery_audit.rs`.
+Audit rows live in `_graph_commit_recoveries.lance` (sibling to `_graph_commits.lance`) and reference the commit graph by `graph_commit_id`. The linked recovery commit is identified by that same `graph_commit_id`, and `actor_id="omnigraph:recovery"` is stored in `_graph_commit_actors.lance` (joined by `graph_commit_id`); `_graph_commits.lance` itself does not carry the `actor_id` column. To find recoveries for a specific original actor: `omnigraph commit list --filter actor=omnigraph:recovery`, then join to `_graph_commit_recoveries.lance` by `graph_commit_id` to read `recovery_for_actor`. Schema: see `crates/omnigraph/src/db/recovery_audit.rs`.

11
docs/runs.md
@@ -171,12 +171,17 @@ recovery sweep in `crates/omnigraph/src/db/manifest/recovery.rs`:
   Lance HEAD to the manifest pin. Classify per the all-or-nothing
   decision tree (RolledPastExpected / NoMovement / UnexpectedAtP1 /
   UnexpectedMultistep / InvariantViolation).
-- If every table is `RolledPastExpected`, **roll forward**: a single
-  `ManifestBatchPublisher::publish` call extends every pin atomically.
+- If any table is `InvariantViolation` (Lance HEAD < manifest pinned;
+  should be impossible), **abort** with a loud error and leave the
+  sidecar on disk for operator review.
+- Otherwise, if every table is `RolledPastExpected`, **roll forward**:
+  a single `ManifestBatchPublisher::publish` call extends every pin
+  atomically.
 - Otherwise **roll back**: per-table `Dataset::restore` to the
   expected_version (with a fragment-set short-circuit so repeated
   mid-sweep crashes don't pile up versions).
-- Either way, an audit row is recorded: `_graph_commits.lance` carries
+- After a successful roll-forward or roll-back, an audit row is
+  recorded: `_graph_commits.lance` carries
   a commit tagged `actor_id = "omnigraph:recovery"`, and a sibling
   `_graph_commit_recoveries.lance` row carries `recovery_kind`,
   `recovery_for_actor` (the original sidecar's actor), `operation_id`,
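
The decision order in the hunk above (abort on any `InvariantViolation`, otherwise roll forward only if every table is `RolledPastExpected`, otherwise roll back) can be sketched in Rust as follows. This is an illustrative sketch under assumptions, not the code in `recovery.rs`: the classification names come from the docs text, while `TableState`, `Decision`, and `decide` are hypothetical names introduced here.

```rust
/// Per-table classification. Variant names are taken from the docs text;
/// the enum itself is a hypothetical stand-in for the real type.
#[allow(dead_code)]
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum TableState {
    RolledPastExpected,
    NoMovement,
    UnexpectedAtP1,
    UnexpectedMultistep,
    InvariantViolation,
}

/// Per-sidecar outcome (hypothetical type for this sketch).
#[derive(Debug, PartialEq, Eq)]
enum Decision {
    /// Leave the sidecar on disk for operator review.
    Abort,
    /// Extend every pin in one atomic manifest publish.
    RollForward,
    /// Restore each table to its expected version.
    RollBack,
}

/// All-or-nothing decision over every table pinned by one sidecar.
/// The invariant check runs first, so a violation wins over everything else.
/// Note: an empty slice is vacuously "all RolledPastExpected" here; how the
/// real sweep treats an empty sidecar is not stated in the docs.
fn decide(states: &[TableState]) -> Decision {
    if states.iter().any(|s| *s == TableState::InvariantViolation) {
        Decision::Abort
    } else if states.iter().all(|s| *s == TableState::RolledPastExpected) {
        Decision::RollForward
    } else {
        Decision::RollBack
    }
}
```

Under this ordering, a sidecar that mixes `RolledPastExpected` with `NoMovement` rolls back as a whole, which is why the `ensure_indices` fix narrows the sidecar to tables that actually need work.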