recovery: scaffold sidecar protocol + classifier + decision tree (Phase 2)

Add db/manifest/recovery.rs with the primitives the open-time recovery
sweep will invoke. No integration into Omnigraph::open or any writer
path yet — those land in Phase 3+.

Sidecar protocol:
- RecoverySidecar JSON shape (schema_version=1; SidecarSchemaError
  refuses unknown versions — old binaries don't guess at newer shapes).
- SidecarKind {Mutation, Load, SchemaApply, BranchMerge, EnsureIndices}
  for audit attribution.
- SidecarTablePin {table_key, table_path, expected_version,
  post_commit_pin}.
- write_sidecar / delete_sidecar / list_sidecars / parse_sidecar.

Classifier + decision dispatcher (all-or-nothing per sidecar):
- TableClassification {NoMovement, RolledPastExpected, UnexpectedAtP1,
  UnexpectedMultistep, InvariantViolation}.
- classify_table(pin, lance_head, manifest_pinned).
- decide(&[TableClassification]) -> SidecarDecision {RollForward,
  RollBack, Abort}. Mid-Phase-B crash with mixed states rolls BACK
  (not forward) — atomicity per docs/invariants.md §VI.23.

Restore primitive:
- restore_table_to_version(table_path, expected_version): open,
  checkout(expected_version), restore. Includes a fragment-set
  equality short-circuit so repeated mid-rollback crashes don't pile
  up Lance versions (Lance fragments are immutable; equal fragment-ids
  ⇒ equal content).

StorageAdapter trait extension:
- Added list_dir(dir_uri) -> Vec<String> for sidecar enumeration.
  LocalStorageAdapter uses tokio::fs::read_dir; S3StorageAdapter uses
  object_store::list with a prefix-collision guard
  (filters to require the directory '/' boundary so listing
  __recovery doesn't accidentally match __recovery_log/...).
  RecordingStorageAdapter (test wrapper) delegates to inner.

17 unit tests covering: classifier branches, decision branches
(including mid-Phase-B mix → RollBack and empty slice → RollForward),
JSON round-trip, schema-version refusal, restore HEAD+1, fragment-set
short-circuit no-op, list_sidecars empty/round-trip/non-JSON-skip.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
Ragnor Comerford 2026-05-02 23:22:17 +02:00
parent 1e70028293
commit 376d91d538
No known key found for this signature in database
4 changed files with 760 additions and 0 deletions

View file

@ -16,6 +16,8 @@ mod migrations;
mod namespace;
#[path = "manifest/publisher.rs"]
mod publisher;
#[path = "manifest/recovery.rs"]
mod recovery;
#[path = "manifest/repo.rs"]
mod repo;
#[path = "manifest/state.rs"]

View file

@ -0,0 +1,692 @@
//! MR-847 — Recovery-on-open primitives.
//!
//! This module implements the building blocks of the per-sidecar recovery
//! sweep that closes the documented Phase B → Phase C residual (see
//! `docs/runs.md` "Finalize → publisher residual"). The high-level shape:
//!
//! 1. Each writer that performs a multi-table commit writes a small JSON
//! sidecar at `__recovery/{ulid}.json` BEFORE its per-table
//! `commit_staged` loop, listing every `(table_key, table_path,
//! expected_version, post_commit_pin)` it intends to publish.
//! 2. After the manifest publish (Phase C) succeeds, the writer deletes
//! the sidecar.
//! 3. If the writer crashes between Phase B begin and Phase C success,
//! the sidecar remains. The next `Omnigraph::open` (gated on
//! `OpenMode::ReadWrite`) classifies each table in each sidecar and
//! either rolls forward all tables (if every table is at
//! `post_commit_pin` AND matches the sidecar) or rolls back all
//! `RolledPastExpected` tables to `expected_version`.
//!
//! Phase 2 (this commit) ships only the primitives: sidecar I/O,
//! classifier, decision dispatcher, restore-with-fragment-set-shortcut.
//! No integration into `Omnigraph::open` or any writer yet — those land
//! in Phase 3+.
//!
//! ## Verified Lance behavior the rollback path depends on
//!
//! - `Dataset::restore()` takes no version arg; restores
//! `self.manifest.version` (currently checked-out version). From HEAD =
//! `h`, produces a new commit at `h + 1` with content == checked-out
//! version. Pinned by
//! `tests/staged_writes.rs::lance_restore_appends_one_commit_with_checked_out_content`.
//! - `Dataset::restore` "wins" against concurrent Append/Update/Delete/
//! CreateIndex/Merge — see `check_restore_txn` at lance-4.0.0
//! `src/io/commit/conflict_resolver.rs:986`. The hazard is documented
//! by `tests/staged_writes.rs::lance_restore_loses_to_concurrent_append_via_orphaning`.
//! MR-847 sidesteps this by running recovery only at `Omnigraph::open`
//! (before any other writers can race); MR-856's continuous-recovery
//! reconciler must guard via per-(table_key, branch) queue acquisition
//! once MR-686 lands.
//!
//! See `.context/mr-847-design.md` for the full design.
use lance::Dataset;
use serde::{Deserialize, Serialize};
use crate::error::{OmniError, Result};
use crate::storage::StorageAdapter;
/// Subdirectory under the repo root holding sidecar files.
pub(crate) const RECOVERY_DIR_NAME: &str = "__recovery";
/// Current sidecar JSON shape version. Bumping this is a breaking change:
/// older binaries will refuse to interpret newer sidecars (intentional —
/// see [`SidecarSchemaError`]).
pub(crate) const SIDECAR_SCHEMA_VERSION: u32 = 1;
/// Categorizes the writer that produced a sidecar so audit trail and
/// observability can attribute recoveries to the right code path.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "PascalCase")]
pub(crate) enum SidecarKind {
/// `MutationStaging::finalize` — `mutate_as` and the bulk loader.
Mutation,
/// `loader/mod.rs` — distinct from mutations only for audit clarity.
Load,
/// `schema_apply::apply_schema_with_lock` — table rewrites + indices.
SchemaApply,
/// `branch_merge_on_current_target` — three-way merge publishes.
BranchMerge,
/// `ensure_indices_for_branch` — index lifecycle commits.
EnsureIndices,
}
/// One table's contribution to a sidecar's intended commit. The classifier
/// uses these to decide per-table state at recovery time.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub(crate) struct SidecarTablePin {
/// Stable identifier (`node:Person`, `edge:Knows`, etc.).
pub table_key: String,
/// Full URI to the Lance dataset for this table.
pub table_path: String,
/// Manifest-pinned version at writer start (CAS expectation).
pub expected_version: u64,
/// Lance HEAD that the writer's `commit_staged` would produce
/// (typically `expected_version + 1`).
pub post_commit_pin: u64,
}
/// In-memory representation of the on-disk JSON sidecar.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub(crate) struct RecoverySidecar {
pub schema_version: u32,
pub operation_id: String,
pub started_at: String,
pub branch: Option<String>,
pub actor_id: Option<String>,
pub writer_kind: SidecarKind,
pub tables: Vec<SidecarTablePin>,
}
/// Opaque handle returned by [`write_sidecar`] so the caller can delete
/// the sidecar after Phase C succeeds. Holding the handle does NOT keep
/// the sidecar alive — it just records the URI to delete.
#[derive(Debug, Clone)]
pub(crate) struct RecoverySidecarHandle {
pub(crate) operation_id: String,
pub(crate) sidecar_uri: String,
}
/// Error returned when the sidecar's `schema_version` is unknown to this
/// binary. We refuse-and-error rather than read-and-warn: an old binary
/// cannot guess what semantics a newer writer baked into a future shape.
/// Operator action is required (typically: upgrade the binary).
#[derive(Debug)]
pub(crate) struct SidecarSchemaError {
pub sidecar_uri: String,
pub found_version: u32,
pub supported_version: u32,
}
impl std::fmt::Display for SidecarSchemaError {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(
f,
"recovery sidecar at '{}' declares schema_version={}, but this \
binary supports only schema_version={}; refusing to interpret \
upgrade omnigraph or remove the sidecar with operator review",
self.sidecar_uri, self.found_version, self.supported_version,
)
}
}
impl std::error::Error for SidecarSchemaError {}
impl From<SidecarSchemaError> for OmniError {
fn from(err: SidecarSchemaError) -> Self {
OmniError::manifest_internal(err.to_string())
}
}
/// Per-table classification of observed Lance HEAD vs. manifest-pinned
/// state, computed against the sidecar's intent.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub(crate) enum TableClassification {
/// `lance_head == manifest_pinned == sidecar.expected_version`.
/// The writer never reached this table's `commit_staged` (or this
/// table wasn't touched yet). No drift; no action.
NoMovement,
/// `lance_head == manifest_pinned + 1` AND
/// `sidecar.expected_version == manifest_pinned` AND
/// `sidecar.post_commit_pin == lance_head`. The writer's
/// `commit_staged` for this table succeeded; only Phase C did not
/// land. Eligible for roll-forward (in the all-or-nothing decision).
RolledPastExpected,
/// `lance_head == manifest_pinned + 1` but the sidecar's
/// `expected_version`/`post_commit_pin` don't match. Some other writer
/// or recovery action moved this table. Roll back to
/// `sidecar.expected_version`.
UnexpectedAtP1,
/// `lance_head > manifest_pinned + 1`. Multi-step orphan from a
/// previous restore attempt or an external mutation. Roll back to
/// `sidecar.expected_version`.
UnexpectedMultistep,
/// `lance_head < manifest_pinned`. Should be impossible: the manifest
/// pin can only advance after a successful Lance commit. Surface
/// loudly and abort recovery.
InvariantViolation { observed: u64 },
}
/// Per-sidecar decision derived from the table classifications.
///
/// **All-or-nothing**: the writer that produced the sidecar intended an
/// atomic publish across every table it listed. Rolling forward only some
/// of them would publish a partial commit and violate `docs/invariants.md`
/// §VI.23. The decision is based on the worst classification:
///
/// - Any `InvariantViolation` → `Abort` (operator action required).
/// - Any `UnexpectedAtP1` / `UnexpectedMultistep` / `NoMovement` →
/// `RollBack` all `RolledPastExpected` tables to `expected_version`.
/// - All `RolledPastExpected` → `RollForward` every table in one
/// manifest publish.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub(crate) enum SidecarDecision {
/// All tables successfully reached Phase B for this writer; only the
/// manifest publish (Phase C) didn't land. Roll the pin forward atomically.
RollForward,
/// Some tables didn't reach Phase B (or sidecar doesn't match observed state).
/// Roll back the rolled-past-expected ones; leave the no-movement ones alone.
RollBack,
/// An invariant was violated. Refuse to act; surface for operator review.
Abort,
}
/// Build the `__recovery/` directory URI under a repo root.
pub(crate) fn recovery_dir_uri(root_uri: &str) -> String {
let trimmed = root_uri.trim_end_matches('/');
format!("{}/{}", trimmed, RECOVERY_DIR_NAME)
}
/// Build the URI for a specific sidecar (`__recovery/{operation_id}.json`).
pub(crate) fn sidecar_uri(root_uri: &str, operation_id: &str) -> String {
let dir = recovery_dir_uri(root_uri);
format!("{}/{}.json", dir, operation_id)
}
/// Write a sidecar atomically and return a handle for later deletion.
///
/// The atomicity contract is inherited from [`StorageAdapter::write_text`]:
/// LocalStorageAdapter writes via `tokio::fs::write` (whole-file replace);
/// S3StorageAdapter writes via PutObject (atomic at the object level).
/// Both are sufficient for sidecar semantics — readers either see the
/// complete sidecar or none.
pub(crate) async fn write_sidecar(
root_uri: &str,
storage: &dyn StorageAdapter,
sidecar: &RecoverySidecar,
) -> Result<RecoverySidecarHandle> {
debug_assert_eq!(sidecar.schema_version, SIDECAR_SCHEMA_VERSION);
let uri = sidecar_uri(root_uri, &sidecar.operation_id);
let json = serde_json::to_string_pretty(sidecar).map_err(|err| {
OmniError::manifest_internal(format!("failed to serialize recovery sidecar: {}", err))
})?;
storage.write_text(&uri, &json).await?;
Ok(RecoverySidecarHandle {
operation_id: sidecar.operation_id.clone(),
sidecar_uri: uri,
})
}
/// Delete a sidecar after Phase C succeeded. Idempotent (safe to retry).
pub(crate) async fn delete_sidecar(
handle: &RecoverySidecarHandle,
storage: &dyn StorageAdapter,
) -> Result<()> {
storage.delete(&handle.sidecar_uri).await
}
/// Read every sidecar under `__recovery/`. Returns an empty vec if the
/// directory does not exist or is empty (the steady-state path).
///
/// Sidecars whose `schema_version` is unsupported by this binary are NOT
/// silently skipped — the function returns an error so an operator can
/// investigate. Rationale: a sidecar with an unknown shape may encode
/// state we don't know how to recover; better to fail open than guess.
pub(crate) async fn list_sidecars(
root_uri: &str,
storage: &dyn StorageAdapter,
) -> Result<Vec<RecoverySidecar>> {
let dir = recovery_dir_uri(root_uri);
let uris = storage.list_dir(&dir).await?;
let mut out = Vec::with_capacity(uris.len());
for uri in uris {
// Skip non-JSON files defensively; the directory is ours but a
// future feature might leave other artifacts here.
if !uri.ends_with(".json") {
continue;
}
let body = storage.read_text(&uri).await?;
let sidecar = parse_sidecar(&uri, &body)?;
out.push(sidecar);
}
Ok(out)
}
/// Parse a sidecar body, enforcing the schema-version refusal policy.
/// Exposed separately so unit tests can exercise the parse path without
/// going through storage.
pub(crate) fn parse_sidecar(sidecar_uri: &str, body: &str) -> Result<RecoverySidecar> {
// First check the schema_version peek — gives a typed error before we
// try to deserialize the rest of the structure (which might fail with
// a less-helpful "missing field" message).
#[derive(Deserialize)]
struct Peek {
schema_version: u32,
}
let peek: Peek = serde_json::from_str(body).map_err(|err| {
OmniError::manifest_internal(format!(
"recovery sidecar at '{}' is not valid JSON: {}",
sidecar_uri, err
))
})?;
if peek.schema_version != SIDECAR_SCHEMA_VERSION {
return Err(SidecarSchemaError {
sidecar_uri: sidecar_uri.to_string(),
found_version: peek.schema_version,
supported_version: SIDECAR_SCHEMA_VERSION,
}
.into());
}
serde_json::from_str(body).map_err(|err| {
OmniError::manifest_internal(format!(
"recovery sidecar at '{}' failed to deserialize: {}",
sidecar_uri, err
))
})
}
/// Classify one table's observed state vs. the sidecar's intent.
pub(crate) fn classify_table(
pin: &SidecarTablePin,
lance_head: u64,
manifest_pinned: u64,
) -> TableClassification {
use TableClassification::*;
if lance_head < manifest_pinned {
return InvariantViolation {
observed: lance_head,
};
}
if lance_head == manifest_pinned {
return NoMovement;
}
// lance_head > manifest_pinned
if lance_head == manifest_pinned + 1 {
if pin.expected_version == manifest_pinned && pin.post_commit_pin == lance_head {
RolledPastExpected
} else {
UnexpectedAtP1
}
} else {
// lance_head > manifest_pinned + 1
UnexpectedMultistep
}
}
/// Compute the per-sidecar decision from a slice of table classifications.
///
/// All-or-nothing per `docs/invariants.md` §VI.23 — see [`SidecarDecision`].
pub(crate) fn decide(classifications: &[TableClassification]) -> SidecarDecision {
use SidecarDecision::*;
use TableClassification::*;
if classifications.iter().any(|c| matches!(c, InvariantViolation { .. })) {
return Abort;
}
if classifications
.iter()
.any(|c| matches!(c, NoMovement | UnexpectedAtP1 | UnexpectedMultistep))
{
return RollBack;
}
// All RolledPastExpected (or the slice is empty — no-op trivially).
RollForward
}
/// Restore a single table's Lance HEAD to `expected_version`, producing a
/// new commit at HEAD+1 with content == content-at-`expected_version`.
///
/// Idempotency: if the latest Lance commit's fragment-id set already equals
/// the fragment-id set at `expected_version`, this is a no-op. Soundness —
/// Lance fragments are immutable; equal fragment-ids ⇒ equal content.
/// This guards against version pile-up under repeated mid-rollback crashes
/// (see `docs/runs.md` "Finalize → publisher residual" + `.context/mr-847-design.md`
/// §"Fragment-set equality short-circuit").
pub(crate) async fn restore_table_to_version(
table_path: &str,
expected_version: u64,
) -> Result<()> {
let head = Dataset::open(table_path)
.await
.map_err(|e| OmniError::Lance(e.to_string()))?;
let target = head
.checkout_version(expected_version)
.await
.map_err(|e| OmniError::Lance(e.to_string()))?;
if fragment_ids(&head) == fragment_ids(&target) {
// Lance HEAD already reflects target content (a prior restore
// landed; we just didn't get to delete the sidecar). No-op.
return Ok(());
}
// checkout returns a NEW Dataset; restore() takes &mut self.
let mut to_restore = target;
to_restore
.restore()
.await
.map_err(|e| OmniError::Lance(e.to_string()))?;
Ok(())
}
fn fragment_ids(ds: &Dataset) -> Vec<u64> {
let mut ids: Vec<u64> = ds.manifest.fragments.iter().map(|f| f.id).collect();
ids.sort_unstable();
ids
}
/// Convenience: build a [`RecoverySidecar`] with auto-generated
/// `operation_id` and `started_at`. The caller fills in the other fields.
pub(crate) fn new_sidecar(
writer_kind: SidecarKind,
branch: Option<String>,
actor_id: Option<String>,
tables: Vec<SidecarTablePin>,
) -> RecoverySidecar {
use std::time::{SystemTime, UNIX_EPOCH};
let operation_id = ulid::Ulid::new().to_string();
let started_at = match SystemTime::now().duration_since(UNIX_EPOCH) {
Ok(d) => format!("{}", d.as_micros()),
Err(_) => "0".to_string(),
};
RecoverySidecar {
schema_version: SIDECAR_SCHEMA_VERSION,
operation_id,
started_at,
branch,
actor_id,
writer_kind,
tables,
}
}
#[cfg(test)]
mod tests {
use super::*;
use std::sync::Arc;
use arrow_array::{Int32Array, RecordBatch, StringArray};
use arrow_schema::{DataType, Field, Schema};
use crate::storage::LocalStorageAdapter;
use crate::table_store::TableStore;
fn person_schema() -> Arc<Schema> {
Arc::new(Schema::new(vec![
Field::new("id", DataType::Utf8, false),
Field::new("age", DataType::Int32, true),
]))
}
fn person_batch(rows: &[(&str, Option<i32>)]) -> RecordBatch {
let ids: Vec<&str> = rows.iter().map(|(id, _)| *id).collect();
let ages: Vec<Option<i32>> = rows.iter().map(|(_, age)| *age).collect();
RecordBatch::try_new(
person_schema(),
vec![
Arc::new(StringArray::from(ids)),
Arc::new(Int32Array::from(ages)),
],
)
.unwrap()
}
fn make_pin(table_key: &str, table_path: &str, expected: u64, post: u64) -> SidecarTablePin {
SidecarTablePin {
table_key: table_key.to_string(),
table_path: table_path.to_string(),
expected_version: expected,
post_commit_pin: post,
}
}
#[test]
fn sidecar_round_trips_through_json() {
let original = new_sidecar(
SidecarKind::Mutation,
Some("main".to_string()),
Some("act-alice".to_string()),
vec![make_pin("node:Person", "file:///tmp/people.lance", 5, 6)],
);
let json = serde_json::to_string(&original).unwrap();
let parsed = parse_sidecar("file:///tmp/__recovery/x.json", &json).unwrap();
assert_eq!(parsed.schema_version, SIDECAR_SCHEMA_VERSION);
assert_eq!(parsed.operation_id, original.operation_id);
assert_eq!(parsed.writer_kind, SidecarKind::Mutation);
assert_eq!(parsed.branch.as_deref(), Some("main"));
assert_eq!(parsed.actor_id.as_deref(), Some("act-alice"));
assert_eq!(parsed.tables.len(), 1);
assert_eq!(parsed.tables[0].table_key, "node:Person");
}
#[test]
fn parse_sidecar_refuses_unknown_schema_version() {
let body = r#"{
"schema_version": 99,
"operation_id": "01H000000000000000000000XX",
"started_at": "0",
"branch": null,
"actor_id": null,
"writer_kind": "Mutation",
"tables": []
}"#;
let err = parse_sidecar("file:///tmp/__recovery/x.json", body).unwrap_err();
let msg = err.to_string();
assert!(
msg.contains("schema_version=99") && msg.contains("supports only schema_version=1"),
"expected SidecarSchemaError mentioning the version mismatch, got: {}",
msg,
);
}
#[test]
fn classify_no_movement_when_head_equals_pinned() {
let pin = make_pin("node:Person", "irrelevant", 5, 6);
assert_eq!(classify_table(&pin, 5, 5), TableClassification::NoMovement);
}
#[test]
fn classify_rolled_past_expected_when_sidecar_matches() {
let pin = make_pin("node:Person", "irrelevant", 5, 6);
assert_eq!(
classify_table(&pin, 6, 5),
TableClassification::RolledPastExpected,
);
}
#[test]
fn classify_unexpected_at_p1_when_sidecar_does_not_match() {
// Same +1 drift but post_commit_pin says it should be 7, not 6.
let pin = make_pin("node:Person", "irrelevant", 5, 7);
assert_eq!(
classify_table(&pin, 6, 5),
TableClassification::UnexpectedAtP1,
);
}
#[test]
fn classify_unexpected_multistep_when_head_jumped_more_than_one() {
let pin = make_pin("node:Person", "irrelevant", 5, 6);
assert_eq!(
classify_table(&pin, 8, 5),
TableClassification::UnexpectedMultistep,
);
}
#[test]
fn classify_invariant_violation_when_head_below_pinned() {
let pin = make_pin("node:Person", "irrelevant", 5, 6);
assert_eq!(
classify_table(&pin, 3, 5),
TableClassification::InvariantViolation { observed: 3 },
);
}
#[test]
fn decide_roll_forward_when_all_classifications_match() {
let cls = vec![
TableClassification::RolledPastExpected,
TableClassification::RolledPastExpected,
];
assert_eq!(decide(&cls), SidecarDecision::RollForward);
}
#[test]
fn decide_roll_back_on_mid_phase_b_crash_mix() {
// Mid-Phase-B crash: one table iterated (RolledPastExpected),
// another not yet iterated (NoMovement).
let cls = vec![
TableClassification::RolledPastExpected,
TableClassification::NoMovement,
];
assert_eq!(decide(&cls), SidecarDecision::RollBack);
}
#[test]
fn decide_roll_back_on_unexpected_at_p1() {
let cls = vec![
TableClassification::RolledPastExpected,
TableClassification::UnexpectedAtP1,
];
assert_eq!(decide(&cls), SidecarDecision::RollBack);
}
#[test]
fn decide_abort_on_invariant_violation() {
let cls = vec![
TableClassification::RolledPastExpected,
TableClassification::InvariantViolation { observed: 1 },
];
assert_eq!(decide(&cls), SidecarDecision::Abort);
}
#[test]
fn decide_roll_forward_on_empty_slice() {
// Degenerate case: no tables in the sidecar. Vacuously RollForward
// (and the executor will iterate zero tables).
assert_eq!(decide(&[]), SidecarDecision::RollForward);
}
#[tokio::test]
async fn restore_table_to_version_appends_one_commit() {
let dir = tempfile::tempdir().unwrap();
let uri = format!("{}/people.lance", dir.path().to_str().unwrap());
let store = TableStore::new(dir.path().to_str().unwrap());
let mut ds = TableStore::write_dataset(&uri, person_batch(&[("alice", Some(30))]))
.await
.unwrap();
store
.append_batch(&uri, &mut ds, person_batch(&[("bob", Some(25))]))
.await
.unwrap();
store
.append_batch(&uri, &mut ds, person_batch(&[("carol", Some(40))]))
.await
.unwrap();
let head_before = ds.version().version;
assert_eq!(head_before, 3);
restore_table_to_version(&uri, 1).await.unwrap();
let post = Dataset::open(&uri).await.unwrap();
assert_eq!(post.version().version, head_before + 1);
// Content matches v1 (just alice).
let scanner = post.scan();
let batches: Vec<RecordBatch> = futures::TryStreamExt::try_collect(
scanner.try_into_stream().await.unwrap(),
)
.await
.unwrap();
let total: usize = batches.iter().map(|b| b.num_rows()).sum();
assert_eq!(total, 1);
}
#[tokio::test]
async fn restore_table_to_version_no_ops_when_fragments_already_match() {
let dir = tempfile::tempdir().unwrap();
let uri = format!("{}/people.lance", dir.path().to_str().unwrap());
let store = TableStore::new(dir.path().to_str().unwrap());
let mut ds = TableStore::write_dataset(&uri, person_batch(&[("alice", Some(30))]))
.await
.unwrap();
store
.append_batch(&uri, &mut ds, person_batch(&[("bob", Some(25))]))
.await
.unwrap();
// First restore: HEAD goes from 2 to 3 (with content == v1).
restore_table_to_version(&uri, 1).await.unwrap();
let mid = Dataset::open(&uri).await.unwrap().version().version;
assert_eq!(mid, 3);
// Second restore to v1: content already matches; no-op.
restore_table_to_version(&uri, 1).await.unwrap();
let post = Dataset::open(&uri).await.unwrap().version().version;
assert_eq!(
post, mid,
"second restore must short-circuit via fragment-set equality"
);
}
#[tokio::test]
async fn list_sidecars_returns_empty_when_dir_missing() {
let dir = tempfile::tempdir().unwrap();
let storage = LocalStorageAdapter::default();
let result = list_sidecars(dir.path().to_str().unwrap(), &storage)
.await
.unwrap();
assert!(result.is_empty());
}
#[tokio::test]
async fn write_then_list_then_delete_round_trip() {
let dir = tempfile::tempdir().unwrap();
// Create the __recovery/ subdir so write_sidecar's parent exists
// (LocalStorageAdapter::write_text doesn't mkdir parents).
std::fs::create_dir(dir.path().join(RECOVERY_DIR_NAME)).unwrap();
let storage = LocalStorageAdapter::default();
let root = dir.path().to_str().unwrap();
let sidecar = new_sidecar(
SidecarKind::Mutation,
Some("main".to_string()),
Some("act-alice".to_string()),
vec![make_pin("node:Person", "file:///tmp/x.lance", 5, 6)],
);
let handle = write_sidecar(root, &storage, &sidecar).await.unwrap();
assert_eq!(handle.operation_id, sidecar.operation_id);
let listed = list_sidecars(root, &storage).await.unwrap();
assert_eq!(listed.len(), 1);
assert_eq!(listed[0].operation_id, sidecar.operation_id);
delete_sidecar(&handle, &storage).await.unwrap();
let after = list_sidecars(root, &storage).await.unwrap();
assert!(after.is_empty());
}
#[tokio::test]
async fn list_sidecars_skips_non_json_files() {
let dir = tempfile::tempdir().unwrap();
std::fs::create_dir(dir.path().join(RECOVERY_DIR_NAME)).unwrap();
// Drop a non-JSON file the sweep must ignore (e.g., .DS_Store).
std::fs::write(
dir.path().join(RECOVERY_DIR_NAME).join(".DS_Store"),
"noise",
)
.unwrap();
let storage = LocalStorageAdapter::default();
let result = list_sidecars(dir.path().to_str().unwrap(), &storage)
.await
.unwrap();
assert!(result.is_empty());
}
}

View file

@ -1276,6 +1276,10 @@ edge WorksAt: Person -> Company
self.deletes.lock().unwrap().push(uri.to_string());
self.inner.delete(uri).await
}
async fn list_dir(&self, dir_uri: &str) -> Result<Vec<String>> {
self.inner.list_dir(dir_uri).await
}
}
#[tokio::test]

View file

@ -27,6 +27,10 @@ pub trait StorageAdapter: Debug + Send + Sync {
async fn rename_text(&self, from_uri: &str, to_uri: &str) -> Result<()>;
/// Remove a file. Returns Ok(()) if the file does not exist.
async fn delete(&self, uri: &str) -> Result<()>;
/// List all files (non-recursively, files only) directly under `dir_uri`.
/// Returns full URIs (same scheme as `dir_uri`). The result is unordered.
/// Returns Ok(empty) if the directory does not exist or is empty.
async fn list_dir(&self, dir_uri: &str) -> Result<Vec<String>>;
}
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
@ -82,6 +86,27 @@ impl StorageAdapter for LocalStorageAdapter {
Err(err) => Err(err.into()),
}
}
async fn list_dir(&self, dir_uri: &str) -> Result<Vec<String>> {
let path = local_path_from_uri(dir_uri)?;
let mut out = Vec::new();
let mut entries = match tokio::fs::read_dir(&path).await {
Ok(e) => e,
Err(err) if err.kind() == std::io::ErrorKind::NotFound => return Ok(out),
Err(err) => return Err(err.into()),
};
let dir_str = dir_uri.trim_end_matches('/');
while let Some(entry) = entries.next_entry().await? {
let ft = entry.file_type().await?;
if !ft.is_file() {
continue;
}
if let Some(name) = entry.file_name().to_str() {
out.push(format!("{}/{}", dir_str, name));
}
}
Ok(out)
}
}
#[async_trait]
@ -154,6 +179,43 @@ impl StorageAdapter for S3StorageAdapter {
Err(err) => Err(storage_backend_error("delete", uri, err)),
}
}
async fn list_dir(&self, dir_uri: &str) -> Result<Vec<String>> {
// Normalize: ensure the URI describes a directory (trailing '/') so
// we don't match sibling paths with a shared prefix
// (e.g. listing `__recovery` shouldn't match `__recovery_log/...`).
let dir_with_slash = if dir_uri.ends_with('/') {
dir_uri.to_string()
} else {
format!("{}/", dir_uri)
};
// object_store::Path strips the trailing '/'; re-add it for filtering.
let prefix_loc = self.object_path(&dir_with_slash)?;
let prefix_with_slash = format!("{}/", prefix_loc.as_ref());
let mut entries = self.store.list(Some(&prefix_loc));
let mut out = Vec::new();
let bucket_root = format!("{}{}/", S3_SCHEME_PREFIX, self.bucket);
while let Some(meta) = entries
.try_next()
.await
.map_err(|err| storage_backend_error("list_dir", dir_uri, err))?
{
let key_str = meta.location.as_ref();
// Require the directory boundary to filter out sibling-prefix
// matches (object_store's `list` is prefix-based, not dir-based).
if !key_str.starts_with(&prefix_with_slash) {
continue;
}
let suffix = &key_str[prefix_with_slash.len()..];
// Non-recursive: skip anything inside a sub-directory.
if suffix.contains('/') {
continue;
}
out.push(format!("{}{}", bucket_root, key_str));
}
Ok(out)
}
}
impl S3StorageAdapter {