feat(storage): versioned CAS, conditional replace, and prefix delete on StorageAdapter

Three primitives the cluster's object-storage port (RFC-006) needs, on the
engine's existing adapter rather than a parallel store:

- read_text_versioned: content + an opaque backend version token (S3: the
  ETag from GET; local: content sha256 — ETags don't exist on a filesystem).
- write_text_if_match: replace only when the token still matches. S3 maps to
  a conditional put (PutMode::Update / If-Match) — verified against RustFS
  beta.8 through the real object_store 0.12.5 path, no extra builder config
  needed; local compares content then swaps via temp+rename, the same
  single-machine semantics callers had before this trait (safe under their
  own lock protocol, not a cross-process barrier by itself). CAS-lost is
  Ok(None), never silent.
- delete_prefix: recursive + idempotent (local remove_dir_all; S3 list +
  delete, with the non-atomicity documented for crash-retry callers).

Gated S3 coverage: s3_adapter_conditional_writes_contract pins the
conditional-write behavior the cluster ledger will depend on (red if a
backend bump regresses it), and s3_schema_apply_migrates_live_graph closes
the previously-untested schema-apply-on-S3 path before the cluster's schema
executor leans on it. Engine gains the sha2 workspace dep.

Co-Authored-By: Claude Fable 5 <noreply@anthropic.com>
This commit is contained in:
aaltshuler 2026-06-11 05:09:45 +03:00
parent 328bfef6fb
commit f48e69b999
5 changed files with 320 additions and 0 deletions

1
Cargo.lock generated
View file

@ -4636,6 +4636,7 @@ dependencies = [
"serde",
"serde_json",
"serial_test",
"sha2",
"tempfile",
"thiserror",
"time",

View file

@ -37,6 +37,7 @@ serde_json = { workspace = true }
reqwest = { workspace = true }
object_store = { workspace = true }
ulid = { workspace = true }
sha2 = { workspace = true }
base64 = { workspace = true }
futures = { workspace = true }
tracing = { workspace = true }

View file

@ -2029,6 +2029,25 @@ edge WorksAt: Person -> Company
async fn list_dir(&self, dir_uri: &str) -> Result<Vec<String>> {
self.inner.list_dir(dir_uri).await
}
async fn read_text_versioned(&self, uri: &str) -> Result<(String, String)> {
self.inner.read_text_versioned(uri).await
}
async fn write_text_if_match(
&self,
uri: &str,
contents: &str,
expected_version: &str,
) -> Result<Option<String>> {
self.inner
.write_text_if_match(uri, contents, expected_version)
.await
}
async fn delete_prefix(&self, prefix_uri: &str) -> Result<()> {
self.inner.delete_prefix(prefix_uri).await
}
}
#[derive(Debug)]
@ -2071,6 +2090,25 @@ edge WorksAt: Person -> Company
async fn list_dir(&self, dir_uri: &str) -> Result<Vec<String>> {
self.inner.list_dir(dir_uri).await
}
async fn read_text_versioned(&self, uri: &str) -> Result<(String, String)> {
self.inner.read_text_versioned(uri).await
}
async fn write_text_if_match(
&self,
uri: &str,
contents: &str,
expected_version: &str,
) -> Result<Option<String>> {
self.inner
.write_text_if_match(uri, contents, expected_version)
.await
}
async fn delete_prefix(&self, prefix_uri: &str) -> Result<()> {
self.inner.delete_prefix(prefix_uri).await
}
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]

View file

@ -39,6 +39,39 @@ pub trait StorageAdapter: Debug + Send + Sync {
/// Returns full URIs (same scheme as `dir_uri`). The result is unordered.
/// Returns Ok(empty) if the directory does not exist or is empty.
async fn list_dir(&self, dir_uri: &str) -> Result<Vec<String>>;
/// Read a text object together with its backend version token (S3: the
/// object's ETag; local: sha256 of the content). The token is opaque —
/// valid only for `write_text_if_match` against the same adapter.
async fn read_text_versioned(&self, uri: &str) -> Result<(String, String)>;
/// Replace the object at `uri` only if its current version still matches
/// `expected_version` (obtained from a prior versioned read/write on this
/// adapter). Returns `Ok(Some(new_version))` on success and `Ok(None)`
/// when the precondition failed (a concurrent writer won — the CAS-lost
/// case callers must surface, never swallow). S3 uses a conditional put
/// (If-Match); local compares content then replaces via temp + rename —
/// the same single-machine semantics the callers had before this trait,
/// safe under the callers' own lock protocol but not a cross-process
/// barrier by itself.
async fn write_text_if_match(
&self,
uri: &str,
contents: &str,
expected_version: &str,
) -> Result<Option<String>>;
/// Recursively delete every object under `prefix_uri`. Returns Ok(())
/// when nothing exists there (idempotent). Local: `remove_dir_all`;
/// S3: list + delete (NOT atomic — callers must tolerate partial
/// prefixes on crash, which the cluster delete protocol does by retry).
async fn delete_prefix(&self, prefix_uri: &str) -> Result<()>;
}
/// Version token for local files: content identity. ETags are unavailable
/// on the filesystem; sha256 is stable, cheap at these object sizes, and
/// already the cluster ledger's CAS vocabulary.
fn local_version_token(bytes: &[u8]) -> String {
use sha2::{Digest, Sha256};
let digest = Sha256::digest(bytes);
digest.iter().map(|byte| format!("{byte:02x}")).collect()
}
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
@ -149,6 +182,49 @@ impl StorageAdapter for LocalStorageAdapter {
}
Ok(out)
}
async fn read_text_versioned(&self, uri: &str) -> Result<(String, String)> {
let path = local_path_from_uri(uri)?;
let bytes = tokio::fs::read(&path).await?;
let version = local_version_token(&bytes);
let text = String::from_utf8(bytes).map_err(|err| {
OmniError::manifest_internal(format!("storage read failed for '{}': {}", uri, err))
})?;
Ok((text, version))
}
async fn write_text_if_match(
&self,
uri: &str,
contents: &str,
expected_version: &str,
) -> Result<Option<String>> {
let path = local_path_from_uri(uri)?;
let current = match tokio::fs::read(&path).await {
Ok(bytes) => bytes,
Err(err) if err.kind() == std::io::ErrorKind::NotFound => return Ok(None),
Err(err) => return Err(err.into()),
};
if local_version_token(&current) != expected_version {
return Ok(None);
}
let tmp = path.with_extension(format!("tmp.{}", ulid::Ulid::new()));
tokio::fs::write(&tmp, contents.as_bytes()).await?;
if let Err(err) = tokio::fs::rename(&tmp, &path).await {
let _ = tokio::fs::remove_file(&tmp).await;
return Err(err.into());
}
Ok(Some(local_version_token(contents.as_bytes())))
}
async fn delete_prefix(&self, prefix_uri: &str) -> Result<()> {
let path = local_path_from_uri(prefix_uri)?;
match tokio::fs::remove_dir_all(&path).await {
Ok(()) => Ok(()),
Err(err) if err.kind() == std::io::ErrorKind::NotFound => Ok(()),
Err(err) => Err(err.into()),
}
}
}
#[async_trait]
@ -276,6 +352,84 @@ impl StorageAdapter for S3StorageAdapter {
}
Ok(out)
}
async fn read_text_versioned(&self, uri: &str) -> Result<(String, String)> {
let location = self.object_path(uri)?;
let result = self
.store
.get(&location)
.await
.map_err(|err| storage_backend_error("read", uri, err))?;
let etag = result.meta.e_tag.clone();
let bytes = result
.bytes()
.await
.map_err(|err| storage_backend_error("read", uri, err))?;
// Every S3-compatible store we target returns ETags; fall back to a
// content token rather than failing if one ever omits it.
let version = etag.unwrap_or_else(|| local_version_token(&bytes));
let text = String::from_utf8(bytes.to_vec()).map_err(|err| {
OmniError::manifest_internal(format!("storage read failed for '{}': {}", uri, err))
})?;
Ok((text, version))
}
async fn write_text_if_match(
&self,
uri: &str,
contents: &str,
expected_version: &str,
) -> Result<Option<String>> {
let location = self.object_path(uri)?;
let mode = PutMode::Update(object_store::UpdateVersion {
e_tag: Some(expected_version.to_string()),
version: None,
});
match self
.store
.put_opts(
&location,
PutPayload::from(contents.as_bytes().to_vec()),
mode.into(),
)
.await
{
Ok(result) => Ok(Some(
result
.e_tag
.unwrap_or_else(|| local_version_token(contents.as_bytes())),
)),
Err(object_store::Error::Precondition { .. })
| Err(object_store::Error::NotFound { .. }) => Ok(None),
Err(err) => Err(storage_backend_error("write_if_match", uri, err)),
}
}
async fn delete_prefix(&self, prefix_uri: &str) -> Result<()> {
let dir_with_slash = if prefix_uri.ends_with('/') {
prefix_uri.to_string()
} else {
format!("{}/", prefix_uri)
};
let prefix_loc = self.object_path(&dir_with_slash)?;
let mut entries = self.store.list(Some(&prefix_loc));
let mut locations = Vec::new();
while let Some(meta) = entries
.try_next()
.await
.map_err(|err| storage_backend_error("delete_prefix", prefix_uri, err))?
{
locations.push(meta.location);
}
for location in locations {
match self.store.delete(&location).await {
Ok(()) => {}
Err(object_store::Error::NotFound { .. }) => {}
Err(err) => return Err(storage_backend_error("delete_prefix", prefix_uri, err)),
}
}
Ok(())
}
}
impl S3StorageAdapter {
@ -444,6 +598,55 @@ fn env_var_truthy(key: &str) -> bool {
#[cfg(test)]
mod tests {
#[tokio::test]
async fn local_versioned_cas_roundtrip() {
let dir = tempfile::tempdir().unwrap();
let uri = format!("{}/state.json", dir.path().display());
let adapter = LocalStorageAdapter;
adapter.write_text(&uri, "v1").await.unwrap();
let (text, version) = adapter.read_text_versioned(&uri).await.unwrap();
assert_eq!(text, "v1");
// Matching token replaces and returns the next token.
let next = adapter
.write_text_if_match(&uri, "v2", &version)
.await
.unwrap()
.expect("fresh token must win");
assert_ne!(next, version);
// The stale token must lose (CAS-lost is Ok(None), never silent).
assert!(
adapter
.write_text_if_match(&uri, "v3", &version)
.await
.unwrap()
.is_none()
);
let (text, _) = adapter.read_text_versioned(&uri).await.unwrap();
assert_eq!(text, "v2");
// Missing object: precondition can't hold.
let missing = format!("{}/absent.json", dir.path().display());
assert!(
adapter
.write_text_if_match(&missing, "x", &version)
.await
.unwrap()
.is_none()
);
}
#[tokio::test]
async fn local_delete_prefix_is_recursive_and_idempotent() {
let dir = tempfile::tempdir().unwrap();
let root = format!("{}/tree", dir.path().display());
let adapter = LocalStorageAdapter;
adapter.write_text(&format!("{root}/a.txt"), "a").await.unwrap();
adapter.write_text(&format!("{root}/sub/b.txt"), "b").await.unwrap();
adapter.delete_prefix(&root).await.unwrap();
assert!(!adapter.exists(&format!("{root}/a.txt")).await.unwrap());
adapter.delete_prefix(&root).await.unwrap(); // absent -> Ok
}
use super::*;
#[test]

View file

@ -167,3 +167,80 @@ async fn s3_public_load_uses_hidden_run_and_publishes() {
.to_rust_json();
assert_eq!(loaded[0]["p.name"], "Loaded-Over-S3");
}
/// The conditional-write contract the cluster ledger depends on (RFC-006):
/// versioned read -> If-Match replace -> stale token refused. Pins the
/// S3-compatible backend's behavior (RustFS in CI) — turns red if a backend
/// bump regresses conditional puts.
#[tokio::test(flavor = "multi_thread")]
async fn s3_adapter_conditional_writes_contract() {
let Some(uri) = s3_test_graph_uri("adapter-cas") else {
eprintln!("skipping s3 adapter cas test: OMNIGRAPH_S3_TEST_BUCKET is not set");
return;
};
use omnigraph::storage::storage_for_uri;
let adapter = storage_for_uri(&uri).unwrap();
let object = format!("{uri}/cas-probe.json");
assert!(adapter.write_text_if_absent(&object, "v1").await.unwrap());
assert!(!adapter.write_text_if_absent(&object, "v1b").await.unwrap());
let (text, version) = adapter.read_text_versioned(&object).await.unwrap();
assert_eq!(text, "v1");
let next = adapter
.write_text_if_match(&object, "v2", &version)
.await
.unwrap()
.expect("fresh etag must win");
assert!(
adapter
.write_text_if_match(&object, "v3", &version)
.await
.unwrap()
.is_none(),
"stale etag must be refused"
);
let again = adapter
.write_text_if_match(&object, "v3", &next)
.await
.unwrap();
assert!(again.is_some());
// Prefix delete: recursive + idempotent.
adapter
.write_text(&format!("{uri}/tree/a.json"), "a")
.await
.unwrap();
adapter
.write_text(&format!("{uri}/tree/sub/b.json"), "b")
.await
.unwrap();
adapter.delete_prefix(&format!("{uri}/tree")).await.unwrap();
assert!(!adapter.exists(&format!("{uri}/tree/a.json")).await.unwrap());
adapter.delete_prefix(&format!("{uri}/tree")).await.unwrap();
adapter.delete(&object).await.unwrap();
}
/// Schema apply against an S3 graph — the cluster's schema executor will
/// lean on this; previously untested upstream on object storage.
#[tokio::test(flavor = "multi_thread")]
async fn s3_schema_apply_migrates_live_graph() {
let Some(uri) = s3_test_graph_uri("schema-apply") else {
eprintln!("skipping s3 schema apply test: OMNIGRAPH_S3_TEST_BUCKET is not set");
return;
};
let mut db = Omnigraph::init(&uri, TEST_SCHEMA).await.unwrap();
load_jsonl(&mut db, TEST_DATA, LoadMode::Overwrite)
.await
.unwrap();
let desired = format!("{TEST_SCHEMA}\nnode Note {{\n title: String @key\n}}\n");
let result = db.apply_schema(&desired).await.unwrap();
assert!(result.applied, "{result:?}");
let reopened = Omnigraph::open(&uri).await.unwrap();
assert!(
reopened.schema_source().contains("Note"),
"live S3 schema must carry the migration"
);
}