diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index b2b18d8..56ef3e3 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -365,6 +365,20 @@ jobs: - name: Run RustFS CLI smoke run: cargo test --locked -p omnigraph-cli --test system_local local_cli_s3_end_to_end_init_load_read_flow -- --nocapture + - name: Run RustFS recovery-sidecar lifecycle + # Sidecar put/list/delete through the S3 storage backend on a + # real bucket (the failpoint only wedges the publisher; the + # sidecar I/O is exercised for real). Name filter `s3_` matches + # the bucket-gated tests in the failpoints target only; the + # grep guards against the filter going vacuous (cargo passes + # with 0 tests matched) if those tests are ever renamed. + run: | + output=$(cargo test --locked -p omnigraph-engine --features failpoints --test failpoints s3_ -- --nocapture 2>&1); status=$? + echo "$output" + [ "$status" -eq 0 ] || exit "$status" + echo "$output" | grep -Eq "test result: ok\. [1-9][0-9]* passed" \ + || { echo "::error::filter 's3_' matched no tests — vacuous pass"; exit 1; } + - name: Dump RustFS logs on failure if: failure() run: docker logs rustfs diff --git a/AGENTS.md b/AGENTS.md index 87d6a46..d9e0c45 100644 --- a/AGENTS.md +++ b/AGENTS.md @@ -240,7 +240,7 @@ omnigraph policy explain --actor act-alice --action change --branch main | Columnar storage on object store | ✅ Arrow/Lance | URI normalization, S3 env-var plumbing | | Per-dataset versioning + time travel | ✅ | `snapshot_at_version`, `entity_at`, snapshot-pinned reads across many tables | | Per-dataset branches | ✅ | **Graph-level** branches (atomic across all sub-tables), lazy fork, system branch filtering | -| Atomic single-dataset commits | ✅ | **Multi-table publish via three layers**, NOT a single Lance primitive: (1) per-table Lance `commit_staged` for the data write, (2) `__manifest` row-level CAS via `ManifestBatchPublisher` for cross-table ordering, (3) the open-time recovery sweep for the residual gap between (1) and (2). All three layers ship; the five migrated writers (`MutationStaging::finalize`, `schema_apply`, `branch_merge`, `ensure_indices`, `optimize_all_tables`) write a `__recovery/{ulid}.json` sidecar before Phase B and delete it after Phase C. The next `Omnigraph::open` (gated on `OpenMode::ReadWrite`) runs the sweep in `db/manifest/recovery.rs`: classify, decide all-or-nothing per sidecar, roll forward via single `ManifestBatchPublisher::publish` or roll back via `Dataset::restore` followed by a manifest publish of the restored version (so both directions converge to `manifest == HEAD` — no residual drift), and record an audit row in `_graph_commit_recoveries.lance` (queryable via `omnigraph commit list --filter actor=omnigraph:recovery`). Continuous in-process recovery (no restart needed between Phase B failure and recovery) is the goal of a future background reconciler. Engine writes route through a sealed `TableStorage` trait (`db.storage()`) exposing only `stage_*` + `commit_staged` + reads; the inline-commit residuals (`delete_where`, `create_vector_index`) are split onto a separate sealed `InlineCommitResidual` trait reached via `db.storage_inline_residual()` (MR-854), so the default surface cannot couple a write with a HEAD advance — §1 holds by construction. `delete_where` and `create_vector_index` stay inline until upstream Lance ships a public two-phase API ([#6658](https://github.com/lance-format/lance/issues/6658), [#6666](https://github.com/lance-format/lance/issues/6666)); `LoadMode::Overwrite` uses Lance `Overwrite` staged transactions. | +| Atomic single-dataset commits | ✅ | **Multi-table publish via three layers**, NOT a single Lance primitive: (1) per-table Lance `commit_staged` for the data write, (2) `__manifest` row-level CAS via `ManifestBatchPublisher` for cross-table ordering, (3) the open-time recovery sweep for the residual gap between (1) and (2). All three layers ship; the five migrated writers (`MutationStaging::finalize`, `schema_apply`, `branch_merge`, `ensure_indices`, `optimize_all_tables`) write a `__recovery/{ulid}.json` sidecar before Phase B and delete it after Phase C. The next `Omnigraph::open` (gated on `OpenMode::ReadWrite`) runs the sweep in `db/manifest/recovery.rs`: classify, decide all-or-nothing per sidecar, roll forward via single `ManifestBatchPublisher::publish` or roll back via `Dataset::restore` followed by a manifest publish of the restored version (so both directions converge to `manifest == HEAD` — no residual drift), and record an audit row in `_graph_commit_recoveries.lance` (queryable via `omnigraph commit list --filter actor=omnigraph:recovery`). The write entry points (`load_as`, `mutate_as`, `apply_schema_as`, `branch_merge_as`) and `refresh` additionally run an in-process roll-forward-only heal (serialized against live writers via the per-table write queues), so a long-lived server converges on its next write without restart; only rollback-eligible sidecars still defer to the next read-write open (a future background reconciler's goal). Engine writes route through a sealed `TableStorage` trait (`db.storage()`) exposing only `stage_*` + `commit_staged` + reads; the inline-commit residuals (`delete_where`, `create_vector_index`) are split onto a separate sealed `InlineCommitResidual` trait reached via `db.storage_inline_residual()` (MR-854), so the default surface cannot couple a write with a HEAD advance — §1 holds by construction. `delete_where` and `create_vector_index` stay inline until upstream Lance ships a public two-phase API ([#6658](https://github.com/lance-format/lance/issues/6658), [#6666](https://github.com/lance-format/lance/issues/6666)); `LoadMode::Overwrite` uses Lance `Overwrite` staged transactions. | | Compaction (`compact_files`) | ✅ | `omnigraph optimize` orchestrates over all node/edge tables, bounded concurrency; **publishes each compacted table's new version to `__manifest`** (so the manifest tracks the Lance HEAD — required for reads to observe compaction and for schema apply / strict writes to pass their HEAD-vs-manifest precondition), under the per-`(table, main)` write queue with `SidecarKind::Optimize` recovery coverage; **refuses on an unrecovered graph** (errors if a `__recovery` sidecar is pending); **skips uncovered HEAD > manifest drift** with `DriftNeedsRepair` instead of interpreting it; **skips blob-bearing tables** (reported via `TableOptimizeStats.skipped`, not silent), gated on `LANCE_SUPPORTS_BLOB_COMPACTION` until the upstream blob-v2 compaction-decode bug is fixed (see [docs/dev/invariants.md](docs/dev/invariants.md) Known Gaps) | | Repair uncovered drift | — | `omnigraph repair` explicitly classifies uncovered table `HEAD > manifest` drift: verified maintenance drift (`ReserveFragments`/`Rewrite`) can be published with `--confirm`; suspicious or unverifiable drift requires `--force --confirm`. Sidecar-covered crash residuals still recover automatically on open. | | Cleanup (`cleanup_old_versions`) | ✅ | `omnigraph cleanup` with `--keep` / `--older-than` policy | diff --git a/Cargo.toml b/Cargo.toml index 17990ea..918ac05 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -63,7 +63,7 @@ base64 = "0.22" ariadne = "0.4" regex = "1" reqwest = { version = "0.12", default-features = false, features = ["json", "rustls-tls"] } -object_store = { version = "0.12.5", default-features = false, features = ["aws"] } +object_store = { version = "0.12.5", default-features = false, features = ["aws", "fs"] } fail = "0.5" time = { version = "0.3", features = ["formatting"] } axum = { version = "0.8", features = ["json", "macros"] } diff --git a/crates/omnigraph-cluster/src/store.rs b/crates/omnigraph-cluster/src/store.rs index 4d33d2c..620df96 100644 --- a/crates/omnigraph-cluster/src/store.rs +++ b/crates/omnigraph-cluster/src/store.rs @@ -169,31 +169,16 @@ impl ClusterStore { .map_err(|err| err.to_string()) } - /// JSON object write with the strongest atomicity the backend offers: - /// temp + rename on the filesystem (no torn JSON after a crash; the - /// pre-port behavior), a single atomic PUT on object stores (where - /// copy+delete would be weaker, not stronger). + /// JSON object write. Atomic visibility is the storage adapter's + /// contract on every backend (staged temp + rename on the filesystem, + /// a single atomic PUT on object stores) — no torn JSON after a crash, + /// no per-backend branch needed here. async fn put_json(&self, relative: &str, payload: &str) -> Result<(), String> { let target = self.uri(relative); - match self.kind() { - StorageKind::Local => { - let tmp = format!("{target}.tmp.{}", Ulid::new()); - self.adapter - .write_text(&tmp, payload) - .await - .map_err(|err| err.to_string())?; - if let Err(err) = self.adapter.rename_text(&tmp, &target).await { - let _ = self.adapter.delete(&tmp).await; - return Err(err.to_string()); - } - Ok(()) - } - StorageKind::S3 => self - .adapter - .write_text(&target, payload) - .await - .map_err(|err| err.to_string()), - } + self.adapter + .write_text(&target, payload) + .await + .map_err(|err| err.to_string()) } /// Shared list-and-parse for the sidecar/approval directories: id diff --git a/crates/omnigraph-cluster/src/tests.rs b/crates/omnigraph-cluster/src/tests.rs index 63e7da7..805ecda 100644 --- a/crates/omnigraph-cluster/src/tests.rs +++ b/crates/omnigraph-cluster/src/tests.rs @@ -1950,13 +1950,29 @@ graphs: } #[tokio::test] + #[cfg(unix)] async fn refresh_flags_unreadable_payload_as_error() { let dir = fixture(); init_derived_graph(dir.path()).await; let blob = converge_fixture(dir.path()).await; - // A same-named directory yields a non-NotFound IO error portably. - fs::remove_file(&blob).unwrap(); - fs::create_dir(&blob).unwrap(); + // Make the payload unreadable without removing it: permission + // denied is a genuine non-NotFound IO error. (A same-named + // directory no longer triggers this path: object-store semantics + // classify a directory at an object path as NotFound — "only + // objects exist" — which is the missing-payload case, not the + // unreadable one.) + let mut perms = fs::metadata(&blob).unwrap().permissions(); + std::os::unix::fs::PermissionsExt::set_mode(&mut perms, 0o000); + fs::set_permissions(&blob, perms).unwrap(); + // Root reads straight through mode 000 (container dev runners + // commonly run as root): skip rather than fail — the contract + // under test needs a genuine permission error. + if fs::read(&blob).is_ok() { + eprintln!( + "skipping refresh_flags_unreadable_payload_as_error: running as root (mode 000 is still readable)" + ); + return; + } let out = refresh_config_dir(dir.path()).await; assert!(!out.ok); diff --git a/crates/omnigraph/src/db/manifest.rs b/crates/omnigraph/src/db/manifest.rs index 5bf1f87..6cd271a 100644 --- a/crates/omnigraph/src/db/manifest.rs +++ b/crates/omnigraph/src/db/manifest.rs @@ -36,7 +36,8 @@ use publisher::{GraphNamespacePublisher, ManifestBatchPublisher}; pub(crate) use recovery::{ RecoveryMode, RecoverySidecar, RecoverySidecarHandle, SidecarKind, SidecarTablePin, SidecarTableRegistration, SidecarTombstone, delete_sidecar, has_schema_apply_sidecar, - list_sidecars, new_sidecar, recover_manifest_drift, write_sidecar, + heal_pending_sidecars_roll_forward, list_sidecars, new_sidecar, recover_manifest_drift, + schema_apply_serial_queue_key, write_sidecar, }; pub use state::SubTableEntry; #[cfg(test)] diff --git a/crates/omnigraph/src/db/manifest/recovery.rs b/crates/omnigraph/src/db/manifest/recovery.rs index 3b0f147..d49e86a 100644 --- a/crates/omnigraph/src/db/manifest/recovery.rs +++ b/crates/omnigraph/src/db/manifest/recovery.rs @@ -28,10 +28,11 @@ //! CreateIndex/Merge — see `check_restore_txn` at lance-6.0.1 //! `src/io/commit/conflict_resolver.rs:986`. The hazard is documented //! by `tests/staged_writes.rs::lance_restore_loses_to_concurrent_append_via_orphaning`. -//! This module sidesteps the hazard by running recovery only at -//! `Omnigraph::open` (before any other writers can race). A future -//! continuous in-process recovery reconciler will need to guard via -//! per-(table_key, branch) queue acquisition. +//! The open-time sweep sidesteps the hazard by running before any +//! other writers can race; the in-process heal +//! ([`heal_pending_sidecars_roll_forward`]) never restores (roll- +//! forward only) and guards via per-(table_key, branch) queue +//! acquisition. use std::collections::HashMap; @@ -316,8 +317,8 @@ pub(crate) fn sidecar_uri(root_uri: &str, operation_id: &str) -> String { /// Write a sidecar atomically and return a handle for later deletion. /// /// The atomicity contract is inherited from [`StorageAdapter::write_text`]: -/// LocalStorageAdapter writes via `tokio::fs::write` (whole-file replace); -/// S3StorageAdapter writes via PutObject (atomic at the object level). +/// the local backend publishes via a staged temp file + rename (atomic on +/// POSIX); object stores write via PutObject (atomic at the object level). /// Both are sufficient for sidecar semantics — readers either see the /// complete sidecar or none. pub(crate) async fn write_sidecar( @@ -325,6 +326,9 @@ pub(crate) async fn write_sidecar( storage: &dyn StorageAdapter, sidecar: &RecoverySidecar, ) -> Result { + // Failpoint: models a storage put failure (S3 PutObject / fs write) + // in Phase A — every writer must abort before any HEAD advance. + crate::failpoints::maybe_fail("recovery.sidecar_write")?; debug_assert_eq!(sidecar.schema_version, SIDECAR_SCHEMA_VERSION); let uri = sidecar_uri(root_uri, &sidecar.operation_id); let json = serde_json::to_string_pretty(sidecar).map_err(|err| { @@ -342,6 +346,10 @@ pub(crate) async fn delete_sidecar( handle: &RecoverySidecarHandle, storage: &dyn StorageAdapter, ) -> Result<()> { + // Failpoint: models a storage delete failure (S3 DeleteObject) in + // Phase D — callers swallow it (the write already published) and the + // stale sidecar is healed by the next write or open. + crate::failpoints::maybe_fail("recovery.sidecar_delete")?; storage.delete(&handle.sidecar_uri).await } @@ -356,6 +364,10 @@ pub(crate) async fn list_sidecars( root_uri: &str, storage: &dyn StorageAdapter, ) -> Result> { + // Failpoint: models a storage list failure (S3 ListObjectsV2) — every + // consumer (open-time sweep, write-entry heal) must fail loudly + // rather than silently skipping recovery. + crate::failpoints::maybe_fail("recovery.sidecar_list")?; let dir = recovery_dir_uri(root_uri); let mut uris = storage.list_dir(&dir).await?; // Sort by URI so the sweep processes sidecars deterministically. @@ -534,6 +546,240 @@ pub(crate) async fn restore_table_to_version( Ok(()) } +/// In-process heal for pending recovery sidecars — the entry point for +/// long-lived handles (`Omnigraph::refresh` and the staged-write entry +/// points `load_as` / `mutate_as`). +/// +/// Steady-state cost is one `list_dir` of `__recovery/` (typically +/// empty → immediate return), so write entry points can afford to call +/// this on every request. When sidecars exist, each is processed in +/// `RecoveryMode::RollForwardOnly`: the common Phase B → Phase C +/// residual (per-table `commit_staged` landed, manifest publish did +/// not) rolls forward in-process; rollback-eligible or invariant- +/// violating sidecars are deferred to the next ReadWrite open, exactly +/// as `Omnigraph::refresh` documents. +/// +/// Concurrency: unlike the open-time sweep, this runs while other +/// writers may be in flight. Every sidecar writer (mutation/load +/// finalize, schema_apply, branch_merge, ensure_indices, optimize) +/// acquires its per-`(table_key, table_branch)` write queues *before* +/// `write_sidecar` and holds them until after `delete_sidecar` — so +/// acquiring the same queues here blocks until that writer either +/// finished (sidecar deleted; the existence re-check skips it) or died +/// (sidecar is genuinely orphaned; safe to process). Without this, the +/// heal could observe a live writer's sidecar in its commit→publish +/// window, roll it forward, and fail that writer's own publish CAS. +/// Lock order is queues → coordinator, matching every writer's +/// commit→publish path. +/// +/// The schema-staging reconcile runs lazily, per SchemaApply sidecar, +/// AFTER that sidecar's queue guards are held and its existence is +/// re-confirmed — never up front. An up-front reconcile can promote a +/// LIVE schema apply's staging files and steal its commit (pinned by +/// `tests/failpoints.rs::heal_does_not_promote_live_schema_apply_staging`). +/// +/// Returns `true` when at least one sidecar was processed (the caller +/// should invalidate per-snapshot caches). +pub(crate) async fn heal_pending_sidecars_roll_forward( + root_uri: &str, + storage: std::sync::Arc, + coordinator: &tokio::sync::RwLock, + write_queue: &crate::db::write_queue::WriteQueueManager, +) -> Result { + let sidecars = list_sidecars(root_uri, storage.as_ref()).await?; + if sidecars.is_empty() { + return Ok(false); + } + let mut processed_any = false; + for sidecar in sidecars { + // Serialize against a possibly-live writer (see fn docs). Guards + // are scoped per sidecar so two sidecars never hold queues + // simultaneously (no cross-sidecar lock-order surface). + let mut queue_keys: Vec = sidecar + .tables + .iter() + .map(|pin| (pin.table_key.clone(), pin.table_branch.clone())) + .collect(); + let is_schema_apply = matches!(sidecar.writer_kind, SidecarKind::SchemaApply); + if is_schema_apply { + // A SchemaApply sidecar's per-table pins don't cover a + // registration-only migration (no existing tables touched, + // but staging files + a sidecar on disk). The schema-apply + // writer holds this serialization key from before its + // sidecar write until after its sidecar delete, so blocking + // on it — then re-checking sidecar existence — guarantees + // the writer is gone before the reconcile below mutates + // schema staging. + queue_keys.push(schema_apply_serial_queue_key()); + } + let _guards = write_queue.acquire_many(&queue_keys).await; + // Re-check after the wait: the writer we blocked on may have + // completed Phase C and deleted its sidecar. + if !storage + .exists(&sidecar_uri(root_uri, &sidecar.operation_id)) + .await? + { + continue; + } + // Schema-staging reconcile, per SchemaApply sidecar, UNDER the + // sidecar's guards: a sidecar still on disk after the queue wait + // belongs to a dead writer, so promoting its staging files can no + // longer race the live apply's own renames or steal its commit. + // It also re-runs per sidecar, so a multi-sidecar pass never + // classifies against a reconcile result an earlier roll-forward + // staled. Non-SchemaApply sidecars never consult the value. + let schema_state_recovery = if is_schema_apply { + let snapshot = { + let mut coord = coordinator.write().await; + coord.refresh().await?; + coord.snapshot() + }; + crate::db::schema_state::recover_schema_state_files( + root_uri, + std::sync::Arc::clone(&storage), + &snapshot, + ) + .await? + } else { + SchemaStateRecovery::Noop + }; + // Fresh per-branch snapshot — same rationale as + // `recover_manifest_drift`: classify against the branch the + // sidecar's writer targeted, refreshed after any prior + // sidecar's roll-forward. + let branch_snapshot = match sidecar.branch.as_deref() { + Some(b) => { + // Orphan check against the manifest's branch list (the + // authority) BEFORE opening: a deferred sidecar whose + // branch was deleted would otherwise wedge every write + // on the dead-branch open. + let (branch_exists, main_version) = { + let mut coord = coordinator.write().await; + coord.refresh().await?; + let exists = coord.all_branches().await?.iter().any(|name| name == b); + (exists, coord.snapshot().version()) + }; + if !branch_exists { + discard_orphaned_branch_sidecar( + root_uri, + storage.as_ref(), + &sidecar, + main_version, + ) + .await?; + processed_any = true; + continue; + } + let mut branch_coord = + GraphCoordinator::open_branch(root_uri, b, std::sync::Arc::clone(&storage)) + .await?; + branch_coord.refresh().await?; + branch_coord.snapshot() + } + None => { + let mut coord = coordinator.write().await; + coord.refresh().await?; + coord.snapshot() + } + }; + if process_sidecar( + root_uri, + storage.as_ref(), + &branch_snapshot, + &sidecar, + RecoveryMode::RollForwardOnly, + schema_state_recovery, + ) + .await? + { + processed_any = true; + } + } + // Re-read coordinator state so the caller's handle observes the + // post-heal manifest. + coordinator.write().await.refresh().await?; + Ok(processed_any) +} + +/// Discard a sidecar whose branch no longer exists in the manifest (the +/// authority — callers must key the orphan classification off the branch +/// LIST, never off a `Not found` from an open, which could be a transient +/// storage error masking real recovery intent). The branch's tree and +/// per-table forks are already reclaimed, so the drift the sidecar pins is +/// unreachable and the sidecar is provably moot; leaving it would wedge +/// every heal (write entry) and every ReadWrite open on a dead-branch +/// open, with `repair` refusing while it pends. Records an +/// `OrphanedBranchDiscarded` audit row (commit appended on main — the +/// sidecar's own branch has no commit graph anymore). +async fn discard_orphaned_branch_sidecar( + root_uri: &str, + storage: &dyn StorageAdapter, + sidecar: &RecoverySidecar, + manifest_version: u64, +) -> Result<()> { + warn!( + operation_id = sidecar.operation_id.as_str(), + writer_kind = ?sidecar.writer_kind, + branch = sidecar.branch.as_deref().unwrap_or(""), + "recovery: discarding sidecar for a deleted branch (drift unreachable; audit recorded)" + ); + let mut audit = RecoveryAudit::open(root_uri).await?; + // Idempotency across a Phase D delete fault: the audit row + commit + // land before the sidecar delete, so a failed delete re-enters here + // with the audit already durable. Append only once per operation — + // the retry's sole remaining job is finishing the delete. (Cold + // path: the list scan runs only when an orphaned sidecar exists.) + // + // Documented residual: the commit append and the audit append are + // two writes. A failure BETWEEN them leaves a recovery commit with + // no audit row; the retry (keyed on the audit row, the operator- + // facing record) appends a second commit before the audit lands — + // bounded commit-graph noise, audit row still exactly-once. Same + // not-atomic-pair-write tolerance as `record_audit` and the + // manifest→commit-graph Known Gap; keying on commit rows instead + // would need an operation_id column on `_graph_commits`, and + // audit-before-commit would dangle the `graph_commit_id` join. + let already_recorded = audit.list().await?.iter().any(|record| { + record.operation_id == sidecar.operation_id + && record.recovery_kind == RecoveryKind::OrphanedBranchDiscarded + }); + if !already_recorded { + let mut graph = CommitGraph::open(root_uri).await?; + let graph_commit_id = graph + .append_commit(None, manifest_version, Some(RECOVERY_ACTOR)) + .await?; + // Failpoint: the residual window above — commit appended, audit + // not yet durable. + crate::failpoints::maybe_fail("recovery.orphan_discard_audit_append")?; + audit + .append(RecoveryAuditRecord { + graph_commit_id, + recovery_kind: RecoveryKind::OrphanedBranchDiscarded, + recovery_for_actor: sidecar.actor_id.clone(), + operation_id: sidecar.operation_id.clone(), + sidecar_writer_kind: format!("{:?}", sidecar.writer_kind), + per_table_outcomes: Vec::new(), + created_at: now_micros()?, + }) + .await?; + } + let handle = RecoverySidecarHandle { + operation_id: sidecar.operation_id.clone(), + sidecar_uri: sidecar_uri(root_uri, &sidecar.operation_id), + }; + delete_sidecar(&handle, storage).await +} + +/// The write-queue key serializing schema-apply's sidecar lifecycle +/// against the write-entry heal. The schema-apply writer acquires it +/// (alongside its per-table keys) from before `write_sidecar` until +/// after `delete_sidecar`; the heal acquires it before reconciling +/// schema staging or processing a SchemaApply sidecar. The name cannot +/// collide with real table keys (those are `node:`/`edge:`-prefixed). +pub(crate) fn schema_apply_serial_queue_key() -> crate::db::write_queue::TableQueueKey { + ("__schema_apply__".to_string(), None) +} + /// Open-time recovery sweep — the entry point invoked from /// `Omnigraph::open` (gated on `OpenMode::ReadWrite`). /// @@ -549,9 +795,10 @@ pub(crate) async fn restore_table_to_version( /// /// Concurrency: today recovery runs synchronously in `Omnigraph::open` /// *before* the engine is wrapped in the server's `Arc>`. -/// No request handlers can race. A future per-(table_key, branch) writer -/// queue model (paired with a background reconciler) will need to acquire -/// queues before the sweep restores or publishes. +/// No request handlers can race, so this sweep does NOT acquire write +/// queues. In-process callers (refresh, write entry points) must use +/// [`heal_pending_sidecars_roll_forward`] instead, which serializes +/// against live writers via per-(table_key, branch) queue acquisition. pub(crate) async fn recover_manifest_drift( root_uri: &str, storage: std::sync::Arc, @@ -578,6 +825,21 @@ pub(crate) async fn recover_manifest_drift( for sidecar in sidecars { let branch_snapshot = match sidecar.branch.as_deref() { Some(b) => { + // Orphan check against the manifest's branch list (the + // authority) BEFORE opening — same classification as the + // write-entry heal: a deferred sidecar whose branch was + // deleted would otherwise fail every ReadWrite open. + coordinator.refresh().await?; + if !coordinator.all_branches().await?.iter().any(|name| name == b) { + discard_orphaned_branch_sidecar( + root_uri, + storage.as_ref(), + &sidecar, + coordinator.snapshot().version(), + ) + .await?; + continue; + } let mut branch_coord = GraphCoordinator::open_branch(root_uri, b, std::sync::Arc::clone(&storage)) .await?; @@ -611,7 +873,11 @@ async fn process_sidecar( sidecar: &RecoverySidecar, mode: RecoveryMode, schema_state_recovery: SchemaStateRecovery, -) -> Result<()> { +) -> Result { + // Returns whether durable state changed (roll-forward, roll-back, or + // stale-sidecar audit recovery). `false` = the sidecar was deferred + // untouched -- callers must not treat that as a completed heal (no + // schema reload / cache invalidation is warranted). let mut states = Vec::with_capacity(sidecar.tables.len()); for pin in &sidecar.tables { let lance_head = open_lance_head(&pin.table_path, pin.table_branch.as_deref()).await?; @@ -655,7 +921,7 @@ async fn process_sidecar( writer_kind = ?sidecar.writer_kind, "recovery: deferring sidecar with invariant violation to next ReadWrite open" ); - Ok(()) + Ok(false) } }, SidecarDecision::RollBack => { @@ -701,7 +967,8 @@ async fn process_sidecar( return record_audit_recovery_rollforward( root_uri, storage, snapshot, sidecar, &states, ) - .await; + .await + .map(|()| true); } if matches!(mode, RecoveryMode::RollForwardOnly) { // In-process recovery cannot run Dataset::restore safely @@ -713,14 +980,16 @@ async fn process_sidecar( writer_kind = ?sidecar.writer_kind, "recovery: deferring rollback-eligible sidecar to next ReadWrite open" ); - return Ok(()); + return Ok(false); } warn!( operation_id = sidecar.operation_id.as_str(), writer_kind = ?sidecar.writer_kind, "recovery: rolling back sidecar (mixed or unexpected state)" ); - roll_back_sidecar(root_uri, storage, snapshot, sidecar, &states).await + roll_back_sidecar(root_uri, storage, snapshot, sidecar, &states) + .await + .map(|()| true) } SidecarDecision::RollForward => { if matches!(sidecar.writer_kind, SidecarKind::SchemaApply) @@ -733,7 +1002,9 @@ async fn process_sidecar( "recovery: rolling back SchemaApply sidecar because schema staging \ files were not promoted in this recovery pass" ); - roll_back_sidecar(root_uri, storage, snapshot, sidecar, &states).await + roll_back_sidecar(root_uri, storage, snapshot, sidecar, &states) + .await + .map(|()| true) } RecoveryMode::RollForwardOnly => { warn!( @@ -741,7 +1012,7 @@ async fn process_sidecar( "recovery: deferring SchemaApply sidecar because schema staging files \ were not promoted in this recovery pass" ); - Ok(()) + Ok(false) } }; } @@ -788,7 +1059,7 @@ async fn process_sidecar( ) .await?; delete_sidecar_by_operation_id(root_uri, storage, &sidecar.operation_id).await?; - Ok(()) + Ok(true) } } } @@ -1134,6 +1405,11 @@ async fn record_audit( kind: RecoveryKind, outcomes: Vec, ) -> Result<()> { + // Failpoint: models an audit write failure after the roll-forward / + // roll-back publish already landed — the sweep aborts, the sidecar + // stays, and re-entry records the audit row (see the retry note in + // the doc comment above). + crate::failpoints::maybe_fail("recovery.record_audit")?; // Non-main recovery commits must be appended on the sidecar branch's // commit graph, otherwise parent_commit_id comes from the global // main head. BranchMerge additionally records the source branch's @@ -1260,7 +1536,7 @@ pub(crate) fn new_sidecar( #[cfg(test)] mod tests { use super::*; - use crate::storage::LocalStorageAdapter; + use crate::storage::ObjectStorageAdapter; use crate::table_store::TableStore; use arrow_array::{Int32Array, RecordBatch, StringArray}; use arrow_schema::{DataType, Field, Schema}; @@ -1573,7 +1849,7 @@ mod tests { #[tokio::test] async fn list_sidecars_returns_empty_when_dir_missing() { let dir = tempfile::tempdir().unwrap(); - let storage = LocalStorageAdapter::default(); + let storage = ObjectStorageAdapter::local(); let result = list_sidecars(dir.path().to_str().unwrap(), &storage) .await .unwrap(); @@ -1583,10 +1859,10 @@ mod tests { #[tokio::test] async fn write_then_list_then_delete_round_trip() { let dir = tempfile::tempdir().unwrap(); - // Create the __recovery/ subdir so write_sidecar's parent exists - // (LocalStorageAdapter::write_text doesn't mkdir parents). - std::fs::create_dir(dir.path().join(RECOVERY_DIR_NAME)).unwrap(); - let storage = LocalStorageAdapter::default(); + // No pre-created __recovery/ subdir: the storage backend creates + // missing parents on put, which is what the first sidecar write + // of a fresh graph relies on. + let storage = ObjectStorageAdapter::local(); let root = dir.path().to_str().unwrap(); let sidecar = new_sidecar( @@ -1617,7 +1893,7 @@ mod tests { "noise", ) .unwrap(); - let storage = LocalStorageAdapter::default(); + let storage = ObjectStorageAdapter::local(); let result = list_sidecars(dir.path().to_str().unwrap(), &storage) .await .unwrap(); @@ -1633,7 +1909,7 @@ mod tests { async fn list_sidecars_returns_deterministic_order() { let dir = tempfile::tempdir().unwrap(); std::fs::create_dir(dir.path().join(RECOVERY_DIR_NAME)).unwrap(); - let storage = LocalStorageAdapter::default(); + let storage = ObjectStorageAdapter::local(); let root = dir.path().to_str().unwrap(); // Write sidecars in REVERSE chronological order (newest first). diff --git a/crates/omnigraph/src/db/omnigraph.rs b/crates/omnigraph/src/db/omnigraph.rs index 50f5d34..779a2e0 100644 --- a/crates/omnigraph/src/db/omnigraph.rs +++ b/crates/omnigraph/src/db/omnigraph.rs @@ -378,10 +378,11 @@ impl Omnigraph { recover_schema_state_files(&root, Arc::clone(&storage), &coordinator.snapshot()) .await?; // Recovery sweep: close the Phase B → Phase C residual on - // any sidecar left over from a crashed writer. Continuous - // in-process recovery for long-running servers (no restart - // required between Phase B failure and recovery) is a - // separate background-reconciler effort. + // any sidecar left over from a crashed writer. Long-running + // processes additionally converge in-process: the staged- + // write entry points and `refresh` run the roll-forward-only + // heal (`heal_pending_sidecars_roll_forward`); only + // rollback-eligible sidecars wait for this open-time sweep. crate::db::manifest::recover_manifest_drift( &root, Arc::clone(&storage), @@ -755,7 +756,7 @@ impl Omnigraph { /// /// Composition mirrors `Omnigraph::open_with_storage_and_mode`'s /// recovery sequence, in the same order, with one restriction: the - /// manifest-drift sweep runs in `RollForwardOnly` mode (rollback / + /// manifest-drift heal runs in `RollForwardOnly` mode (rollback / /// abort cases defer to the next ReadWrite open because /// `Dataset::restore` is unsafe under concurrency). Each step: /// @@ -767,49 +768,123 @@ impl Omnigraph { /// SchemaApply roll-forward doesn't publish the manifest while /// the staging files remain unrenamed (which would corrupt the /// graph: data on new schema, catalog on old). - /// 3. `recover_manifest_drift(... RollForwardOnly)` — close the + /// 3. `heal_pending_sidecars_roll_forward` — close the /// finalize→publisher residual via roll-forward; defer rollback - /// work to next ReadWrite open. + /// work to next ReadWrite open. Serializes against live writers + /// by acquiring each sidecar's per-(table_key, branch) write + /// queues, so refresh never rolls forward an in-flight writer's + /// sidecar from under it. /// 4. `runtime_cache.invalidate_all` — drop stale per-snapshot caches. /// /// Steady state cost: one `list_dir` of `__recovery/` (typically /// returns empty → early return for both passes). No additional /// Lance reads. /// - /// Engine-internal callers that already hold an in-flight sidecar - /// (e.g. `schema_apply` mid-write) MUST use + /// The staged-write entry points (`load_as`, `mutate_as`) run the + /// same heal via + /// [`heal_pending_recovery_sidecars`](Self::heal_pending_recovery_sidecars), + /// so a long-lived server converges on the next write without an + /// explicit refresh. Engine-internal callers that already hold an + /// in-flight sidecar (e.g. `schema_apply` mid-write) MUST use /// [`refresh_coordinator_only`](Self::refresh_coordinator_only) to /// avoid the recovery sweep racing their own sidecar. pub async fn refresh(&self) -> Result<()> { - // Scope the coord write guard to the recovery section only. + // Standalone schema-staging reconcile ONLY when no recovery + // sidecar exists (legacy/manual staging residue). When sidecars + // exist, the heal below owns the reconcile — per SchemaApply + // sidecar, under that sidecar's queue guards — because an + // unserialized reconcile can promote a LIVE schema apply's + // staging files from under it, and a pre-promoted result would + // make the heal's own guarded reconcile see clean staging and + // wrongly defer the sidecar. The no-sidecar case cannot race a + // live apply: its sidecar is on disk before its staging files. + // + // Scope the coord write guard to the schema-state section only. // `reload_schema_if_source_changed` (below) acquires // `self.coordinator.read().await` when the on-disk schema source // has drifted from the cached `schema_source`. Tokio's RwLock is // not reentrant, so holding the write across that call deadlocks. // Pinned by `composite_flow_schema_apply_then_branch_ops_no_deadlock_in_refresh`. + // The heal also takes the lock itself (queues → coordinator + // order), so it must run after this guard is released. { - let mut coord = self.coordinator.write().await; - coord.refresh().await?; - let schema_state_recovery = recover_schema_state_files( - &self.root_uri, - Arc::clone(&self.storage), - &coord.snapshot(), - ) - .await?; - crate::db::manifest::recover_manifest_drift( - &self.root_uri, - Arc::clone(&self.storage), - &mut *coord, - crate::db::manifest::RecoveryMode::RollForwardOnly, - schema_state_recovery, - ) - .await?; - } // ← write guard released before reload's read acquisition + // Hold the schema-apply serialization key across the + // list-then-reconcile pair: without it, a live apply can + // write its sidecar + staging between the empty check and + // the reconcile (the same race, through a smaller window). + // Queue before coordinator — the documented lock order. + // + // Liveness note: with a pending NON-SchemaApply sidecar + // (e.g. a Mutation residual), this gate skips the standalone + // reconcile and the heal below reconciles only per + // SchemaApply sidecar — so pre-sidecar-era orphaned staging + // residue waits for the NEXT refresh after the sidecars are + // consumed. Convergence holds, one pass late. Do not "fix" + // by re-running the reconcile unserialized here: that is + // exactly the live-apply race this block exists to close. + let _serial = self + .write_queue + .acquire(&crate::db::manifest::schema_apply_serial_queue_key()) + .await; + if crate::db::manifest::list_sidecars(&self.root_uri, self.storage.as_ref()) + .await? + .is_empty() + { + let mut coord = self.coordinator.write().await; + coord.refresh().await?; + recover_schema_state_files( + &self.root_uri, + Arc::clone(&self.storage), + &coord.snapshot(), + ) + .await?; + } + } // ← guards released before the heal's queue acquisition + crate::db::manifest::heal_pending_sidecars_roll_forward( + &self.root_uri, + Arc::clone(&self.storage), + &self.coordinator, + &self.write_queue, + ) + .await?; self.reload_schema_if_source_changed().await?; self.runtime_cache.invalidate_all().await; Ok(()) } + /// Write-entry heal: converge any pending recovery sidecars (a + /// previously failed writer's Phase B → Phase C residual) before + /// starting a new staged write, so a long-lived process (the HTTP + /// server, an embedded handle) recovers on its next write instead + /// of wedging every write on the commit-time drift guard until + /// restart. Roll-forward only; rollback-eligible sidecars defer to + /// the next ReadWrite open exactly as [`refresh`](Self::refresh) + /// does. + /// + /// Steady-state cost: one `list_dir` of `__recovery/` (typically + /// empty → immediate return). See + /// `recovery::heal_pending_sidecars_roll_forward` for the + /// concurrency contract (per-table write-queue acquisition). + pub(crate) async fn heal_pending_recovery_sidecars(&self) -> Result<()> { + let processed = crate::db::manifest::heal_pending_sidecars_roll_forward( + &self.root_uri, + Arc::clone(&self.storage), + &self.coordinator, + &self.write_queue, + ) + .await?; + if processed { + // A rolled-forward SchemaApply sidecar moved disk + manifest + // to the new schema (staging promoted, registrations + // published); the in-memory catalog must follow or the very + // write that triggered the heal validates against the stale + // schema. Same post-heal step as `refresh`. + self.reload_schema_if_source_changed().await?; + self.runtime_cache.invalidate_all().await; + } + Ok(()) + } + async fn reload_schema_if_source_changed(&self) -> Result<()> { let schema_path = schema_source_uri(&self.root_uri); let schema_source = self.storage.read_text(&schema_path).await?; @@ -1951,7 +2026,7 @@ mod tests { use serde_json::Value; use std::sync::{Arc, Mutex}; - use crate::storage::{LocalStorageAdapter, StorageAdapter, join_uri}; + use crate::storage::{ObjectStorageAdapter, StorageAdapter, join_uri}; const TEST_SCHEMA: &str = r#" node Person { @@ -1967,9 +2042,9 @@ edge Knows: Person -> Person { edge WorksAt: Person -> Company "#; - #[derive(Debug, Default)] + #[derive(Debug)] struct RecordingStorageAdapter { - inner: LocalStorageAdapter, + inner: ObjectStorageAdapter, reads: Mutex>, writes: Mutex>, exists_checks: Mutex>, @@ -1977,6 +2052,19 @@ edge WorksAt: Person -> Company deletes: Mutex>, } + impl Default for RecordingStorageAdapter { + fn default() -> Self { + Self { + inner: ObjectStorageAdapter::local(), + reads: Mutex::default(), + writes: Mutex::default(), + exists_checks: Mutex::default(), + renames: Mutex::default(), + deletes: Mutex::default(), + } + } + } + impl RecordingStorageAdapter { fn reads(&self) -> Vec { self.reads.lock().unwrap().clone() @@ -2052,7 +2140,7 @@ edge WorksAt: Person -> Company #[derive(Debug)] struct InitRaceStorageAdapter { - inner: LocalStorageAdapter, + inner: ObjectStorageAdapter, root: String, barrier: Arc, } @@ -2117,7 +2205,7 @@ edge WorksAt: Person -> Company let uri = dir.path().to_str().unwrap().to_string(); let root = normalize_root_uri(&uri).unwrap(); let storage: Arc = Arc::new(InitRaceStorageAdapter { - inner: LocalStorageAdapter, + inner: ObjectStorageAdapter::local(), root, barrier: Arc::new(tokio::sync::Barrier::new(2)), }); diff --git a/crates/omnigraph/src/db/omnigraph/schema_apply.rs b/crates/omnigraph/src/db/omnigraph/schema_apply.rs index 506db36..f965ad4 100644 --- a/crates/omnigraph/src/db/omnigraph/schema_apply.rs +++ b/crates/omnigraph/src/db/omnigraph/schema_apply.rs @@ -144,6 +144,14 @@ where actor, )?; + // Converge any pending recovery sidecar before planning: a table + // rewrite over sidecar-covered drift would otherwise re-plan from + // the manifest pin and orphan the drifted Phase-B commit (silently + // dropping its rows) while the stale sidecar lingers to misclassify + // against the post-apply pins. Runs before the apply's own sidecar + // exists, so the heal can never observe it. + db.heal_pending_recovery_sidecars().await?; + acquire_schema_apply_lock(db).await?; let result = apply_schema_with_lock(db, desired_schema_source, options, validate_catalog).await; let release_result = release_schema_apply_lock(db).await; @@ -428,19 +436,30 @@ where // per-table acquisitions are uncontended. They exist for symmetry // with future MR-870 recovery, which will need queue acquisition // before any `Dataset::restore` it issues for SchemaApply sidecars. - let schema_apply_queue_keys: Vec<(String, Option)> = recovery_pins + let mut schema_apply_queue_keys: Vec<(String, Option)> = recovery_pins .iter() .map(|pin| (pin.table_key.clone(), pin.table_branch.clone())) .collect(); + // The serialization key the write-entry heal acquires before touching + // schema staging or a SchemaApply sidecar. Per-table keys alone don't + // cover a registration-only migration (no pins, but a sidecar and + // staging files on disk) — without this, a concurrent write's heal can + // promote this apply's staging files and publish its registrations out + // from under it. Acquired whenever a sidecar will be written, held + // through Phase D (the guards live to the end of this function). + let writes_sidecar = !(recovery_pins.is_empty() + && sidecar_registrations.is_empty() + && sidecar_tombstones.is_empty()); + if writes_sidecar { + schema_apply_queue_keys + .push(crate::db::manifest::schema_apply_serial_queue_key()); + } let _schema_apply_queue_guards = db .write_queue() .acquire_many(&schema_apply_queue_keys) .await; - let recovery_handle = if recovery_pins.is_empty() - && sidecar_registrations.is_empty() - && sidecar_tombstones.is_empty() - { + let recovery_handle = if !writes_sidecar { None } else { // `branch=None` because schema_apply publishes against main — diff --git a/crates/omnigraph/src/db/recovery_audit.rs b/crates/omnigraph/src/db/recovery_audit.rs index b9e8e7b..2aab6bc 100644 --- a/crates/omnigraph/src/db/recovery_audit.rs +++ b/crates/omnigraph/src/db/recovery_audit.rs @@ -43,6 +43,11 @@ const RECOVERIES_DIR: &str = "_graph_commit_recoveries.lance"; pub(crate) enum RecoveryKind { RolledForward, RolledBack, + /// The sidecar's branch no longer exists in the manifest: its tree + /// and forks are reclaimed, the pinned drift is unreachable, and the + /// sidecar is provably moot — discarded with this audit row instead + /// of wedging every heal/sweep on a dead-branch open. + OrphanedBranchDiscarded, } impl RecoveryKind { @@ -50,6 +55,7 @@ impl RecoveryKind { match self { RecoveryKind::RolledForward => "RolledForward", RecoveryKind::RolledBack => "RolledBack", + RecoveryKind::OrphanedBranchDiscarded => "OrphanedBranchDiscarded", } } @@ -57,6 +63,7 @@ impl RecoveryKind { match s { "RolledForward" => Ok(RecoveryKind::RolledForward), "RolledBack" => Ok(RecoveryKind::RolledBack), + "OrphanedBranchDiscarded" => Ok(RecoveryKind::OrphanedBranchDiscarded), other => Err(OmniError::manifest_internal(format!( "unknown recovery_kind '{}' in _graph_commit_recoveries.lance", other diff --git a/crates/omnigraph/src/exec/merge.rs b/crates/omnigraph/src/exec/merge.rs index f245d15..ea16b15 100644 --- a/crates/omnigraph/src/exec/merge.rs +++ b/crates/omnigraph/src/exec/merge.rs @@ -1081,6 +1081,13 @@ impl Omnigraph { actor_id, )?; self.ensure_schema_apply_idle("branch_merge").await?; + // Converge any pending recovery sidecar before the merge + // captures its target snapshot: the merge's publish would + // otherwise make the drifted Phase-B commit visible as an + // unattributed side effect (manifest catches up to HEAD with no + // recovery audit row) and leave the stale sidecar behind. Runs + // before the merge's own sidecar exists. + self.heal_pending_recovery_sidecars().await?; self.branch_merge_impl(source, target, actor_id).await } diff --git a/crates/omnigraph/src/exec/mutation.rs b/crates/omnigraph/src/exec/mutation.rs index e537d0d..e9051c4 100644 --- a/crates/omnigraph/src/exec/mutation.rs +++ b/crates/omnigraph/src/exec/mutation.rs @@ -715,6 +715,14 @@ impl Omnigraph { actor_id: Option<&str>, ) -> Result { self.ensure_schema_state_valid().await?; + // Converge any pending recovery sidecar (a previously failed + // writer's Phase B → Phase C residual) before executing: the + // inline delete path advances Lance HEAD during execution and + // the staged path's commit-time drift guard refuses + // sidecar-covered drift, so a long-lived handle must heal here + // — not at restart. One `list_dir` when no sidecars exist (the + // steady state). + self.heal_pending_recovery_sidecars().await?; let requested = Self::normalize_branch_name(branch)?; // Reject internal `__run__*` / system-prefixed branches at the // public write boundary. Direct-publish paths assert this diff --git a/crates/omnigraph/src/exec/staging.rs b/crates/omnigraph/src/exec/staging.rs index a3932b0..cbfd52d 100644 --- a/crates/omnigraph/src/exec/staging.rs +++ b/crates/omnigraph/src/exec/staging.rs @@ -599,9 +599,53 @@ impl StagedMutation { ))); } if head > current { + // Error path only: tell the operator which drift class + // this is. Uncovered drift (external raw Lance write, + // pre-fix maintenance) goes through `omnigraph repair`. + // Sidecar-covered drift reaching this guard means the + // write-entry heal deferred it (rollback-eligible), and + // `repair` refuses while a sidecar is pending — the + // recovery path is a read-write reopen. A list failure + // must not mask the conflict — and must not pick a + // class confidently either: "could not classify" names + // both paths and the cause, never routing the operator + // to a command that will refuse. + let action = match crate::db::manifest::list_sidecars( + db.root_uri(), + db.storage_adapter(), + ) + .await + { + Ok(sidecars) => { + let covered = sidecars.iter().any(|sidecar| { + sidecar.tables.iter().any(|pin| { + // Branch-aware: a sidecar pinning the + // same table on ANOTHER branch does not + // cover this branch's drift — a reopen + // would recover that sidecar but leave + // this drift for `repair`. + pin.table_key == entry.table_key + && pin.table_branch == entry.path.table_branch + }) + }); + if covered { + "a pending recovery sidecar requires rollback — reopen the \ + graph read-write (e.g. restart the server) to recover" + .to_string() + } else { + "run `omnigraph repair` before writing".to_string() + } + } + Err(list_err) => format!( + "could not classify the drift (sidecar listing failed: {}); \ + run `omnigraph repair`, or reopen the graph read-write if \ + repair reports a pending recovery sidecar", + list_err + ), + }; return Err(OmniError::manifest_conflict(format!( - "table '{}' has Lance HEAD version {} ahead of manifest version {}; run `omnigraph repair` before writing", - entry.table_key, head, current + "table '{}' has Lance HEAD version {} ahead of manifest version {}; {}", + entry.table_key, head, current, action ))); } diff --git a/crates/omnigraph/src/loader/mod.rs b/crates/omnigraph/src/loader/mod.rs index 09c2f7c..69ada79 100644 --- a/crates/omnigraph/src/loader/mod.rs +++ b/crates/omnigraph/src/loader/mod.rs @@ -188,6 +188,13 @@ impl Omnigraph { actor_id, )?; self.ensure_schema_state_valid().await?; + // Converge any pending recovery sidecar (a previously failed + // writer's Phase B → Phase C residual) before staging anything: + // without this, sidecar-covered drift wedges every load on the + // commit-time drift guard until a process restart — `repair` + // refuses while a sidecar is pending. One `list_dir` when no + // sidecars exist (the steady state). + self.heal_pending_recovery_sidecars().await?; // Reject internal `__run__*` / system-prefixed branches at the // public write boundary. Direct-publish paths assert this // explicitly so a caller can't write to legacy or system diff --git a/crates/omnigraph/src/storage.rs b/crates/omnigraph/src/storage.rs index 187a6d6..1f96b39 100644 --- a/crates/omnigraph/src/storage.rs +++ b/crates/omnigraph/src/storage.rs @@ -1,14 +1,15 @@ use std::env; use std::fmt::Debug; -use std::path::{Path, PathBuf}; +use std::path::{Component, Path, PathBuf}; use std::sync::Arc; use async_trait::async_trait; use futures::TryStreamExt; use object_store::aws::AmazonS3Builder; +use object_store::local::LocalFileSystem; +use object_store::memory::InMemory; use object_store::path::Path as ObjectPath; use object_store::{DynObjectStore, ObjectStore, PutMode, PutPayload}; -use tokio::io::AsyncWriteExt; use url::Url; use crate::error::{OmniError, Result}; @@ -38,20 +39,28 @@ pub trait StorageAdapter: Debug + Send + Sync { /// List all files (non-recursively, files only) directly under `dir_uri`. /// Returns full URIs (same scheme as `dir_uri`). The result is unordered. /// Returns Ok(empty) if the directory does not exist or is empty. + /// Consumers must tolerate non-payload residue appearing in storage + /// (backend staging files are filtered by the backend, but crash residue + /// of any future producer may not be) — filter by suffix, never assume + /// every entry is yours. async fn list_dir(&self, dir_uri: &str) -> Result>; - /// Read a text object together with its backend version token (S3: the - /// object's ETag; local: sha256 of the content). The token is opaque — - /// valid only for `write_text_if_match` against the same adapter. + /// Read a text object together with its backend version token (stores + /// with conditional-update support: the object's ETag; local: sha256 of + /// the content). The token is opaque — valid only for + /// `write_text_if_match` against the same adapter. async fn read_text_versioned(&self, uri: &str) -> Result<(String, String)>; /// Replace the object at `uri` only if its current version still matches /// `expected_version` (obtained from a prior versioned read/write on this /// adapter). Returns `Ok(Some(new_version))` on success and `Ok(None)` /// when the precondition failed (a concurrent writer won — the CAS-lost - /// case callers must surface, never swallow). S3 uses a conditional put - /// (If-Match); local compares content then replaces via temp + rename — - /// the same single-machine semantics the callers had before this trait, - /// safe under the callers' own lock protocol but not a cross-process - /// barrier by itself. + /// case callers must surface, never swallow). Stores with conditional + /// updates (S3, in-memory) use a true conditional put (If-Match); the + /// local filesystem has no such primitive (`PutMode::Update` is + /// unimplemented upstream), so local compares content then replaces via + /// an atomic staged write — the same single-machine semantics the + /// callers had before this trait, safe under the callers' own lock + /// protocol but not a cross-process barrier by itself (see the Known + /// Gaps entry in docs/dev/invariants.md). async fn write_text_if_match( &self, uri: &str, @@ -59,14 +68,18 @@ pub trait StorageAdapter: Debug + Send + Sync { expected_version: &str, ) -> Result>; /// Recursively delete every object under `prefix_uri`. Returns Ok(()) - /// when nothing exists there (idempotent). Local: `remove_dir_all`; - /// S3: list + delete (NOT atomic — callers must tolerate partial - /// prefixes on crash, which the cluster delete protocol does by retry). + /// when nothing exists there (idempotent). Local: `remove_dir_all` + /// (directories are a local-FS concept; list+delete would leave empty + /// directory skeletons that local existence probes report as present); + /// object stores: list + delete (NOT atomic — callers must tolerate + /// partial prefixes on crash, which the cluster delete protocol does by + /// retry). async fn delete_prefix(&self, prefix_uri: &str) -> Result<()>; } -/// Version token for local files: content identity. ETags are unavailable -/// on the filesystem; sha256 is stable, cheap at these object sizes, and +/// Version token for local files: content identity. The local filesystem +/// backend reports mtime-derived ETags too coarse for CAS (sub-granularity +/// rewrites collide); sha256 is stable, cheap at these object sizes, and /// already the cluster ledger's CAS vocabulary. fn local_version_token(bytes: &[u8]) -> String { use sha2::{Digest, Sha256}; @@ -80,13 +93,34 @@ pub enum StorageKind { S3, } -#[derive(Debug, Default)] -pub struct LocalStorageAdapter; - +/// The one storage implementation: every backend is an +/// [`object_store::ObjectStore`], so the semantics (atomic-visibility puts, +/// conditional creates, path-delimited listing) are upstream-maintained and +/// identical across backends by construction. The per-backend residue is +/// confined to [`UriCodec`] (URI ↔ object path mapping) and the +/// `supports_conditional_update` capability flag (false only for the local +/// filesystem, where upstream `PutMode::Update` is unimplemented). #[derive(Debug)] -pub struct S3StorageAdapter { - bucket: String, +pub struct ObjectStorageAdapter { store: Arc, + codec: UriCodec, + /// Whether the backend implements `PutMode::Update` (ETag-conditioned + /// put). Gates BOTH the version-token source in `read_text_versioned` + /// and the `write_text_if_match` strategy — the two must agree or every + /// CAS loses. + supports_conditional_update: bool, +} + +#[derive(Debug, Clone, PartialEq, Eq)] +enum UriCodec { + /// Plain absolute/relative paths or `file://` URIs, mapped onto a + /// root-anchored [`LocalFileSystem`]. + Local, + /// `s3://{bucket}/{key}` URIs, mapped onto a bucket-scoped store. + S3 { bucket: String }, + /// Opaque keys for the in-memory test/embedded backend; leading + /// slashes are stripped. + Memory, } #[derive(Debug, Clone, PartialEq, Eq)] @@ -95,357 +129,22 @@ struct S3Location { key: String, } -#[async_trait] -impl StorageAdapter for LocalStorageAdapter { - async fn read_text(&self, uri: &str) -> Result { - let path = local_path_from_uri(uri)?; - Ok(tokio::fs::read_to_string(&path).await?) - } - - async fn write_text(&self, uri: &str, contents: &str) -> Result<()> { - let path = local_path_from_uri(uri)?; - // Ensure parent directory exists. S3 has no equivalent (PutObject - // is path-agnostic). For local fs, callers like the recovery - // sidecar protocol expect transparent directory creation under - // the graph root (the `__recovery/` directory doesn't pre-exist; - // first sidecar write creates it). - if let Some(parent) = path.parent() { - if !parent.as_os_str().is_empty() { - tokio::fs::create_dir_all(parent).await?; - } - } - tokio::fs::write(&path, contents).await?; - Ok(()) - } - - async fn write_text_if_absent(&self, uri: &str, contents: &str) -> Result { - let path = local_path_from_uri(uri)?; - if let Some(parent) = path.parent() { - if !parent.as_os_str().is_empty() { - tokio::fs::create_dir_all(parent).await?; - } - } - let mut file = match tokio::fs::OpenOptions::new() - .write(true) - .create_new(true) - .open(&path) - .await - { - Ok(file) => file, - Err(err) if err.kind() == std::io::ErrorKind::AlreadyExists => return Ok(false), - Err(err) => return Err(err.into()), - }; - if let Err(err) = file.write_all(contents.as_bytes()).await { - let _ = tokio::fs::remove_file(&path).await; - return Err(err.into()); - } - // tokio's async File buffers internally: without an explicit flush, - // write_all only fills the buffer and the actual OS write happens in - // a background task AFTER this fn returns — a reader can then see - // the created-but-still-empty file (caught twice in CI as an - // "EOF while parsing" on a state.json read right after import). - // Flushing before Ok restores write-then-read consistency, matching - // tokio::fs::write (which flushes internally) used by every other - // write path here. - if let Err(err) = file.flush().await { - let _ = tokio::fs::remove_file(&path).await; - return Err(err.into()); - } - Ok(true) - } - - async fn exists(&self, uri: &str) -> Result { - Ok(local_path_from_uri(uri)?.exists()) - } - - async fn rename_text(&self, from_uri: &str, to_uri: &str) -> Result<()> { - let from = local_path_from_uri(from_uri)?; - let to = local_path_from_uri(to_uri)?; - tokio::fs::rename(&from, &to).await?; - Ok(()) - } - - async fn delete(&self, uri: &str) -> Result<()> { - let path = local_path_from_uri(uri)?; - match tokio::fs::remove_file(&path).await { - Ok(()) => Ok(()), - Err(err) if err.kind() == std::io::ErrorKind::NotFound => Ok(()), - Err(err) => Err(err.into()), +impl ObjectStorageAdapter { + /// Local-filesystem backend rooted at `/`. URIs are plain paths or + /// `file://` URIs; relative paths are lexically absolutized against the + /// current working directory. + pub fn local() -> Self { + Self { + store: Arc::new(LocalFileSystem::new()), + codec: UriCodec::Local, + supports_conditional_update: false, } } - async fn list_dir(&self, dir_uri: &str) -> Result> { - let path = local_path_from_uri(dir_uri)?; - let mut out = Vec::new(); - let mut entries = match tokio::fs::read_dir(&path).await { - Ok(e) => e, - Err(err) if err.kind() == std::io::ErrorKind::NotFound => return Ok(out), - Err(err) => return Err(err.into()), - }; - let dir_str = dir_uri.trim_end_matches('/'); - while let Some(entry) = entries.next_entry().await? { - let ft = entry.file_type().await?; - if !ft.is_file() { - continue; - } - if let Some(name) = entry.file_name().to_str() { - out.push(format!("{}/{}", dir_str, name)); - } - } - Ok(out) - } - - async fn read_text_versioned(&self, uri: &str) -> Result<(String, String)> { - let path = local_path_from_uri(uri)?; - let bytes = tokio::fs::read(&path).await?; - let version = local_version_token(&bytes); - let text = String::from_utf8(bytes).map_err(|err| { - OmniError::manifest_internal(format!("storage read failed for '{}': {}", uri, err)) - })?; - Ok((text, version)) - } - - async fn write_text_if_match( - &self, - uri: &str, - contents: &str, - expected_version: &str, - ) -> Result> { - let path = local_path_from_uri(uri)?; - let current = match tokio::fs::read(&path).await { - Ok(bytes) => bytes, - Err(err) if err.kind() == std::io::ErrorKind::NotFound => return Ok(None), - Err(err) => return Err(err.into()), - }; - if local_version_token(¤t) != expected_version { - return Ok(None); - } - let tmp = path.with_extension(format!("tmp.{}", ulid::Ulid::new())); - tokio::fs::write(&tmp, contents.as_bytes()).await?; - if let Err(err) = tokio::fs::rename(&tmp, &path).await { - let _ = tokio::fs::remove_file(&tmp).await; - return Err(err.into()); - } - Ok(Some(local_version_token(contents.as_bytes()))) - } - - async fn delete_prefix(&self, prefix_uri: &str) -> Result<()> { - let path = local_path_from_uri(prefix_uri)?; - match tokio::fs::remove_dir_all(&path).await { - Ok(()) => Ok(()), - Err(err) if err.kind() == std::io::ErrorKind::NotFound => Ok(()), - Err(err) => Err(err.into()), - } - } -} - -#[async_trait] -impl StorageAdapter for S3StorageAdapter { - async fn read_text(&self, uri: &str) -> Result { - let location = self.object_path(uri)?; - let bytes = self - .store - .get(&location) - .await - .map_err(|err| storage_backend_error("read", uri, err))? - .bytes() - .await - .map_err(|err| storage_backend_error("read", uri, err))?; - - String::from_utf8(bytes.to_vec()).map_err(|err| { - OmniError::manifest_internal(format!("storage read failed for '{}': {}", uri, err)) - }) - } - - async fn write_text(&self, uri: &str, contents: &str) -> Result<()> { - let location = self.object_path(uri)?; - self.store - .put(&location, PutPayload::from(contents.as_bytes().to_vec())) - .await - .map_err(|err| storage_backend_error("write", uri, err))?; - Ok(()) - } - - async fn write_text_if_absent(&self, uri: &str, contents: &str) -> Result { - let location = self.object_path(uri)?; - match self - .store - .put_opts( - &location, - PutPayload::from(contents.as_bytes().to_vec()), - PutMode::Create.into(), - ) - .await - { - Ok(_) => Ok(true), - Err(object_store::Error::AlreadyExists { .. }) - | Err(object_store::Error::Precondition { .. }) => Ok(false), - Err(err) => Err(storage_backend_error("write_if_absent", uri, err)), - } - } - - async fn exists(&self, uri: &str) -> Result { - let location = self.object_path(uri)?; - match self.store.head(&location).await { - Ok(_) => Ok(true), - Err(object_store::Error::NotFound { .. }) => { - let mut entries = self.store.list(Some(&location)); - let has_prefix_entries = entries - .try_next() - .await - .map_err(|err| storage_backend_error("exists", uri, err))? - .is_some(); - Ok(has_prefix_entries) - } - Err(err) => Err(storage_backend_error("exists", uri, err)), - } - } - - async fn rename_text(&self, from_uri: &str, to_uri: &str) -> Result<()> { - // S3 has no atomic rename. Copy then delete; if the copy succeeds and - // the delete fails (or the process crashes between them), both - // source and destination exist with the same content. Recovery code - // must tolerate this case — see schema_state::recover_schema_state_files. - let from = self.object_path(from_uri)?; - let to = self.object_path(to_uri)?; - self.store - .copy(&from, &to) - .await - .map_err(|err| storage_backend_error("rename:copy", from_uri, err))?; - self.store - .delete(&from) - .await - .map_err(|err| storage_backend_error("rename:delete", from_uri, err))?; - Ok(()) - } - - async fn delete(&self, uri: &str) -> Result<()> { - let location = self.object_path(uri)?; - match self.store.delete(&location).await { - Ok(()) => Ok(()), - Err(object_store::Error::NotFound { .. }) => Ok(()), - Err(err) => Err(storage_backend_error("delete", uri, err)), - } - } - - async fn list_dir(&self, dir_uri: &str) -> Result> { - // Normalize: ensure the URI describes a directory (trailing '/') so - // we don't match sibling paths with a shared prefix - // (e.g. listing `__recovery` shouldn't match `__recovery_log/...`). - let dir_with_slash = if dir_uri.ends_with('/') { - dir_uri.to_string() - } else { - format!("{}/", dir_uri) - }; - // object_store::Path strips the trailing '/'; re-add it for filtering. - let prefix_loc = self.object_path(&dir_with_slash)?; - let prefix_with_slash = format!("{}/", prefix_loc.as_ref()); - - let mut entries = self.store.list(Some(&prefix_loc)); - let mut out = Vec::new(); - let bucket_root = format!("{}{}/", S3_SCHEME_PREFIX, self.bucket); - while let Some(meta) = entries - .try_next() - .await - .map_err(|err| storage_backend_error("list_dir", dir_uri, err))? - { - let key_str = meta.location.as_ref(); - // Require the directory boundary to filter out sibling-prefix - // matches (object_store's `list` is prefix-based, not dir-based). - if !key_str.starts_with(&prefix_with_slash) { - continue; - } - let suffix = &key_str[prefix_with_slash.len()..]; - // Non-recursive: skip anything inside a sub-directory. - if suffix.contains('/') { - continue; - } - out.push(format!("{}{}", bucket_root, key_str)); - } - Ok(out) - } - - async fn read_text_versioned(&self, uri: &str) -> Result<(String, String)> { - let location = self.object_path(uri)?; - let result = self - .store - .get(&location) - .await - .map_err(|err| storage_backend_error("read", uri, err))?; - let etag = result.meta.e_tag.clone(); - let bytes = result - .bytes() - .await - .map_err(|err| storage_backend_error("read", uri, err))?; - // Every S3-compatible store we target returns ETags; fall back to a - // content token rather than failing if one ever omits it. - let version = etag.unwrap_or_else(|| local_version_token(&bytes)); - let text = String::from_utf8(bytes.to_vec()).map_err(|err| { - OmniError::manifest_internal(format!("storage read failed for '{}': {}", uri, err)) - })?; - Ok((text, version)) - } - - async fn write_text_if_match( - &self, - uri: &str, - contents: &str, - expected_version: &str, - ) -> Result> { - let location = self.object_path(uri)?; - let mode = PutMode::Update(object_store::UpdateVersion { - e_tag: Some(expected_version.to_string()), - version: None, - }); - match self - .store - .put_opts( - &location, - PutPayload::from(contents.as_bytes().to_vec()), - mode.into(), - ) - .await - { - Ok(result) => Ok(Some( - result - .e_tag - .unwrap_or_else(|| local_version_token(contents.as_bytes())), - )), - Err(object_store::Error::Precondition { .. }) - | Err(object_store::Error::NotFound { .. }) => Ok(None), - Err(err) => Err(storage_backend_error("write_if_match", uri, err)), - } - } - - async fn delete_prefix(&self, prefix_uri: &str) -> Result<()> { - let dir_with_slash = if prefix_uri.ends_with('/') { - prefix_uri.to_string() - } else { - format!("{}/", prefix_uri) - }; - let prefix_loc = self.object_path(&dir_with_slash)?; - let mut entries = self.store.list(Some(&prefix_loc)); - let mut locations = Vec::new(); - while let Some(meta) = entries - .try_next() - .await - .map_err(|err| storage_backend_error("delete_prefix", prefix_uri, err))? - { - locations.push(meta.location); - } - for location in locations { - match self.store.delete(&location).await { - Ok(()) => {} - Err(object_store::Error::NotFound { .. }) => {} - Err(err) => return Err(storage_backend_error("delete_prefix", prefix_uri, err)), - } - } - Ok(()) - } -} - -impl S3StorageAdapter { - fn from_root_uri(root_uri: &str) -> Result { + /// S3 backend scoped to the bucket named in `root_uri`. Credentials and + /// endpoint come from the standard `AWS_*` environment variables (the + /// same ones Lance reads for its dataset stores). + pub fn s3_from_root_uri(root_uri: &str) -> Result { let location = parse_s3_uri(root_uri)?; let mut builder = AmazonS3Builder::from_env().with_bucket_name(&location.bucket); @@ -471,29 +170,311 @@ impl S3StorageAdapter { })?; Ok(Self { - bucket: location.bucket, store: Arc::new(store), + codec: UriCodec::S3 { + bucket: location.bucket, + }, + supports_conditional_update: true, }) } + /// In-memory backend for tests and embedded experiments. Implements the + /// FULL contract including true conditional updates (unlike the local + /// filesystem), so contract tests exercise the strong-CAS path without a + /// bucket. State lives only as long as the adapter. + pub fn in_memory() -> Self { + Self { + store: Arc::new(InMemory::new()), + codec: UriCodec::Memory, + supports_conditional_update: true, + } + } + fn object_path(&self, uri: &str) -> Result { - let location = parse_s3_uri(uri)?; - if location.bucket != self.bucket { - return Err(OmniError::manifest_internal(format!( - "s3 storage bucket mismatch for '{}': expected '{}', found '{}'", - uri, self.bucket, location.bucket - ))); + match &self.codec { + UriCodec::Local => { + let path = absolutize_lexically(local_path_from_uri(uri)?)?; + ObjectPath::from_absolute_path(&path).map_err(|err| { + OmniError::manifest_internal(format!( + "invalid local object path for '{}': {}", + uri, err + )) + }) + } + UriCodec::S3 { bucket } => { + let location = parse_s3_uri(uri)?; + if &location.bucket != bucket { + return Err(OmniError::manifest_internal(format!( + "s3 storage bucket mismatch for '{}': expected '{}', found '{}'", + uri, bucket, location.bucket + ))); + } + if location.key.is_empty() { + return Err(OmniError::manifest_internal(format!( + "s3 storage path is empty for '{}'", + uri + ))); + } + ObjectPath::parse(&location.key).map_err(|err| { + OmniError::manifest_internal(format!( + "invalid s3 object path for '{}': {}", + uri, err + )) + }) + } + UriCodec::Memory => { + ObjectPath::parse(uri.trim_start_matches('/')).map_err(|err| { + OmniError::manifest_internal(format!( + "invalid memory object path for '{}': {}", + uri, err + )) + }) + } } - if location.key.is_empty() { - return Err(OmniError::manifest_internal(format!( - "s3 storage path is empty for '{}'", - uri - ))); - } - ObjectPath::parse(&location.key).map_err(|err| { - OmniError::manifest_internal(format!("invalid s3 object path for '{}': {}", uri, err)) + } +} + +#[async_trait] +impl StorageAdapter for ObjectStorageAdapter { + async fn read_text(&self, uri: &str) -> Result { + let location = self.object_path(uri)?; + let bytes = self + .store + .get(&location) + .await + .map_err(|err| storage_backend_error("read", uri, err))? + .bytes() + .await + .map_err(|err| storage_backend_error("read", uri, err))?; + + String::from_utf8(bytes.to_vec()).map_err(|err| { + OmniError::manifest_internal(format!("storage read failed for '{}': {}", uri, err)) }) } + + async fn write_text(&self, uri: &str, contents: &str) -> Result<()> { + // Atomic visibility is the backend's contract: object stores via + // PutObject; LocalFileSystem via an internal staged-temp + rename + // (a reader sees the old object or the new one, never a truncated + // in-progress write). Callers (sidecar protocol, cluster state) + // assume it. + let location = self.object_path(uri)?; + self.store + .put(&location, PutPayload::from(contents.as_bytes().to_vec())) + .await + .map_err(|err| storage_backend_error("write", uri, err))?; + Ok(()) + } + + async fn write_text_if_absent(&self, uri: &str, contents: &str) -> Result { + // PutMode::Create: atomic no-replace publish on every backend — + // exactly one of N concurrent claimants wins, and the winner's + // object is fully readable at the instant it becomes visible + // (LocalFileSystem stages the temp file completely, then + // hard_links it; pinned by + // `local_write_text_if_absent_is_read_visible_on_return`). + let location = self.object_path(uri)?; + match self + .store + .put_opts( + &location, + PutPayload::from(contents.as_bytes().to_vec()), + PutMode::Create.into(), + ) + .await + { + Ok(_) => Ok(true), + Err(object_store::Error::AlreadyExists { .. }) + | Err(object_store::Error::Precondition { .. }) => Ok(false), + Err(err) => Err(storage_backend_error("write_if_absent", uri, err)), + } + } + + async fn exists(&self, uri: &str) -> Result { + // head() answers for objects; the list fallback answers for + // "directory-shaped" URIs (e.g. a Lance dataset root, whose + // `_versions/*.manifest` makes any committed dataset non-empty). + // Object-store semantics throughout: only objects exist — + // an EMPTY local directory does not (callers that probe local + // directories use std::fs directly). + let location = self.object_path(uri)?; + match self.store.head(&location).await { + Ok(_) => Ok(true), + Err(object_store::Error::NotFound { .. }) => { + let mut entries = self.store.list(Some(&location)); + let has_prefix_entries = entries + .try_next() + .await + .map_err(|err| storage_backend_error("exists", uri, err))? + .is_some(); + Ok(has_prefix_entries) + } + Err(err) => Err(storage_backend_error("exists", uri, err)), + } + } + + async fn rename_text(&self, from_uri: &str, to_uri: &str) -> Result<()> { + // ObjectStore::rename: LocalFileSystem overrides it with an atomic + // fs::rename (creating missing destination parents); object stores + // use the default copy + delete — if the copy succeeds and the + // delete fails (or the process crashes between them), both source + // and destination exist with the same content. Recovery code must + // tolerate this case — see schema_state::recover_schema_state_files. + let from = self.object_path(from_uri)?; + let to = self.object_path(to_uri)?; + self.store + .rename(&from, &to) + .await + .map_err(|err| storage_backend_error("rename", from_uri, err))?; + Ok(()) + } + + async fn delete(&self, uri: &str) -> Result<()> { + let location = self.object_path(uri)?; + match self.store.delete(&location).await { + Ok(()) => Ok(()), + Err(object_store::Error::NotFound { .. }) => Ok(()), + Err(err) => Err(storage_backend_error("delete", uri, err)), + } + } + + async fn list_dir(&self, dir_uri: &str) -> Result> { + // list_with_delimiter is non-recursive and path-delimited on every + // backend (no sibling-prefix bleed: listing `__recovery` cannot + // match `__recovery_log/...`), and returns Ok(empty) for a missing + // directory. Output URIs are anchored on the INPUT `dir_uri` plus + // the entry filename, so the strings round-trip byte-identically + // into read_text/delete regardless of scheme (plain path, file://, + // s3://). + let anchor = dir_uri.trim_end_matches('/'); + let prefix = self.object_path(anchor)?; + let listing = self + .store + .list_with_delimiter(Some(&prefix)) + .await + .map_err(|err| storage_backend_error("list_dir", dir_uri, err))?; + let mut out = Vec::with_capacity(listing.objects.len()); + for meta in listing.objects { + if let Some(name) = meta.location.filename() { + out.push(format!("{}/{}", anchor, name)); + } + } + Ok(out) + } + + async fn read_text_versioned(&self, uri: &str) -> Result<(String, String)> { + let location = self.object_path(uri)?; + let result = self + .store + .get(&location) + .await + .map_err(|err| storage_backend_error("read", uri, err))?; + let etag = result.meta.e_tag.clone(); + let bytes = result + .bytes() + .await + .map_err(|err| storage_backend_error("read", uri, err))?; + // The token SOURCE must agree with the write_text_if_match strategy + // below: conditional-update backends compare ETags server-side, so + // the token is the ETag; the local emulation compares content, so + // the token is the content hash. Mixing them makes every CAS lose. + let version = if self.supports_conditional_update { + // Every S3-compatible store we target returns ETags; fall back + // to a content token rather than failing if one ever omits it. + etag.unwrap_or_else(|| local_version_token(&bytes)) + } else { + local_version_token(&bytes) + }; + let text = String::from_utf8(bytes.to_vec()).map_err(|err| { + OmniError::manifest_internal(format!("storage read failed for '{}': {}", uri, err)) + })?; + Ok((text, version)) + } + + async fn write_text_if_match( + &self, + uri: &str, + contents: &str, + expected_version: &str, + ) -> Result> { + let location = self.object_path(uri)?; + if self.supports_conditional_update { + let mode = PutMode::Update(object_store::UpdateVersion { + e_tag: Some(expected_version.to_string()), + version: None, + }); + return match self + .store + .put_opts( + &location, + PutPayload::from(contents.as_bytes().to_vec()), + mode.into(), + ) + .await + { + Ok(result) => Ok(Some( + result + .e_tag + .unwrap_or_else(|| local_version_token(contents.as_bytes())), + )), + Err(object_store::Error::Precondition { .. }) + | Err(object_store::Error::NotFound { .. }) => Ok(None), + Err(err) => Err(storage_backend_error("write_if_match", uri, err)), + }; + } + // Local emulation: content-compare then atomic replace. NOT a + // cross-process CAS (check-then-act gap) — safe under the callers' + // lock protocol only; tracked in docs/dev/invariants.md Known Gaps. + let current = match self.store.get(&location).await { + Ok(result) => result + .bytes() + .await + .map_err(|err| storage_backend_error("read", uri, err))?, + Err(object_store::Error::NotFound { .. }) => return Ok(None), + Err(err) => return Err(storage_backend_error("read", uri, err)), + }; + if local_version_token(¤t) != expected_version { + return Ok(None); + } + self.store + .put(&location, PutPayload::from(contents.as_bytes().to_vec())) + .await + .map_err(|err| storage_backend_error("write_if_match", uri, err))?; + Ok(Some(local_version_token(contents.as_bytes()))) + } + + async fn delete_prefix(&self, prefix_uri: &str) -> Result<()> { + // Directories are a local-FS concept: a list+delete loop would + // leave empty directory skeletons that local existence probes + // (cluster graph_root_exists uses std Path::exists) report as + // still-present. remove_dir_all reclaims them in one call. + if self.codec == UriCodec::Local { + let path = absolutize_lexically(local_path_from_uri(prefix_uri)?)?; + return match tokio::fs::remove_dir_all(&path).await { + Ok(()) => Ok(()), + Err(err) if err.kind() == std::io::ErrorKind::NotFound => Ok(()), + Err(err) => Err(err.into()), + }; + } + let prefix = self.object_path(prefix_uri.trim_end_matches('/'))?; + let mut entries = self.store.list(Some(&prefix)); + let mut locations = Vec::new(); + while let Some(meta) = entries + .try_next() + .await + .map_err(|err| storage_backend_error("delete_prefix", prefix_uri, err))? + { + locations.push(meta.location); + } + for location in locations { + match self.store.delete(&location).await { + Ok(()) => {} + Err(object_store::Error::NotFound { .. }) => {} + Err(err) => return Err(storage_backend_error("delete_prefix", prefix_uri, err)), + } + } + Ok(()) + } } pub fn storage_kind_for_uri(uri: &str) -> StorageKind { @@ -506,8 +487,8 @@ pub fn storage_kind_for_uri(uri: &str) -> StorageKind { pub fn storage_for_uri(uri: &str) -> Result> { match storage_kind_for_uri(uri) { - StorageKind::Local => Ok(Arc::new(LocalStorageAdapter)), - StorageKind::S3 => Ok(Arc::new(S3StorageAdapter::from_root_uri(uri)?)), + StorageKind::Local => Ok(Arc::new(ObjectStorageAdapter::local())), + StorageKind::S3 => Ok(Arc::new(ObjectStorageAdapter::s3_from_root_uri(uri)?)), } } @@ -553,6 +534,38 @@ fn local_path_from_uri(uri: &str) -> Result { Ok(PathBuf::from(uri)) } +/// Lexically absolutize a local path: join relative paths onto the current +/// working directory and fold `.` / `..` components, without touching the +/// filesystem. Required because `object_store::path::Path` rejects +/// relative and dot segments, while callers (the CLI in particular) pass +/// paths like `./graph.omni` verbatim. +fn absolutize_lexically(path: PathBuf) -> Result { + let joined = if path.is_absolute() { + path + } else { + std::env::current_dir() + .map_err(|err| { + OmniError::manifest_internal(format!( + "cannot resolve relative storage path '{}': {}", + path.display(), + err + )) + })? + .join(path) + }; + let mut out = PathBuf::new(); + for component in joined.components() { + match component { + Component::CurDir => {} + Component::ParentDir => { + out.pop(); + } + other => out.push(other), + } + } + Ok(out) +} + fn local_path_from_file_uri(uri: &str) -> Result { let url = Url::parse(uri).map_err(|err| { OmniError::manifest_internal(format!("invalid file uri '{}': {}", uri, err)) @@ -610,17 +623,185 @@ fn env_var_truthy(key: &str) -> bool { #[cfg(test)] mod tests { + use super::*; - /// Regression for the write_text_if_absent buffering bug: a reader - /// immediately after Ok(true) must never see the created file empty. - /// The failure is timing-dependent (tokio's background write task), so - /// this loop is a best-effort local reproducer — the recorded red is - /// two CI failures ("EOF while parsing" on a state.json read right - /// after cluster import). + /// The executable backend contract: every assertion here must hold for + /// EVERY backend (the divergence class this adapter closed was "two + /// implementations, one prose contract, no referee"). The S3 variant + /// runs bucket-gated in `tests/s3_storage.rs` + /// (`s3_adapter_conditional_writes_contract`). + async fn contract_suite(adapter: &dyn StorageAdapter, root: &str) { + // Write/read round-trip; replace is in-place and atomic. + let a = format!("{root}/contract/a.json"); + adapter.write_text(&a, "v1").await.unwrap(); + assert_eq!(adapter.read_text(&a).await.unwrap(), "v1"); + adapter.write_text(&a, "v2").await.unwrap(); + assert_eq!(adapter.read_text(&a).await.unwrap(), "v2"); + + // exists: object yes; missing no; non-empty prefix yes (the + // directory-shaped probe Lance dataset roots rely on). + assert!(adapter.exists(&a).await.unwrap()); + assert!( + !adapter + .exists(&format!("{root}/contract/missing.json")) + .await + .unwrap() + ); + assert!(adapter.exists(&format!("{root}/contract")).await.unwrap()); + + // if_absent: exactly one claim wins; the loser leaves the winner's + // object untouched. + let claim = format!("{root}/contract/claim.json"); + assert!(adapter.write_text_if_absent(&claim, "first").await.unwrap()); + assert!(!adapter.write_text_if_absent(&claim, "second").await.unwrap()); + assert_eq!(adapter.read_text(&claim).await.unwrap(), "first"); + + // Versioned CAS: fresh token wins, stale token loses with Ok(None) + // (never a silent overwrite), missing object can't match. + let state = format!("{root}/contract/state.json"); + adapter.write_text(&state, "s1").await.unwrap(); + let (text, v1) = adapter.read_text_versioned(&state).await.unwrap(); + assert_eq!(text, "s1"); + let v2 = adapter + .write_text_if_match(&state, "s2", &v1) + .await + .unwrap() + .expect("fresh token must win"); + assert_ne!(v2, v1); + assert!( + adapter + .write_text_if_match(&state, "s3", &v1) + .await + .unwrap() + .is_none() + ); + assert_eq!(adapter.read_text(&state).await.unwrap(), "s2"); + assert!( + adapter + .write_text_if_match(&format!("{root}/contract/absent.json"), "x", &v1) + .await + .unwrap() + .is_none() + ); + + // rename: destination is replaced; source is gone. + let src = format!("{root}/contract/src.json"); + adapter.write_text(&src, "moved").await.unwrap(); + adapter.rename_text(&src, &a).await.unwrap(); + assert_eq!(adapter.read_text(&a).await.unwrap(), "moved"); + assert!(!adapter.exists(&src).await.unwrap()); + + // list_dir: direct children only, no sibling-prefix bleed, output + // URIs round-trip verbatim into read_text, missing dir is empty. + let dir_uri = format!("{root}/contract/list"); + adapter + .write_text(&format!("{dir_uri}/one.json"), "1") + .await + .unwrap(); + adapter + .write_text(&format!("{dir_uri}/two.json"), "2") + .await + .unwrap(); + adapter + .write_text(&format!("{dir_uri}/sub/three.json"), "3") + .await + .unwrap(); + adapter + .write_text(&format!("{root}/contract/list_log/x.json"), "x") + .await + .unwrap(); + let mut listed = adapter.list_dir(&dir_uri).await.unwrap(); + listed.sort(); + assert_eq!( + listed, + vec![ + format!("{dir_uri}/one.json"), + format!("{dir_uri}/two.json") + ] + ); + for uri in &listed { + adapter.read_text(uri).await.unwrap(); + } + assert!( + adapter + .list_dir(&format!("{root}/contract/nope")) + .await + .unwrap() + .is_empty() + ); + + // delete: idempotent. + adapter.delete(&claim).await.unwrap(); + adapter.delete(&claim).await.unwrap(); + assert!(!adapter.exists(&claim).await.unwrap()); + + // delete_prefix: recursive + idempotent; nothing under the prefix + // (including local directory skeletons) survives. + adapter + .delete_prefix(&format!("{root}/contract")) + .await + .unwrap(); + assert!(!adapter.exists(&a).await.unwrap()); + assert!(!adapter.exists(&format!("{root}/contract")).await.unwrap()); + adapter + .delete_prefix(&format!("{root}/contract")) + .await + .unwrap(); + } + + #[tokio::test] + async fn contract_suite_local() { + let dir = tempfile::tempdir().unwrap(); + let adapter = ObjectStorageAdapter::local(); + contract_suite(&adapter, dir.path().to_str().unwrap()).await; + } + + #[tokio::test] + async fn contract_suite_in_memory() { + // InMemory implements true conditional updates, so this runs the + // strong-CAS path (ETag tokens + PutMode::Update) without a bucket. + let adapter = ObjectStorageAdapter::in_memory(); + contract_suite(&adapter, "mem-root").await; + } + + /// `write_text_if_absent` must make the contents visible to any + /// subsequent reader before it returns — callers acknowledge + /// success the moment it resolves (cluster state bootstrap reads + /// the file back; init ownership claims depend on it). + /// Regression: the previous hand-rolled local adapter wrote through a + /// buffered `tokio::fs::File` without flushing, so the bytes could + /// still be in flight on the blocking pool while a reader saw an empty + /// or partial file. Reads back through `std::fs` deliberately — + /// cross-API visibility is the point. + #[tokio::test] + async fn local_write_text_if_absent_is_read_visible_on_return() { + let dir = tempfile::tempdir().unwrap(); + let adapter = ObjectStorageAdapter::local(); + let payload = "x".repeat(8 * 1024); + for i in 0..1000 { + let path = dir.path().join(format!("obj-{i}.json")); + let uri = format!("{}", path.display()); + assert!(adapter.write_text_if_absent(&uri, &payload).await.unwrap()); + let read = std::fs::read_to_string(&path).unwrap(); + assert_eq!( + read.len(), + payload.len(), + "iteration {i}: write_text_if_absent returned before its \ + contents reached the file" + ); + } + } + + /// Regression for the write_text_if_absent buffering bug, via the + /// `storage_for_uri` + `file://` construction path and a multi-thread + /// runtime (complements `local_write_text_if_absent_is_read_visible_- + /// on_return`, which uses the direct constructor and plain paths): a + /// reader immediately after Ok(true) must never see the created file + /// empty or short. #[tokio::test(flavor = "multi_thread")] async fn write_text_if_absent_is_read_consistent_immediately() { let dir = tempfile::tempdir().unwrap(); - let adapter = super::storage_for_uri(&format!("file://{}", dir.path().display())).unwrap(); + let adapter = storage_for_uri(&format!("file://{}", dir.path().display())).unwrap(); let payload = "x".repeat(64 * 1024); for i in 0..200 { let uri = format!("file://{}/f{}.json", dir.path().display(), i); @@ -630,55 +811,73 @@ mod tests { } } + /// Object-store semantics on the local filesystem: only objects exist. + /// An empty directory is not an object and not a non-empty prefix — + /// callers that genuinely probe local directories use std::fs. #[tokio::test] - async fn local_versioned_cas_roundtrip() { + async fn local_exists_is_object_semantics_for_directories() { let dir = tempfile::tempdir().unwrap(); - let uri = format!("{}/state.json", dir.path().display()); - let adapter = LocalStorageAdapter; - adapter.write_text(&uri, "v1").await.unwrap(); - let (text, version) = adapter.read_text_versioned(&uri).await.unwrap(); - assert_eq!(text, "v1"); - - // Matching token replaces and returns the next token. - let next = adapter - .write_text_if_match(&uri, "v2", &version) - .await - .unwrap() - .expect("fresh token must win"); - assert_ne!(next, version); - // The stale token must lose (CAS-lost is Ok(None), never silent). + let probe = dir.path().join("maybe-dataset"); + let adapter = ObjectStorageAdapter::local(); + std::fs::create_dir(&probe).unwrap(); assert!( - adapter - .write_text_if_match(&uri, "v3", &version) - .await - .unwrap() - .is_none() + !adapter.exists(probe.to_str().unwrap()).await.unwrap(), + "an empty directory is not an object" ); - let (text, _) = adapter.read_text_versioned(&uri).await.unwrap(); - assert_eq!(text, "v2"); - // Missing object: precondition can't hold. - let missing = format!("{}/absent.json", dir.path().display()); + std::fs::write(probe.join("1.manifest"), "m").unwrap(); assert!( - adapter - .write_text_if_match(&missing, "x", &version) - .await - .unwrap() - .is_none() + adapter.exists(probe.to_str().unwrap()).await.unwrap(), + "a non-empty prefix exists (the Lance dataset-root probe shape)" ); } + /// list_dir output is anchored on the INPUT dir_uri, so `file://` + /// anchors and paths with spaces round-trip byte-identically into + /// read_text — the cluster store passes file://-schemed roots. #[tokio::test] - async fn local_delete_prefix_is_recursive_and_idempotent() { + async fn local_list_round_trips_file_scheme_and_spaces() { let dir = tempfile::tempdir().unwrap(); - let root = format!("{}/tree", dir.path().display()); - let adapter = LocalStorageAdapter; - adapter.write_text(&format!("{root}/a.txt"), "a").await.unwrap(); - adapter.write_text(&format!("{root}/sub/b.txt"), "b").await.unwrap(); - adapter.delete_prefix(&root).await.unwrap(); - assert!(!adapter.exists(&format!("{root}/a.txt")).await.unwrap()); - adapter.delete_prefix(&root).await.unwrap(); // absent -> Ok + let root = dir.path().join("with space"); + let adapter = ObjectStorageAdapter::local(); + let plain = format!("{}/x.json", root.display()); + adapter.write_text(&plain, "x").await.unwrap(); + + let listed = adapter.list_dir(root.to_str().unwrap()).await.unwrap(); + assert_eq!(listed, vec![plain.clone()]); + assert_eq!(adapter.read_text(&listed[0]).await.unwrap(), "x"); + + let file_anchor = format!("file://{}", root.display()); + let listed = adapter.list_dir(&file_anchor).await.unwrap(); + assert_eq!(listed, vec![format!("{file_anchor}/x.json")]); + assert_eq!(adapter.read_text(&listed[0]).await.unwrap(), "x"); + } + + /// Relative and dot-segment paths are lexically absolutized before + /// hitting the object-path layer (which rejects them) — the CLI passes + /// `./graph.omni`-shaped URIs verbatim. + #[tokio::test] + async fn local_paths_with_dot_segments_are_absolutized() { + let dir = tempfile::tempdir().unwrap(); + let adapter = ObjectStorageAdapter::local(); + let uri = format!("{}/sub/../dotted.json", dir.path().display()); + adapter.write_text(&uri, "x").await.unwrap(); + assert_eq!(adapter.read_text(&uri).await.unwrap(), "x"); + assert!(dir.path().join("dotted.json").exists()); + } + + /// Upstream local rename creates missing destination parents — more + /// lenient than the previous bare fs::rename; pinned so an upstream + /// regression is loud. + #[tokio::test] + async fn local_rename_creates_missing_destination_parents() { + let dir = tempfile::tempdir().unwrap(); + let adapter = ObjectStorageAdapter::local(); + let src = format!("{}/src.json", dir.path().display()); + adapter.write_text(&src, "x").await.unwrap(); + let dst = format!("{}/new-sub/dst.json", dir.path().display()); + adapter.rename_text(&src, &dst).await.unwrap(); + assert_eq!(adapter.read_text(&dst).await.unwrap(), "x"); } - use super::*; #[test] fn storage_backend_selection_is_scheme_aware() { @@ -732,15 +931,4 @@ mod tests { assert_eq!(location.key, "graph/_schema.pg"); } - #[tokio::test] - async fn local_write_text_if_absent_creates_once_without_overwrite() { - let dir = tempfile::tempdir().unwrap(); - let uri = dir.path().join("claim.txt"); - let uri = uri.to_str().unwrap(); - let storage = LocalStorageAdapter; - - assert!(storage.write_text_if_absent(uri, "first").await.unwrap()); - assert!(!storage.write_text_if_absent(uri, "second").await.unwrap()); - assert_eq!(storage.read_text(uri).await.unwrap(), "first"); - } } diff --git a/crates/omnigraph/tests/failpoints.rs b/crates/omnigraph/tests/failpoints.rs index 3be0a56..b45cfa0 100644 --- a/crates/omnigraph/tests/failpoints.rs +++ b/crates/omnigraph/tests/failpoints.rs @@ -1190,6 +1190,1222 @@ async fn refresh_runs_roll_forward_recovery_in_process() { assert_eq!(helpers::count_rows(&db, "node:Person").await, 2); } +/// The long-lived-process contract for `load`: a Phase B → Phase C +/// failure (per-table `commit_staged` advanced Lance HEAD, manifest +/// publish did not land, sidecar persists) must not wedge subsequent +/// loads on the same engine handle. This is the server shape — `POST +/// /ingest` calls `load_as` on a shared handle with no reopen between +/// requests — so the follow-up load must heal the sidecar-covered +/// drift in-process: no restart, no explicit `refresh()`, no +/// `omnigraph repair`. +#[tokio::test] +async fn load_after_finalize_publisher_failure_heals_without_reopen() { + use omnigraph::loader::{LoadMode, load_jsonl}; + + let _scenario = FailScenario::setup(); + let dir = tempfile::tempdir().unwrap(); + let uri = dir.path().to_str().unwrap().to_string(); + + let mut db = Omnigraph::init(&uri, helpers::TEST_SCHEMA).await.unwrap(); + + // Failed multi-table load: Person + Company + WorksAt all run + // commit_staged (Lance HEAD advances on three tables), then the + // publisher is wedged before the manifest commit. + { + let _failpoint = ScopedFailPoint::new("mutation.post_finalize_pre_publisher", "return"); + let err = load_jsonl( + &mut db, + r#"{"type":"Person","data":{"name":"Alice","age":30}} +{"type":"Person","data":{"name":"Bob","age":25}} +{"type":"Company","data":{"name":"Acme"}} +{"edge":"WorksAt","from":"Alice","to":"Acme"} +"#, + LoadMode::Merge, + ) + .await + .unwrap_err(); + assert!( + err.to_string() + .contains("injected failpoint triggered: mutation.post_finalize_pre_publisher"), + "unexpected error: {err}" + ); + let recovery_dir = dir.path().join("__recovery"); + assert_eq!( + std::fs::read_dir(&recovery_dir).unwrap().count(), + 1, + "exactly one sidecar must persist after the finalize failure" + ); + } + + // Follow-up load on the SAME handle, touching the drifted tables. + // Must succeed without manual intervention. + load_jsonl( + &mut db, + r#"{"type":"Person","data":{"name":"Carol","age":41}} +{"type":"Company","data":{"name":"Globex"}} +"#, + LoadMode::Merge, + ) + .await + .expect( + "a follow-up load on the same handle must heal sidecar-covered \ + drift in-process instead of demanding repair/restart", + ); + + // Both batches are visible: the first load rolled forward, the + // second landed normally on top of it. + assert_eq!(helpers::count_rows(&db, "node:Person").await, 3); + assert_eq!(helpers::count_rows(&db, "node:Company").await, 2); + assert_eq!(helpers::count_rows(&db, "edge:WorksAt").await, 1); + + // The sidecar was consumed by the in-process roll-forward. + let recovery_dir = dir.path().join("__recovery"); + if recovery_dir.exists() { + assert_eq!( + std::fs::read_dir(&recovery_dir).unwrap().count(), + 0, + "sidecar must be consumed by the in-process roll-forward" + ); + } +} + +/// Phase A storage-fault contract: a sidecar PUT failure (S3 PutObject / +/// fs write, injected at `recovery.sidecar_write`) must abort the load +/// BEFORE any Lance HEAD advances — no sidecar, no drift, nothing to +/// recover — and the same handle must write normally once the fault +/// clears (a transient storage error never wedges the graph). +#[tokio::test] +async fn sidecar_write_failure_aborts_load_with_no_head_advance() { + use omnigraph::loader::{LoadMode, load_jsonl}; + + let _scenario = FailScenario::setup(); + let dir = tempfile::tempdir().unwrap(); + let uri = dir.path().to_str().unwrap().to_string(); + + let mut db = Omnigraph::init(&uri, helpers::TEST_SCHEMA).await.unwrap(); + + let person_uri = node_table_uri(&uri, "Person"); + let pre_head = lance::Dataset::open(&person_uri) + .await + .unwrap() + .version() + .version; + + { + let _failpoint = ScopedFailPoint::new("recovery.sidecar_write", "return"); + let err = load_jsonl( + &mut db, + r#"{"type":"Person","data":{"name":"Alice","age":30}} +{"type":"Company","data":{"name":"Acme"}} +"#, + LoadMode::Merge, + ) + .await + .unwrap_err(); + assert!( + err.to_string() + .contains("injected failpoint triggered: recovery.sidecar_write"), + "unexpected error: {err}" + ); + } + + // Phase A ordering: the sidecar write precedes the first + // commit_staged, so the failed load left no sidecar and moved no + // Lance HEAD — manifest and HEAD agree, nothing to recover. + let recovery_dir = dir.path().join("__recovery"); + if recovery_dir.exists() { + assert_eq!( + std::fs::read_dir(&recovery_dir).unwrap().count(), + 0, + "a Phase A put failure must not leave a sidecar" + ); + } + let post_head = lance::Dataset::open(&person_uri) + .await + .unwrap() + .version() + .version; + assert_eq!( + pre_head, post_head, + "a Phase A put failure must abort before any Lance HEAD advance" + ); + let manifest_pin = db + .snapshot_of(omnigraph::db::ReadTarget::branch("main")) + .await + .unwrap() + .entry("node:Person") + .unwrap() + .table_version; + assert_eq!(manifest_pin, post_head, "no drift after a Phase A abort"); + assert_eq!(helpers::count_rows(&db, "node:Person").await, 0); + + // Fault cleared: the same handle writes normally — no wedge, no + // recovery required. + load_jsonl( + &mut db, + r#"{"type":"Person","data":{"name":"Alice","age":30}} +{"type":"Company","data":{"name":"Acme"}} +"#, + LoadMode::Merge, + ) + .await + .expect("a transient sidecar put failure must not wedge later writes"); + assert_eq!(helpers::count_rows(&db, "node:Person").await, 1); + assert_eq!(helpers::count_rows(&db, "node:Company").await, 1); +} + +/// Real-backend coverage of the sidecar lifecycle: the same-handle heal +/// scenario on an S3-compatible store, exercising sidecar put / list / +/// delete through the S3 object-store backend instead of the +/// local filesystem backend. Skips unless `OMNIGRAPH_S3_TEST_BUCKET` is set +/// (same gate as `s3_storage.rs`); CI runs it against RustFS. +#[tokio::test] +async fn s3_load_recovers_after_publisher_failure_without_reopen() { + use omnigraph::loader::{LoadMode, load_jsonl}; + + let Some(uri) = helpers::s3_test_graph_uri("failpoints") else { + eprintln!( + "skipping s3_load_recovers_after_publisher_failure_without_reopen: \ + OMNIGRAPH_S3_TEST_BUCKET is not set" + ); + return; + }; + + let _scenario = FailScenario::setup(); + let mut db = Omnigraph::init(&uri, helpers::TEST_SCHEMA).await.unwrap(); + + // Failed load: commit_staged lands on S3, manifest publish does not; + // the sidecar PUT went through the S3 adapter. + { + let _failpoint = ScopedFailPoint::new("mutation.post_finalize_pre_publisher", "return"); + let err = load_jsonl( + &mut db, + r#"{"type":"Person","data":{"name":"Alice","age":30}} +{"type":"Company","data":{"name":"Acme"}} +"#, + LoadMode::Merge, + ) + .await + .err() + .expect("finalize failpoint must fail the load"); + assert!( + err.to_string() + .contains("injected failpoint triggered: mutation.post_finalize_pre_publisher"), + "unexpected error: {err}" + ); + } + + // Same-handle follow-up load: the entry heal LISTs __recovery/ on + // S3, rolls the sidecar forward, DELETEs it, and the write lands. + load_jsonl( + &mut db, + r#"{"type":"Person","data":{"name":"Bob","age":25}} +"#, + LoadMode::Merge, + ) + .await + .expect("the same-handle heal must converge on an S3-backed graph"); + + assert_eq!(helpers::count_rows(&db, "node:Person").await, 2); + assert_eq!(helpers::count_rows(&db, "node:Company").await, 1); + + // Reopen cross-check: nothing left for the open-time sweep, state + // converged (the heal consumed the sidecar on S3). + drop(db); + let db = Omnigraph::open(&uri).await.unwrap(); + assert_eq!(helpers::count_rows(&db, "node:Person").await, 2); +} + +/// Storage-fault contract for the recovery AUDIT write (injected at +/// `recovery.record_audit`): a failure after the roll-forward's manifest +/// publish aborts that recovery attempt loudly and keeps the sidecar; +/// re-entry detects the already-published manifest (stale-sidecar path), +/// records exactly one `RolledForward` audit row, and converges — the +/// documented retry tolerance in `record_audit`'s contract, exercised +/// end-to-end through a real injected failure. +#[tokio::test] +async fn record_audit_failure_after_roll_forward_converges_on_next_write() { + use omnigraph::loader::{LoadMode, load_jsonl}; + + let _scenario = FailScenario::setup(); + let dir = tempfile::tempdir().unwrap(); + let uri = dir.path().to_str().unwrap().to_string(); + + let mut db = Omnigraph::init(&uri, helpers::TEST_SCHEMA).await.unwrap(); + + // Pending sidecar with real drift. + { + let _failpoint = ScopedFailPoint::new("mutation.post_finalize_pre_publisher", "return"); + load_jsonl( + &mut db, + r#"{"type":"Person","data":{"name":"Alice","age":30}} +"#, + LoadMode::Merge, + ) + .await + .err() + .expect("finalize failpoint must fail the load"); + } + + // The next write's heal rolls forward (manifest publish lands) but + // the audit write fails — the write must fail loudly and the sidecar + // must survive for the retry. + { + let _failpoint = ScopedFailPoint::new("recovery.record_audit", "return"); + let err = load_jsonl( + &mut db, + r#"{"type":"Person","data":{"name":"Bob","age":25}} +"#, + LoadMode::Merge, + ) + .await + .err() + .expect("an audit write failure mid-heal must fail the write"); + assert!( + err.to_string() + .contains("injected failpoint triggered: recovery.record_audit"), + "unexpected error: {err}" + ); + let recovery_dir = dir.path().join("__recovery"); + assert_eq!( + std::fs::read_dir(&recovery_dir).unwrap().count(), + 1, + "the sidecar must survive an audit write failure so the retry can record it" + ); + } + + // Fault cleared: the next write converges — stale-sidecar audit + // recovery (manifest already advanced) + the write itself. + load_jsonl( + &mut db, + r#"{"type":"Person","data":{"name":"Carol","age":41}} +"#, + LoadMode::Merge, + ) + .await + .expect("recovery must converge once the audit fault clears"); + + let recovery_dir = dir.path().join("__recovery"); + if recovery_dir.exists() { + assert_eq!(std::fs::read_dir(&recovery_dir).unwrap().count(), 0); + } + // Alice (rolled forward) + Carol (clean). Bob's write failed before + // staging anything — the heal error aborted his load at entry. + assert_eq!(helpers::count_rows(&db, "node:Person").await, 2); + // Exactly one audit row despite two recovery attempts: the first + // attempt's audit failed before any row landed; the retry recorded + // the roll-forward once. + let audit_uri = format!( + "{}/_graph_commit_recoveries.lance", + uri.trim_end_matches('/') + ); + let audit_rows = lance::Dataset::open(&audit_uri) + .await + .expect("audit dataset exists after the retried recovery") + .count_rows(None) + .await + .unwrap(); + assert_eq!(audit_rows, 1, "exactly one recovery audit row"); +} + +/// Storage-fault contract for the `__recovery/` LIST (S3 ListObjectsV2, +/// injected at `recovery.sidecar_list`): every consumer fails loudly — +/// the write-entry heal fails the write, the open-time sweep fails the +/// open — rather than silently skipping recovery over a pending sidecar +/// (which would be consumer tolerance of drift). Once the fault clears, +/// open recovers normally. +#[tokio::test] +async fn sidecar_list_failure_fails_write_and_open_loudly_then_clears() { + use omnigraph::loader::{LoadMode, load_jsonl}; + + let _scenario = FailScenario::setup(); + let dir = tempfile::tempdir().unwrap(); + let uri = dir.path().to_str().unwrap().to_string(); + + let mut db = Omnigraph::init(&uri, helpers::TEST_SCHEMA).await.unwrap(); + + // Pending sidecar via the usual finalize → publisher failure. + { + let _failpoint = ScopedFailPoint::new("mutation.post_finalize_pre_publisher", "return"); + let err = load_jsonl( + &mut db, + r#"{"type":"Person","data":{"name":"Alice","age":30}} +"#, + LoadMode::Merge, + ) + .await + .unwrap_err(); + assert!( + err.to_string() + .contains("injected failpoint triggered: mutation.post_finalize_pre_publisher"), + "unexpected error: {err}" + ); + let recovery_dir = dir.path().join("__recovery"); + assert_eq!(std::fs::read_dir(&recovery_dir).unwrap().count(), 1); + } + + let _failpoint = ScopedFailPoint::new("recovery.sidecar_list", "return"); + + // Write-entry heal: the list failure surfaces as the write's error — + // no silent skip that would proceed over the pending sidecar. + let err = load_jsonl( + &mut db, + r#"{"type":"Person","data":{"name":"Bob","age":25}} +"#, + LoadMode::Merge, + ) + .await + .unwrap_err(); + assert!( + err.to_string() + .contains("injected failpoint triggered: recovery.sidecar_list"), + "the write-entry heal must surface a list failure loudly; got: {err}" + ); + + // Open-time sweep: a fresh ReadWrite open fails on the same fault. + drop(db); + let err = Omnigraph::open(&uri) + .await + .err() + .expect("open must fail while the sidecar list fault is active"); + assert!( + err.to_string() + .contains("injected failpoint triggered: recovery.sidecar_list"), + "the open-time sweep must surface a list failure loudly; got: {err}" + ); + + // Fault cleared: open recovers the pending sidecar normally. + drop(_failpoint); + let db = Omnigraph::open(&uri).await.unwrap(); + let recovery_dir = dir.path().join("__recovery"); + if recovery_dir.exists() { + assert_eq!( + std::fs::read_dir(&recovery_dir).unwrap().count(), + 0, + "open after the fault clears must recover the sidecar" + ); + } + assert_eq!(helpers::count_rows(&db, "node:Person").await, 1); +} + +/// Phase D storage-fault contract: a sidecar DELETE failure (S3 +/// DeleteObject, injected at `recovery.sidecar_delete`) after a +/// successful manifest publish must NOT fail the user's write — the +/// data is durable and visible. The stale sidecar it leaves behind is +/// consumed by the next write's entry heal (attributed `RolledForward` +/// audit row), not by an operator. +#[tokio::test] +async fn sidecar_delete_failure_keeps_write_success_and_next_write_heals() { + use omnigraph::loader::{LoadMode, load_jsonl}; + + let _scenario = FailScenario::setup(); + let dir = tempfile::tempdir().unwrap(); + let uri = dir.path().to_str().unwrap().to_string(); + + let mut db = Omnigraph::init(&uri, helpers::TEST_SCHEMA).await.unwrap(); + + { + let _failpoint = ScopedFailPoint::new("recovery.sidecar_delete", "return"); + // The load itself must succeed: commit_staged + manifest publish + // landed; only the Phase D cleanup failed (swallowed + logged). + load_jsonl( + &mut db, + r#"{"type":"Person","data":{"name":"Alice","age":30}} +"#, + LoadMode::Merge, + ) + .await + .expect("a Phase D delete failure must not fail a write that already published"); + assert_eq!(helpers::count_rows(&db, "node:Person").await, 1); + let recovery_dir = dir.path().join("__recovery"); + assert_eq!( + std::fs::read_dir(&recovery_dir).unwrap().count(), + 1, + "the swallowed delete leaves a stale sidecar behind" + ); + } + + // Fault cleared: the next write's entry heal consumes the stale + // sidecar (manifest pin already caught up — the stale-sidecar + // roll-forward audit path) and the write lands. + load_jsonl( + &mut db, + r#"{"type":"Person","data":{"name":"Bob","age":25}} +"#, + LoadMode::Merge, + ) + .await + .expect("a stale sidecar from a failed Phase D delete must not block later writes"); + + let recovery_dir = dir.path().join("__recovery"); + if recovery_dir.exists() { + assert_eq!( + std::fs::read_dir(&recovery_dir).unwrap().count(), + 0, + "the stale sidecar must be consumed by the next write's heal" + ); + } + assert_eq!(helpers::count_rows(&db, "node:Person").await, 2); +} + +/// Phase A storage-fault contract for branch_merge — the multi-table +/// writer where sidecar-before-commit ordering matters most. A sidecar +/// PUT failure must abort the merge before any target-table HEAD moves; +/// retrying after the fault clears merges cleanly. +#[tokio::test] +async fn sidecar_write_failure_aborts_branch_merge_with_no_head_advance() { + use omnigraph::loader::{LoadMode, load_jsonl}; + + let _scenario = FailScenario::setup(); + let dir = tempfile::tempdir().unwrap(); + let uri = dir.path().to_str().unwrap().to_string(); + + let mut db = Omnigraph::init(&uri, helpers::TEST_SCHEMA).await.unwrap(); + load_jsonl( + &mut db, + r#"{"type":"Person","data":{"name":"Alice","age":30}} +"#, + LoadMode::Append, + ) + .await + .unwrap(); + + db.branch_create("feature").await.unwrap(); + // Diverge BOTH sides so Person is a RewriteMerged candidate (the + // merge path that pins a recovery sidecar; an unchanged target would + // adopt source state without one). + helpers::mutate_branch( + &mut db, + "feature", + MUTATION_QUERIES, + "insert_person", + &mixed_params(&[("$name", "Eve")], &[("$age", 22)]), + ) + .await + .unwrap(); + helpers::mutate_main( + &mut db, + MUTATION_QUERIES, + "insert_person", + &mixed_params(&[("$name", "Mallory")], &[("$age", 35)]), + ) + .await + .unwrap(); + + let person_uri = node_table_uri(&uri, "Person"); + let pre_head = lance::Dataset::open(&person_uri) + .await + .unwrap() + .version() + .version; + + { + let _failpoint = ScopedFailPoint::new("recovery.sidecar_write", "return"); + let err = db.branch_merge("feature", "main").await.unwrap_err(); + assert!( + err.to_string() + .contains("injected failpoint triggered: recovery.sidecar_write"), + "unexpected error: {err}" + ); + } + + let recovery_dir = dir.path().join("__recovery"); + if recovery_dir.exists() { + assert_eq!( + std::fs::read_dir(&recovery_dir).unwrap().count(), + 0, + "a Phase A put failure must not leave a sidecar" + ); + } + let post_head = lance::Dataset::open(&person_uri) + .await + .unwrap() + .version() + .version; + assert_eq!( + pre_head, post_head, + "a Phase A put failure must abort the merge before any target \ + Lance HEAD advance" + ); + assert_eq!(helpers::count_rows(&db, "node:Person").await, 2); + + // Fault cleared: the merge lands cleanly. + db.branch_merge("feature", "main") + .await + .expect("a transient sidecar put failure must not wedge the merge"); + assert_eq!(helpers::count_rows(&db, "node:Person").await, 3); +} + +/// Same contract as +/// `load_after_finalize_publisher_failure_heals_without_reopen`, for the +/// mutation entry point: after a failed mutation leaves a sidecar, the +/// next mutation on the same handle heals it in-process — no explicit +/// `refresh()` (which `refresh_runs_roll_forward_recovery_in_process` +/// covers), no reopen. +#[tokio::test] +async fn mutation_after_finalize_publisher_failure_heals_without_reopen() { + let _scenario = FailScenario::setup(); + let dir = tempfile::tempdir().unwrap(); + let uri = dir.path().to_str().unwrap().to_string(); + + let mut db = Omnigraph::init(&uri, helpers::TEST_SCHEMA).await.unwrap(); + + { + let _failpoint = ScopedFailPoint::new("mutation.post_finalize_pre_publisher", "return"); + let err = mutate_main( + &mut db, + MUTATION_QUERIES, + "insert_person", + &mixed_params(&[("$name", "Eve")], &[("$age", 22)]), + ) + .await + .unwrap_err(); + assert!( + err.to_string() + .contains("injected failpoint triggered: mutation.post_finalize_pre_publisher"), + "unexpected error: {err}" + ); + let recovery_dir = dir.path().join("__recovery"); + assert_eq!( + std::fs::read_dir(&recovery_dir).unwrap().count(), + 1, + "exactly one sidecar must persist after the finalize failure" + ); + } + + // Follow-up mutation on the SAME handle, same table. No refresh, no + // reopen — the write entry point heals the drift itself. + mutate_main( + &mut db, + MUTATION_QUERIES, + "insert_person", + &mixed_params(&[("$name", "Frank")], &[("$age", 33)]), + ) + .await + .expect( + "a follow-up mutation on the same handle must heal sidecar-covered \ + drift in-process instead of demanding repair/restart", + ); + + // Eve rolled forward, Frank landed normally. + assert_eq!(helpers::count_rows(&db, "node:Person").await, 2); + + let recovery_dir = dir.path().join("__recovery"); + if recovery_dir.exists() { + assert_eq!( + std::fs::read_dir(&recovery_dir).unwrap().count(), + 0, + "sidecar must be consumed by the in-process roll-forward" + ); + } +} + +/// Same heal contract as the load/mutation variants, for the schema +/// apply entry point: a pending roll-forward-eligible sidecar (here +/// from a failed load) must be healed in-process before the migration +/// runs, so a long-lived handle can evolve the schema without a +/// restart after a Phase B → Phase C failure. +#[tokio::test] +async fn schema_apply_after_finalize_publisher_failure_heals_without_reopen() { + use omnigraph::loader::{LoadMode, load_jsonl}; + + let _scenario = FailScenario::setup(); + let dir = tempfile::tempdir().unwrap(); + let uri = dir.path().to_str().unwrap().to_string(); + + let mut db = Omnigraph::init(&uri, helpers::TEST_SCHEMA).await.unwrap(); + + { + let _failpoint = ScopedFailPoint::new("mutation.post_finalize_pre_publisher", "return"); + let err = load_jsonl( + &mut db, + r#"{"type":"Person","data":{"name":"Alice","age":30}} +{"type":"Company","data":{"name":"Acme"}} +"#, + LoadMode::Merge, + ) + .await + .unwrap_err(); + assert!( + err.to_string() + .contains("injected failpoint triggered: mutation.post_finalize_pre_publisher"), + "unexpected error: {err}" + ); + let recovery_dir = dir.path().join("__recovery"); + assert_eq!(std::fs::read_dir(&recovery_dir).unwrap().count(), 1); + } + + // Additive migration on the SAME handle. Must heal the load's + // sidecar first, then apply normally. + let desired = format!("{}\nnode Tag {{ name: String @key }}\n", helpers::TEST_SCHEMA); + db.apply_schema(&desired).await.expect( + "schema apply on the same handle must heal sidecar-covered \ + drift in-process instead of failing until restart", + ); + + // The failed load rolled forward; the migration landed. + assert_eq!(helpers::count_rows(&db, "node:Person").await, 1); + assert_eq!(helpers::count_rows(&db, "node:Company").await, 1); + assert_eq!(helpers::count_rows(&db, "node:Tag").await, 0); + + // No sidecar remains (the load's was consumed by the heal; schema + // apply deleted its own after publish). + let recovery_dir = dir.path().join("__recovery"); + if recovery_dir.exists() { + assert_eq!( + std::fs::read_dir(&recovery_dir).unwrap().count(), + 0, + "no sidecar may remain after heal + successful schema apply" + ); + } +} + +/// Same heal contract for the branch-merge entry point: a pending +/// roll-forward-eligible sidecar on the target branch must be healed +/// (with its recovery audit row) before the merge reads its target +/// snapshot — not silently folded into the merge's publish. +#[tokio::test] +async fn branch_merge_after_finalize_publisher_failure_heals_without_reopen() { + use omnigraph::loader::{LoadMode, load_jsonl}; + + let _scenario = FailScenario::setup(); + let dir = tempfile::tempdir().unwrap(); + let uri = dir.path().to_str().unwrap().to_string(); + + let mut db = Omnigraph::init(&uri, helpers::TEST_SCHEMA).await.unwrap(); + load_jsonl( + &mut db, + r#"{"type":"Person","data":{"name":"Alice","age":30}} +"#, + LoadMode::Append, + ) + .await + .unwrap(); + + // A feature branch with its own write, to merge back later. + db.branch_create("feature").await.unwrap(); + helpers::mutate_branch( + &mut db, + "feature", + MUTATION_QUERIES, + "insert_person", + &mixed_params(&[("$name", "Eve")], &[("$age", 22)]), + ) + .await + .unwrap(); + + // Failed load on MAIN: Person drifts ahead of the manifest with a + // sidecar covering it. + { + let _failpoint = ScopedFailPoint::new("mutation.post_finalize_pre_publisher", "return"); + let err = load_jsonl( + &mut db, + r#"{"type":"Person","data":{"name":"Bob","age":25}} +"#, + LoadMode::Merge, + ) + .await + .unwrap_err(); + assert!( + err.to_string() + .contains("injected failpoint triggered: mutation.post_finalize_pre_publisher"), + "unexpected error: {err}" + ); + let recovery_dir = dir.path().join("__recovery"); + assert_eq!(std::fs::read_dir(&recovery_dir).unwrap().count(), 1); + } + + // Merge on the SAME handle. The entry heal must consume the load's + // sidecar (publishing Bob with a recovery audit row) BEFORE the + // merge captures its target snapshot. + db.branch_merge("feature", "main").await.expect( + "branch merge on the same handle must heal sidecar-covered \ + drift in-process instead of failing or folding it silently", + ); + + // No sidecar remains: the heal consumed the load's sidecar; the + // merge deleted its own after publish. Without the entry heal the + // merge's publish makes the drifted commit visible as a side effect + // (manifest catches up to HEAD) and the stale sidecar lingers + // until some later sweep — recovery must be attributed, not + // incidental. + let recovery_dir = dir.path().join("__recovery"); + if recovery_dir.exists() { + assert_eq!( + std::fs::read_dir(&recovery_dir).unwrap().count(), + 0, + "the load's sidecar must be consumed by the entry heal, not left behind" + ); + } + + // All three writes are visible on main: Alice (clean load), Bob + // (rolled forward), Eve (merged). + assert_eq!(helpers::count_rows(&db, "node:Person").await, 3); +} + +/// Discarding an orphaned-branch sidecar must be idempotent across a +/// Phase D delete failure: the audit row + commit land before the +/// sidecar delete, so a delete fault leaves the sidecar on disk with +/// the audit already written — the retry must NOT append a second +/// audit row for the same operation, only finish the delete. +#[tokio::test] +async fn orphaned_branch_discard_is_idempotent_across_delete_failure() { + use omnigraph::loader::{LoadMode, load_jsonl}; + + let _scenario = FailScenario::setup(); + let dir = tempfile::tempdir().unwrap(); + let uri = dir.path().to_str().unwrap().to_string(); + + let mut db = Omnigraph::init(&uri, helpers::TEST_SCHEMA).await.unwrap(); + load_jsonl( + &mut db, + "{\"type\":\"Person\",\"data\":{\"name\":\"Alice\",\"age\":30}}\n", + LoadMode::Merge, + ) + .await + .unwrap(); + db.branch_create("feature").await.unwrap(); + helpers::mutate_branch( + &mut db, + "feature", + MUTATION_QUERIES, + "insert_person", + &mixed_params(&[("$name", "Eve")], &[("$age", 22)]), + ) + .await + .unwrap(); + + // Deferred-shape sidecar pinned to feature (head < expected ⇒ + // invariant violation ⇒ every roll-forward-only pass defers it). + let person_uri = node_table_uri(&uri, "Person"); + let sidecar_json = format!( + r#"{{ + "schema_version": 1, + "operation_id": "01H000000000000000000000ID", + "started_at": "0", + "branch": "feature", + "actor_id": null, + "writer_kind": "Mutation", + "tables": [ + {{ + "table_key": "node:Person", + "table_path": "{person_uri}", + "expected_version": 999, + "post_commit_pin": 1000, + "table_branch": "feature" + }} + ] + }}"# + ); + let recovery_dir = dir.path().join("__recovery"); + std::fs::create_dir_all(&recovery_dir).unwrap(); + std::fs::write( + recovery_dir.join("01H000000000000000000000ID.json"), + &sidecar_json, + ) + .unwrap(); + + // Orphan the sidecar. + db.branch_delete("feature").await.unwrap(); + + // First write: the discard path writes its audit row, then the + // sidecar delete fails (injected). The write fails loudly. + { + let _failpoint = ScopedFailPoint::new("recovery.sidecar_delete", "return"); + let err = load_jsonl( + &mut db, + "{\"type\":\"Person\",\"data\":{\"name\":\"Bob\",\"age\":25}}\n", + LoadMode::Merge, + ) + .await + .err() + .expect("a sidecar-delete fault mid-discard must fail the write"); + assert!( + err.to_string() + .contains("injected failpoint triggered: recovery.sidecar_delete"), + "unexpected error: {err}" + ); + assert_eq!(std::fs::read_dir(&recovery_dir).unwrap().count(), 1); + } + + // Retry: must finish the delete WITHOUT a second audit row. + load_jsonl( + &mut db, + "{\"type\":\"Person\",\"data\":{\"name\":\"Bob\",\"age\":25}}\n", + LoadMode::Merge, + ) + .await + .expect("the retry must complete the orphan discard and the write"); + assert_eq!(std::fs::read_dir(&recovery_dir).unwrap().count(), 0); + let orphan_rows = helpers::recovery::recovery_audit_kinds(dir.path()) + .await + .into_iter() + .filter(|kind| kind == "OrphanedBranchDiscarded") + .count(); + assert_eq!( + orphan_rows, 1, + "exactly one OrphanedBranchDiscarded audit row despite the delete-fault retry" + ); +} + +/// When the commit-time drift guard cannot LIST sidecars to classify +/// the drift (transient storage fault on the guard's list, after the +/// entry heal's list succeeded), it must say so and name BOTH recovery +/// paths — not confidently route to `omnigraph repair`, which refuses +/// while a sidecar is pending. Sequenced failpoint: first list (entry +/// heal) passes, second list (the guard) fails. +#[tokio::test] +async fn drift_guard_names_both_paths_when_sidecar_list_fails() { + use omnigraph::loader::{LoadMode, load_jsonl}; + + let _scenario = FailScenario::setup(); + let dir = tempfile::tempdir().unwrap(); + let uri = dir.path().to_str().unwrap().to_string(); + + let mut db = Omnigraph::init(&uri, helpers::TEST_SCHEMA).await.unwrap(); + load_jsonl( + &mut db, + "{\"type\":\"Person\",\"data\":{\"name\":\"alice\",\"age\":30}}\n", + LoadMode::Append, + ) + .await + .unwrap(); + + // Rollback-eligible (deferred) sidecar covering main's Person drift — + // same shape as refresh_defers_rollback_eligible_sidecar_to_next_open. + let snapshot = db + .snapshot_of(omnigraph::db::ReadTarget::branch("main")) + .await + .unwrap(); + let entry = snapshot.entry("node:Person").unwrap(); + let person_uri = format!("{}/{}", uri.trim_end_matches('/'), entry.table_path); + let manifest_pin = entry.table_version; + let mut ds = lance::Dataset::open(&person_uri).await.unwrap(); + helpers::lance_delete_inline(&mut ds, "1 = 2").await; + let head_after_drift = ds.version().version; + let sidecar_json = format!( + r#"{{ + "schema_version": 1, + "operation_id": "01H0000000000000000000LSTF", + "started_at": "0", + "branch": null, + "actor_id": null, + "writer_kind": "Mutation", + "tables": [ + {{ + "table_key":"node:Person", + "table_path":"{}", + "expected_version":{}, + "post_commit_pin":{} + }} + ] + }}"#, + person_uri, + manifest_pin - 1, + head_after_drift, + ); + let recovery_dir = dir.path().join("__recovery"); + std::fs::create_dir_all(&recovery_dir).unwrap(); + std::fs::write( + recovery_dir.join("01H0000000000000000000LSTF.json"), + &sidecar_json, + ) + .unwrap(); + + // First list (entry heal) passes and defers the sidecar; second + // list (the guard's classification) fails. + let _failpoint = ScopedFailPoint::new("recovery.sidecar_list", "1*off->1*return"); + let err = load_jsonl( + &mut db, + "{\"type\":\"Person\",\"data\":{\"name\":\"bob\",\"age\":25}}\n", + LoadMode::Merge, + ) + .await + .err() + .expect("drift must still fail the write"); + let msg = err.to_string(); + assert!( + msg.contains("could not classify the drift") + && msg.contains("omnigraph repair") + && msg.contains("reopen the graph read-write"), + "an unclassifiable drift must name BOTH recovery paths, not \ + confidently route to repair; got: {msg}" + ); +} + +/// The other half of the orphan-discard fault matrix: the audit append +/// fails AFTER the recovery commit landed. The retry (keyed on the +/// audit row, the operator-facing record) must converge to exactly one +/// audit row and a consumed sidecar. The second recovery commit the +/// retry appends is the documented not-atomic-pair-write tolerance +/// (same class as `record_audit` and the manifest→commit-graph Known +/// Gap): bounded commit-graph noise, never a lost or duplicated audit +/// record under clean failures. +#[tokio::test] +async fn orphaned_branch_discard_converges_across_audit_append_failure() { + use omnigraph::loader::{LoadMode, load_jsonl}; + + let _scenario = FailScenario::setup(); + let dir = tempfile::tempdir().unwrap(); + let uri = dir.path().to_str().unwrap().to_string(); + + let mut db = Omnigraph::init(&uri, helpers::TEST_SCHEMA).await.unwrap(); + load_jsonl( + &mut db, + "{\"type\":\"Person\",\"data\":{\"name\":\"Alice\",\"age\":30}}\n", + LoadMode::Merge, + ) + .await + .unwrap(); + db.branch_create("feature").await.unwrap(); + helpers::mutate_branch( + &mut db, + "feature", + MUTATION_QUERIES, + "insert_person", + &mixed_params(&[("$name", "Eve")], &[("$age", 22)]), + ) + .await + .unwrap(); + + // Deferred-shape sidecar pinned to feature, then orphaned. + let person_uri = node_table_uri(&uri, "Person"); + let sidecar_json = format!( + r#"{{ + "schema_version": 1, + "operation_id": "01H000000000000000000000AF", + "started_at": "0", + "branch": "feature", + "actor_id": null, + "writer_kind": "Mutation", + "tables": [ + {{ + "table_key": "node:Person", + "table_path": "{person_uri}", + "expected_version": 999, + "post_commit_pin": 1000, + "table_branch": "feature" + }} + ] + }}"# + ); + let recovery_dir = dir.path().join("__recovery"); + std::fs::create_dir_all(&recovery_dir).unwrap(); + std::fs::write( + recovery_dir.join("01H000000000000000000000AF.json"), + &sidecar_json, + ) + .unwrap(); + db.branch_delete("feature").await.unwrap(); + + // First write: the recovery commit lands, then the audit append + // fails (injected). The write fails loudly; the sidecar survives so + // the discard is retried with the audit still owed. + { + let _failpoint = ScopedFailPoint::new("recovery.orphan_discard_audit_append", "return"); + let err = load_jsonl( + &mut db, + "{\"type\":\"Person\",\"data\":{\"name\":\"Bob\",\"age\":25}}\n", + LoadMode::Merge, + ) + .await + .err() + .expect("an audit-append fault mid-discard must fail the write"); + assert!( + err.to_string() + .contains("injected failpoint triggered: recovery.orphan_discard_audit_append"), + "unexpected error: {err}" + ); + assert_eq!( + std::fs::read_dir(&recovery_dir).unwrap().count(), + 1, + "the sidecar must survive an audit-append fault so the discard is retried" + ); + let orphan_rows = helpers::recovery::recovery_audit_kinds(dir.path()) + .await + .into_iter() + .filter(|kind| kind == "OrphanedBranchDiscarded") + .count(); + assert_eq!(orphan_rows, 0, "no audit row landed before the fault"); + } + + // Retry: converges — sidecar consumed, exactly one audit row. + load_jsonl( + &mut db, + "{\"type\":\"Person\",\"data\":{\"name\":\"Bob\",\"age\":25}}\n", + LoadMode::Merge, + ) + .await + .expect("the retry must complete the orphan discard and the write"); + assert_eq!(std::fs::read_dir(&recovery_dir).unwrap().count(), 0); + let orphan_rows = helpers::recovery::recovery_audit_kinds(dir.path()) + .await + .into_iter() + .filter(|kind| kind == "OrphanedBranchDiscarded") + .count(); + assert_eq!( + orphan_rows, 1, + "exactly one OrphanedBranchDiscarded audit row despite the audit-fault retry" + ); +} + +/// After the write-entry heal rolls a SchemaApply sidecar forward (a +/// crashed apply on the SAME handle: staging promoted, registrations +/// published), the handle's in-memory catalog must be reloaded — disk +/// and manifest are on the new schema, and validating subsequent +/// writes against the stale catalog rejects rows of types the graph +/// already has. +#[tokio::test] +async fn load_after_schema_apply_phase_b_failure_uses_recovered_catalog() { + use omnigraph::loader::{LoadMode, load_jsonl}; + + let _scenario = FailScenario::setup(); + let dir = tempfile::tempdir().unwrap(); + let uri = dir.path().to_str().unwrap().to_string(); + + let mut db = Omnigraph::init(&uri, helpers::TEST_SCHEMA).await.unwrap(); + load_jsonl( + &mut db, + "{\"type\":\"Person\",\"data\":{\"name\":\"alice\",\"age\":30}}\n", + LoadMode::Append, + ) + .await + .unwrap(); + + // v2: a Person property (rewritten_tables work) + a new Tag type + // (table-set change, keeps the staging disambiguator decisive). + let v2_schema = r#"node Person { + name: String @key + age: I32? + city: String? +} + +node Company { + name: String @key +} + +node Tag { + label: String @key +} + +edge Knows: Person -> Person { + since: Date? +} + +edge WorksAt: Person -> Company +"#; + { + let _failpoint = ScopedFailPoint::new("schema_apply.after_staging_write", "return"); + let err = db.apply_schema(v2_schema).await.unwrap_err(); + assert!( + err.to_string() + .contains("injected failpoint triggered: schema_apply.after_staging_write"), + "unexpected error: {err}" + ); + let recovery_dir = dir.path().join("__recovery"); + assert_eq!(std::fs::read_dir(&recovery_dir).unwrap().count(), 1); + } + + // Same handle: a load of the NEW type. The entry heal rolls the + // apply forward (staging promoted, manifest registers node:Tag) — + // and the loader must then validate against the RECOVERED catalog, + // not the stale in-memory one. + load_jsonl( + &mut db, + "{\"type\":\"Tag\",\"data\":{\"label\":\"t1\"}}\n", + LoadMode::Merge, + ) + .await + .expect( + "after the heal rolls the schema apply forward, the same handle \ + must accept rows of the recovered schema's types", + ); + assert_eq!(helpers::count_rows(&db, "node:Tag").await, 1); + let recovery_dir = dir.path().join("__recovery"); + if recovery_dir.exists() { + assert_eq!(std::fs::read_dir(&recovery_dir).unwrap().count(), 0); + } +} + +/// A concurrent write's entry heal must NOT promote a LIVE schema +/// apply's staging files. The apply pauses just after writing its +/// staging files (sidecar on disk from Phase A, staging on disk, +/// manifest not yet committed); a load on the same handle fires the +/// heal in that window. If the heal's schema-staging reconcile runs +/// unserialized, it promotes the staging files from under the live +/// apply — putting the NEW catalog live against the OLD manifest — and +/// the resumed apply's own renames then fail on the missing sources: +/// an error (and a corrupted catalog) for an otherwise-healthy apply. +#[tokio::test(flavor = "multi_thread", worker_threads = 4)] +async fn heal_does_not_promote_live_schema_apply_staging() { + use omnigraph::loader::LoadMode; + use std::sync::Arc; + + let _scenario = FailScenario::setup(); + let dir = tempfile::tempdir().unwrap(); + let uri = dir.path().to_str().unwrap().to_string(); + + let db = Arc::new(Omnigraph::init(&uri, helpers::TEST_SCHEMA).await.unwrap()); + + // Pause the apply right after its staging files land (its sidecar is + // already on disk from Phase A; the manifest commit has not run). + let failpoint = ScopedFailPoint::new("schema_apply.after_staging_write", "pause"); + + let apply_db = Arc::clone(&db); + let desired = format!("{}\nnode Tag {{ name: String @key }}\n", helpers::TEST_SCHEMA); + let apply = tokio::spawn(async move { apply_db.apply_schema(&desired).await }); + + // Wait until the apply is parked in the window: staging on disk. + let staging_pg = dir.path().join("_schema.pg.staging"); + for _ in 0..500 { + if staging_pg.exists() { + break; + } + tokio::time::sleep(std::time::Duration::from_millis(10)).await; + } + assert!(staging_pg.exists(), "schema apply never reached the paused window"); + + // Concurrent load on the same handle: its entry heal runs while the + // apply is paused. The load itself may fail (schema apply in + // progress) — what matters is what its heal does to the live apply. + let load_db = Arc::clone(&db); + let load = tokio::spawn(async move { + load_db + .load_as( + "main", + None, + "{\"type\":\"Person\",\"data\":{\"name\":\"Alice\",\"age\":30}}\n", + LoadMode::Merge, + None, + ) + .await + }); + + // Give the load's heal time to act inside the window. Broken code + // completes the load here (its heal promoted the staging files and + // stole the apply's commit); fixed code leaves the load blocked on + // the schema-apply serialization key until the apply finishes. + tokio::time::sleep(std::time::Duration::from_millis(500)).await; + drop(failpoint); + + let apply_result = apply.await.unwrap(); + let _ = tokio::time::timeout(std::time::Duration::from_secs(30), load) + .await + .expect("load must complete once the apply releases its guards") + .unwrap(); + apply_result.expect( + "a concurrent write's heal must not promote the live schema \ + apply's staging files out from under it", + ); + + // The migration landed and nothing recovery-shaped remains. + assert_eq!(helpers::count_rows(&db, "node:Tag").await, 0); + let recovery_dir = dir.path().join("__recovery"); + if recovery_dir.exists() { + assert_eq!(std::fs::read_dir(&recovery_dir).unwrap().count(), 0); + } +} + /// Refresh-time recovery must NOT call `Dataset::restore` — it can /// silently orphan a concurrent writer's commit. Sidecars that would /// require rollback must be left on disk for the next ReadWrite open. @@ -1306,6 +2522,26 @@ async fn refresh_defers_rollback_eligible_sidecar_to_next_open() { pre_head={pre_head}, post_head={post_head}", ); + // A write attempt while the rollback-eligible sidecar is deferred: + // the write-entry heal defers it again (roll-forward-only), and the + // commit-time drift guard must name the actual recovery path (a + // read-write reopen) — NOT `omnigraph repair`, which refuses while + // a sidecar is pending. + let err = mutate_main( + &mut db, + MUTATION_QUERIES, + "insert_person", + &mixed_params(&[("$name", "Grace")], &[("$age", 50)]), + ) + .await + .unwrap_err(); + assert!( + err.to_string() + .contains("a pending recovery sidecar requires rollback"), + "drift guard must point at a read-write reopen for sidecar-covered \ + rollback-eligible drift; got: {err}" + ); + // Cross-check: drop the engine and reopen — full sweep handles // the rollback (will use Dataset::restore safely; no concurrent // writers at open time). diff --git a/crates/omnigraph/tests/helpers/recovery.rs b/crates/omnigraph/tests/helpers/recovery.rs index 90d9a25..4cb45e0 100644 --- a/crates/omnigraph/tests/helpers/recovery.rs +++ b/crates/omnigraph/tests/helpers/recovery.rs @@ -143,6 +143,39 @@ pub fn sidecar_operation_ids(graph_root: &Path) -> Vec { ids } +/// Recovery-audit rows' `recovery_kind` values at `graph_root`, in +/// storage order. Empty when the audit dataset doesn't exist yet. +pub async fn recovery_audit_kinds(graph_root: &Path) -> Vec { + let recoveries_dir = graph_root.join("_graph_commit_recoveries.lance"); + if !recoveries_dir.exists() { + return Vec::new(); + } + let ds = Dataset::open(recoveries_dir.to_str().unwrap()) + .await + .expect("recoveries dataset opens"); + let batches: Vec = ds + .scan() + .try_into_stream() + .await + .unwrap() + .try_collect() + .await + .unwrap(); + let mut out = Vec::new(); + for batch in batches { + let kinds = batch + .column_by_name("recovery_kind") + .expect("recovery_kind column present") + .as_any() + .downcast_ref::() + .expect("recovery_kind is Utf8"); + for i in 0..kinds.len() { + out.push(kinds.value(i).to_string()); + } + } + out +} + pub async fn branch_head_commit_id(graph_root: &Path, branch: &str) -> Result { let graph = match branch { "main" => CommitGraph::open(&graph_uri(graph_root)).await?, diff --git a/crates/omnigraph/tests/recovery.rs b/crates/omnigraph/tests/recovery.rs index 37d46cb..b5ca58f 100644 --- a/crates/omnigraph/tests/recovery.rs +++ b/crates/omnigraph/tests/recovery.rs @@ -134,6 +134,218 @@ async fn recovery_refuses_unknown_schema_version_on_open() { ); } +#[tokio::test] +async fn recovery_refuses_corrupt_sidecar_on_open_and_write() { + use omnigraph::loader::{LoadMode, load_jsonl}; + + let dir = tempfile::tempdir().unwrap(); + let uri = dir.path().to_str().unwrap(); + let mut db = Omnigraph::init(uri, TEST_SCHEMA).await.unwrap(); + + // A truncated/garbage sidecar — e.g. a crashed writer or a partial + // local-FS write (S3 PutObject is atomic; local fs::write is not). + write_sidecar_file(dir.path(), "01H000000000000000000000CC", "{not json"); + + // A live handle's write-entry heal must surface the parse failure + // loudly instead of proceeding over a sidecar it cannot interpret. + let err = load_jsonl( + &mut db, + r#"{"type":"Person","data":{"name":"Alice","age":30}} +"#, + LoadMode::Merge, + ) + .await + .err() + .expect("expected the write to fail on the corrupt sidecar"); + assert!( + err.to_string().contains("is not valid JSON"), + "expected the corrupt-sidecar parse error, got: {}", + err, + ); + + // A fresh ReadWrite open fails the same way. + drop(db); + let err = Omnigraph::open(uri) + .await + .err() + .expect("expected open to fail because of the corrupt sidecar"); + let msg = err.to_string(); + assert!( + msg.contains("01H000000000000000000000CC") && msg.contains("is not valid JSON"), + "expected the corrupt-sidecar parse error naming the file, got: {}", + msg, + ); + // The file must remain on disk for inspection — never auto-deleted. + assert!( + list_recovery_dir(dir.path()).contains(&"01H000000000000000000000CC.json".to_string()), + "corrupt sidecar should remain on disk after refusal" + ); + + // Read-only open still works — the sweep is skipped entirely. + let _db = Omnigraph::open_read_only(uri).await.unwrap(); +} + +/// The commit-time drift guard's advice must be branch-aware: a pending +/// sidecar on ANOTHER branch does not cover this branch's drift. With a +/// deferred feature-branch sidecar on disk and genuinely uncovered drift +/// on main, the main write must still point at `omnigraph repair` — a +/// read-write reopen recovers the sidecar but cannot repair main's +/// uncovered drift. +#[tokio::test] +async fn drift_guard_advice_ignores_other_branch_sidecars() { + use omnigraph::loader::{LoadMode, load_jsonl}; + + let dir = tempfile::tempdir().unwrap(); + let uri = dir.path().to_str().unwrap(); + let mut db = Omnigraph::init(uri, TEST_SCHEMA).await.unwrap(); + load_jsonl( + &mut db, + "{\"type\":\"Person\",\"data\":{\"name\":\"Alice\",\"age\":30}}\n", + LoadMode::Merge, + ) + .await + .unwrap(); + db.branch_create("feature").await.unwrap(); + // A real feature write forks Person's Lance dataset onto the branch + // (the heal classifies a feature sidecar against the forked head). + db.mutate( + "feature", + helpers::MUTATION_QUERIES, + "insert_person", + &helpers::mixed_params(&[("$name", "eve")], &[("$age", 22)]), + ) + .await + .unwrap(); + + // A sidecar pinning node:Person ON FEATURE, shaped so the write-entry + // heal defers it (head < expected_version classifies as an invariant + // violation; roll-forward-only mode leaves it for the next ReadWrite + // open) — it persists through the write attempt below. + let person_uri = node_table_uri(uri, "Person"); + let sidecar_json = format!( + r#"{{ + "schema_version": 1, + "operation_id": "01H000000000000000000000XB", + "started_at": "0", + "branch": "feature", + "actor_id": null, + "writer_kind": "Mutation", + "tables": [ + {{ + "table_key": "node:Person", + "table_path": "{person_uri}", + "expected_version": 999, + "post_commit_pin": 1000, + "table_branch": "feature" + }} + ] + }}"# + ); + write_sidecar_file(dir.path(), "01H000000000000000000000XB", &sidecar_json); + + // Genuinely uncovered drift on MAIN's Person (raw Lance write + // bypassing the manifest — the `omnigraph repair` class). + let mut ds = Dataset::open(&person_uri).await.unwrap(); + let _ = helpers::lance_delete_inline(&mut ds, "1 = 2").await; + + let err = load_jsonl( + &mut db, + "{\"type\":\"Person\",\"data\":{\"name\":\"Bob\",\"age\":25}}\n", + LoadMode::Merge, + ) + .await + .err() + .expect("uncovered main drift must fail the write"); + assert!( + err.to_string().contains("run `omnigraph repair`"), + "a feature-branch sidecar must not flip main's uncovered-drift \ + advice to the reopen path; got: {err}" + ); +} + +/// A deferred sidecar pinned to a branch that is subsequently DELETED +/// must not wedge the graph: the branch's tree and forks are reclaimed, +/// so the pinned drift is unreachable and the sidecar is provably moot. +/// Both the write-entry heal and the open-time sweep must classify it +/// as orphaned (audit + discard) instead of failing to open the dead +/// branch on every write and every ReadWrite open — a terminal state, +/// since `repair` refuses while a sidecar is pending. +#[tokio::test] +async fn deleted_branch_sidecar_does_not_wedge_writes_or_open() { + use omnigraph::loader::{LoadMode, load_jsonl}; + + let dir = tempfile::tempdir().unwrap(); + let uri = dir.path().to_str().unwrap().to_string(); + let mut db = Omnigraph::init(&uri, TEST_SCHEMA).await.unwrap(); + load_jsonl( + &mut db, + "{\"type\":\"Person\",\"data\":{\"name\":\"Alice\",\"age\":30}}\n", + LoadMode::Merge, + ) + .await + .unwrap(); + db.branch_create("feature").await.unwrap(); + db.mutate( + "feature", + helpers::MUTATION_QUERIES, + "insert_person", + &helpers::mixed_params(&[("$name", "eve")], &[("$age", 22)]), + ) + .await + .unwrap(); + + // A rollback-eligible (deferred) sidecar pinned to feature — shaped + // so every roll-forward-only pass leaves it on disk. + let person_uri = node_table_uri(&uri, "Person"); + let sidecar_json = format!( + r#"{{ + "schema_version": 1, + "operation_id": "01H000000000000000000000DB", + "started_at": "0", + "branch": "feature", + "actor_id": null, + "writer_kind": "Mutation", + "tables": [ + {{ + "table_key": "node:Person", + "table_path": "{person_uri}", + "expected_version": 999, + "post_commit_pin": 1000, + "table_branch": "feature" + }} + ] + }}"# + ); + write_sidecar_file(dir.path(), "01H000000000000000000000DB", &sidecar_json); + + // Branch delete defers the rollback-eligible sidecar and proceeds — + // the sidecar now references a branch that no longer exists. + db.branch_delete("feature").await.unwrap(); + + // The next write's heal must classify the orphan and discard it, + // not fail opening the dead branch. + load_jsonl( + &mut db, + "{\"type\":\"Person\",\"data\":{\"name\":\"Bob\",\"age\":25}}\n", + LoadMode::Merge, + ) + .await + .expect("a write after deleting a sidecar-pinned branch must succeed"); + assert_eq!( + list_recovery_dir(dir.path()).len(), + 0, + "the orphaned sidecar must be discarded (with an audit row), not left to wedge" + ); + + // And a fresh ReadWrite open must succeed too (the sweep shares the + // same classification). + drop(db); + let db = Omnigraph::open(&uri) + .await + .expect("ReadWrite open after deleting a sidecar-pinned branch must succeed"); + assert_eq!(helpers::count_rows(&db, "node:Person").await, 2); +} + #[tokio::test] async fn read_only_open_skips_recovery_sweep() { let dir = tempfile::tempdir().unwrap(); diff --git a/docs/dev/invariants.md b/docs/dev/invariants.md index 655e360..b3bcfaf 100644 --- a/docs/dev/invariants.md +++ b/docs/dev/invariants.md @@ -42,10 +42,12 @@ Use it this way: 5. **Recovery is part of the commit protocol.** Writers that can advance Lance HEAD before manifest publish must write `__recovery/{ulid}.json` sidecars. - `Omnigraph::open` in read-write mode runs the all-or-nothing sweep, and - `refresh` runs roll-forward-only recovery for long-lived processes. Do not - add a new writer kind without sidecar coverage or an explicit proof that no - Lance HEAD can move before manifest publish. + `Omnigraph::open` in read-write mode runs the all-or-nothing sweep; the + write entry points (`load_as`, `mutate_as`, `apply_schema_as`, + `branch_merge_as`) and `refresh` run roll-forward-only recovery in-process, + so a long-lived process converges on its next write rather than at restart. Do not add a new writer kind without + sidecar coverage or an explicit proof that no Lance HEAD can move before + manifest publish. 6. **Strong consistency is the default.** Reads are snapshot-isolated, writes are durable before acknowledgement, and branch reads observe the current @@ -106,7 +108,7 @@ Use it this way: | Index lifecycle | `ensure_indices` is explicit today; reconciler-based convergence is roadmap | [indexes.md](../user/indexes.md), [maintenance.md](../user/maintenance.md) | | Traversal IDs | Runtime still builds `TypeIndex`; Lance stable row-id based graph IDs are roadmap | [architecture.md](architecture.md), [query-language.md](../user/query-language.md) | | Auth | Bearer token hashing and server-side actor resolution are implemented at the HTTP boundary | [server.md](../user/server.md), [policy.md](../user/policy.md) | -| Tests | Tempdir-backed Lance tests are the current substrate; there is no `MemStorage` test backend | [testing.md](testing.md) | +| Tests | Tempdir-backed Lance tests are the current substrate; the storage adapter has an in-memory backend for adapter-level contract tests, but Lance datasets bypass it | [testing.md](testing.md) | The branch-delete reconciler is authority-derived: it reclaims orphaned forks today and degrades to a no-op if Lance ships an atomic multi-dataset branch @@ -146,6 +148,29 @@ them explicit. Remove the skip when the upstream Lance fix lands — the `lance_surface_guards.rs::compact_files_still_fails_on_blob_columns` guard turns red on that bump to force it. +- **Recovery is serialized against live writers in-process only:** the + write-entry heal (and `refresh`) serialize against a live writer's sidecar + lifetime via the per-`(table, branch)` write queues plus the schema-apply + serialization key — all in-process primitives. A recovery pass in one + process cannot serialize against a live writer in another (the open-time + sweep has the same exposure, and always has): it may roll a live foreign + writer's sidecar forward, which degrades to publisher-CAS contention for + data writes but can race the schema-staging promotion for a foreign live + schema apply. Multi-process writers on one graph are already documented + one-winner-CAS territory; closing this fully needs a cross-process + serialization primitive (e.g. lease-based use of the schema-apply lock + branch) — design it before promoting multi-process write topologies. +- **Local `write_text_if_match` is not a cross-process CAS:** object-store + backends use a true conditional put (ETag If-Match; the in-memory test + backend too), but upstream `object_store` leaves `PutMode::Update` + unimplemented for `LocalFileSystem`, so the local path emulates CAS with + a content-token compare followed by an atomic replace — a check-then-act + gap plus content-token ABA. Every current caller goes through the cluster + lock protocol first, which makes this safe. A lock-free caller would get + S3-correct but local-racy behavior — the same divergence shape as the + acknowledged-before-visible bug this branch fixed. Close it (local CAS + primitive, or a trait-level lock requirement) before admitting any + lock-free `if_match` caller. - **Manifest→commit-graph publish atomicity:** a graph commit advances `__manifest` (the visibility authority) and then appends `_graph_commits` as two separate writes (`commit_updates_with_actor_with_expected`, failpoint diff --git a/docs/dev/testing.md b/docs/dev/testing.md index f2b33de..d2d08f3 100644 --- a/docs/dev/testing.md +++ b/docs/dev/testing.md @@ -43,7 +43,7 @@ The engine's `tests/` is the principal coverage surface; most graph-shaped behav | `validators.rs` | Schema constraint enforcement (enum, range, unique, cardinality) across JSONL, insert, update paths | | `policy_engine_chassis.rs` | Engine-layer Cedar enforcement (MR-722): allow + deny through every `_as` writer via the SDK directly — no HTTP — proving embedded and CLI callers hit the same gate as the server, with action × scope shapes matching `authorize_request` | | `maintenance.rs` | `optimize` (compaction), `repair` (explicit uncovered-drift publish), and `cleanup` (version GC): empty/idempotent/no-op edges, policy validation, head preservation; `optimize` publishes its own compaction (`optimize_publishes_compaction_to_manifest_so_schema_apply_succeeds`), skips pre-existing uncovered drift (`optimize_skips_preexisting_manifest_head_drift`), and refuses to run while a `__recovery` sidecar is pending (`optimize_defers_when_recovery_sidecar_is_pending`); `repair` previews/heals verified maintenance drift, refuses raw semantic drift without `--force`, and forced repair publishes only by explicit operator choice | -| `failpoints.rs` | Failure-injection coverage (gated on `failpoints` feature). Includes the five per-writer Phase B → recovery integration tests (`recovery_rolls_forward_after_finalize_publisher_failure`, `schema_apply_phase_b_failure_recovered_on_next_open`, `branch_merge_phase_b_failure_recovered_on_next_open`, `ensure_indices_phase_b_failure_recovered_on_next_open`, `optimize_phase_b_failure_recovered_on_next_open`). | +| `failpoints.rs` | Failure-injection coverage (gated on `failpoints` feature). Includes the five per-writer Phase B → recovery integration tests (`recovery_rolls_forward_after_finalize_publisher_failure`, `schema_apply_phase_b_failure_recovered_on_next_open`, `branch_merge_phase_b_failure_recovered_on_next_open`, `ensure_indices_phase_b_failure_recovered_on_next_open`, `optimize_phase_b_failure_recovered_on_next_open`) and the write-entry in-process heal contract (the four `*_after_finalize_publisher_failure_heals_without_reopen` tests — load, mutation, schema apply, branch merge: a follow-up write on the same handle rolls a sidecar-covered residual forward without reopen/refresh) and the storage-fault matrix for the sidecar lifecycle (`recovery.sidecar_{write,delete,list}` / `recovery.record_audit` failpoints: Phase A put failure aborts with zero drift, Phase D delete failure is swallowed and healed by the next write, list failures are loud at heal and open, audit-append failures are retried to exactly one audit row; plus the bucket-gated `s3_load_recovers_after_publisher_failure_without_reopen`). | | `recovery.rs` | Open-time recovery sweep — sidecar I/O, classifier dispatch (NoMovement / RolledPastExpected / UnexpectedAtP1 / UnexpectedMultistep / InvariantViolation), all-or-nothing decision, roll-forward via `ManifestBatchPublisher::publish`, roll-back via `Dataset::restore`, audit row in `_graph_commit_recoveries.lance`, `OpenMode::ReadOnly` skip path | | `composite_flow.rs` | Compositional/narrative end-to-end stories — multi-step flows that compose mechanics covered by other test files. Catches integration regressions where individual operations all pass their unit tests but their composition breaks (sequential merges, post-merge main writes, time-travel through merge DAG, reopen consistency over multi-merge histories, post-optimize and post-cleanup strict writes). | @@ -57,7 +57,7 @@ The engine's `tests/` is the principal coverage surface; most graph-shaped behav - **CLI** — `crates/omnigraph-cli/tests/support/mod.rs`: `Command`-style wrapper for invoking `omnigraph`, server-process spawning, fixture resolution, output assertion helpers. - **Server** — no shared helpers; server tests call the `Omnigraph` engine API directly and exercise endpoints over the wire. -> Note: there is **no `MemStorage` or in-memory backend** today. Tests use `tempfile::tempdir()` for local FS. If you find yourself needing one for layer isolation, that's an architectural ask — keep it explicit in [docs/dev/invariants.md](invariants.md) under known gaps. +> Note: the storage adapter has an in-memory backend (`ObjectStorageAdapter::in_memory()`, full contract including true conditional updates) used by the adapter contract tests in `storage.rs`. It covers only the text-object layer (sidecars, schema staging, cluster state) — **Lance datasets bypass the adapter**, so engine integration tests still use `tempfile::tempdir()`. An in-memory Lance substrate remains an architectural ask — keep it explicit in [docs/dev/invariants.md](invariants.md) under known gaps. ## Failpoints (fault injection) @@ -74,6 +74,7 @@ CI runs three S3-backed tests against a containerized RustFS server (`.github/wo - `cargo test -p omnigraph-server --test s3` (single-graph serving + config-free `--cluster s3://` boot) - `cargo test -p omnigraph-cluster --test s3_cluster` (full control-plane lifecycle on the bucket) - `cargo test -p omnigraph-cli --test system_local local_cli_s3_end_to_end_init_load_read_flow` +- `cargo test -p omnigraph-engine --features failpoints --test failpoints s3_` (recovery-sidecar lifecycle on a real bucket) Locally, set `OMNIGRAPH_S3_TEST_BUCKET` (and the usual `AWS_*` vars including `AWS_ENDPOINT_URL_S3` for non-AWS) before running. Without those, S3 tests skip gracefully. diff --git a/docs/dev/writes.md b/docs/dev/writes.md index 5647d82..82d6ba8 100644 --- a/docs/dev/writes.md +++ b/docs/dev/writes.md @@ -215,19 +215,43 @@ Triggers for the residual: transient Lance write errors during finalize (object-store retry budget exhaustion, disk full); persistent publisher contention exceeding `PUBLISHER_RETRY_BUDGET = 5` retries. -**Long-running servers**: `Omnigraph::refresh` runs roll-forward-only -recovery in-process — the common Phase B → Phase C residual closes -without a restart. The next mutation on the same handle (after refresh) -no longer surfaces `ExpectedVersionMismatch` for the failed table. +**Long-running servers**: the write entry points (`load_as`, +`mutate_as`, `apply_schema_as`, `branch_merge_as`) and +`Omnigraph::refresh` run roll-forward-only recovery in-process +(`recovery::heal_pending_sidecars_roll_forward`) — the common +Phase B → Phase C residual closes on the next write, without a +restart and without an explicit refresh. The heal lists `__recovery/` +(one `list_dir`; empty in the steady state) and, per sidecar, acquires +the same per-`(table_key, table_branch)` write queues every sidecar +writer holds from before `write_sidecar` until after `delete_sidecar` — +so it serializes against a live writer instead of rolling its +in-flight sidecar forward from under it (a sidecar whose queues can be +acquired belongs to a writer that finished or died; an existence +re-check after the wait skips the finished case). Lock order is +queues → coordinator, matching every writer's commit→publish path. +Pinned by the four +`tests/failpoints.rs::*_after_finalize_publisher_failure_heals_without_reopen` +tests (load, mutation, schema apply, branch merge). The maintenance +entries need the heal for more than liveness: without it, a schema +apply re-plans rewrites from the manifest pin and orphans the drifted +Phase-B commit (dropping its rows), and a branch merge publishes the +drift as an unattributed side effect — both while the stale sidecar +lingers to misclassify later. Sidecars that would require a `Dataset::restore` (mixed / unexpected state) are deferred to the next `OpenMode::ReadWrite` open: restore is unsafe under concurrency because Lance's `check_restore_txn` accepts the restore against in-flight Append/Update/Delete commits and silently orphans them (pinned by `tests/staged_writes.rs::lance_restore_loses_to_concurrent_append_via_orphaning`). +When such a deferred sidecar blocks a write, the commit-time drift +guard says so explicitly ("a pending recovery sidecar requires +rollback — reopen the graph read-write") instead of pointing at +`omnigraph repair`, which refuses while a sidecar is pending. Continuous in-process recovery for the rollback path is the goal of a -future background reconciler with per-(table, branch) writer-queue -acquisition. +future background reconciler. `ensure_indices` does not heal at entry +itself — it runs inside the load / schema-apply flows after their +entry heal, and its strict preconditions still fail loudly on drift +when invoked directly. The publisher-CAS contract is unchanged: a *concurrent writer* that advances any of our touched tables between snapshot capture and @@ -235,6 +259,44 @@ publisher commit produces exactly one winner. The residual above is about *our* abandoned commits in the failure path, not about concurrency races. +**Sidecar I/O failure semantics** (all sidecar I/O goes through the +backend-generic `StorageAdapter`; the contracts below are pinned by the +storage-fault failpoints `recovery.sidecar_{write,delete,list}` / +`recovery.record_audit` and their tests in `tests/failpoints.rs` and +`tests/recovery.rs`): + +- **Phase A put fails** (S3 PutObject / fs write): the writer aborts + before its first HEAD-advancing commit — no sidecar, no drift, + nothing to recover; a transient fault never wedges later writes. +- **Phase D delete fails** (S3 DeleteObject): swallowed with a warning — + the write already published, so failing the caller would report an + error for a durable write. The stale sidecar is consumed by the next + write's entry heal (or the next open) via the stale-sidecar + audit-recovery path, recorded as `RolledForward`. +- **`__recovery/` list fails** (S3 ListObjectsV2): loud at every + consumer — the write-entry heal fails the write, the open-time sweep + fails the open. Silently skipping recovery would be consumer + tolerance of drift. +- **Corrupt / unparseable sidecar**: refused loudly by heal and open + alike; the file stays on disk for operator inspection (read-only + opens still work — the sweep is skipped there). +- **Audit append fails after a roll-forward publish**: that recovery + attempt errors and keeps the sidecar; re-entry sees the + already-published manifest, records exactly one `RolledForward` + audit row, and deletes the sidecar (the retry tolerance documented + on `record_audit`). + +Backend notes (the adapter is one implementation over `object_store` +for every backend): local writes stage through `name#` temp +files that the backend filters from listings and refuses to address — +crash residue of that shape is invisible to the sweep, harmless, and +reclaimed by `delete_prefix`/manual cleanup. Storage errors are +backend-wrapped text without a typed NotFound discriminant — callers +that need missing-vs-error (the cluster store) probe `exists()` first. +`exists()` itself is object-store semantics everywhere: only objects +(or non-empty prefixes) exist, and a permission failure is a loud +error, not a silent `false`. + ## Conflict shape Concurrent writers to the same `(table, branch)` produce exactly one diff --git a/docs/user/storage.md b/docs/user/storage.md index 2c57a92..9cc2356 100644 --- a/docs/user/storage.md +++ b/docs/user/storage.md @@ -104,8 +104,8 @@ The split — L2 owns the cross-dataset catalog; L1 owns the per-dataset interna | Scheme | Backend | Notes | |---|---|---| -| local path / `file://` | `LocalStorageAdapter` (tokio) | Normalized to absolute paths | -| `s3://bucket/prefix` | `S3StorageAdapter` (object_store) | Honors `AWS_ENDPOINT_URL_S3`, `AWS_ALLOW_HTTP`, `AWS_S3_FORCE_PATH_STYLE` | +| local path / `file://` | `ObjectStorageAdapter` over `object_store::LocalFileSystem` | Normalized to absolute paths; relative and dot-segment paths are lexically absolutized | +| `s3://bucket/prefix` | `ObjectStorageAdapter` over `object_store` S3 | Honors `AWS_ENDPOINT_URL_S3`, `AWS_ALLOW_HTTP`, `AWS_S3_FORCE_PATH_STYLE` | | `http(s)://host:port` | HTTP client to `omnigraph-server` | Used by CLI as a target, not a storage backend | ## Object-store env vars (S3-compatible)