diff --git a/Cargo.lock b/Cargo.lock index 675fad7..b1cf0ef 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4636,6 +4636,7 @@ dependencies = [ "serde", "serde_json", "serial_test", + "sha2", "tempfile", "thiserror", "time", diff --git a/crates/omnigraph/Cargo.toml b/crates/omnigraph/Cargo.toml index 9cc2148..a4a2fe0 100644 --- a/crates/omnigraph/Cargo.toml +++ b/crates/omnigraph/Cargo.toml @@ -37,6 +37,7 @@ serde_json = { workspace = true } reqwest = { workspace = true } object_store = { workspace = true } ulid = { workspace = true } +sha2 = { workspace = true } base64 = { workspace = true } futures = { workspace = true } tracing = { workspace = true } diff --git a/crates/omnigraph/src/db/omnigraph.rs b/crates/omnigraph/src/db/omnigraph.rs index f217f7d..50f5d34 100644 --- a/crates/omnigraph/src/db/omnigraph.rs +++ b/crates/omnigraph/src/db/omnigraph.rs @@ -2029,6 +2029,25 @@ edge WorksAt: Person -> Company async fn list_dir(&self, dir_uri: &str) -> Result> { self.inner.list_dir(dir_uri).await } + + async fn read_text_versioned(&self, uri: &str) -> Result<(String, String)> { + self.inner.read_text_versioned(uri).await + } + + async fn write_text_if_match( + &self, + uri: &str, + contents: &str, + expected_version: &str, + ) -> Result> { + self.inner + .write_text_if_match(uri, contents, expected_version) + .await + } + + async fn delete_prefix(&self, prefix_uri: &str) -> Result<()> { + self.inner.delete_prefix(prefix_uri).await + } } #[derive(Debug)] @@ -2071,6 +2090,25 @@ edge WorksAt: Person -> Company async fn list_dir(&self, dir_uri: &str) -> Result> { self.inner.list_dir(dir_uri).await } + + async fn read_text_versioned(&self, uri: &str) -> Result<(String, String)> { + self.inner.read_text_versioned(uri).await + } + + async fn write_text_if_match( + &self, + uri: &str, + contents: &str, + expected_version: &str, + ) -> Result> { + self.inner + .write_text_if_match(uri, contents, expected_version) + .await + } + + async fn delete_prefix(&self, prefix_uri: &str) -> Result<()> { + self.inner.delete_prefix(prefix_uri).await + } } #[tokio::test(flavor = "multi_thread", worker_threads = 2)] diff --git a/crates/omnigraph/src/storage.rs b/crates/omnigraph/src/storage.rs index 564b577..978d1ce 100644 --- a/crates/omnigraph/src/storage.rs +++ b/crates/omnigraph/src/storage.rs @@ -39,6 +39,39 @@ pub trait StorageAdapter: Debug + Send + Sync { /// Returns full URIs (same scheme as `dir_uri`). The result is unordered. /// Returns Ok(empty) if the directory does not exist or is empty. async fn list_dir(&self, dir_uri: &str) -> Result>; + /// Read a text object together with its backend version token (S3: the + /// object's ETag; local: sha256 of the content). The token is opaque — + /// valid only for `write_text_if_match` against the same adapter. + async fn read_text_versioned(&self, uri: &str) -> Result<(String, String)>; + /// Replace the object at `uri` only if its current version still matches + /// `expected_version` (obtained from a prior versioned read/write on this + /// adapter). Returns `Ok(Some(new_version))` on success and `Ok(None)` + /// when the precondition failed (a concurrent writer won — the CAS-lost + /// case callers must surface, never swallow). S3 uses a conditional put + /// (If-Match); local compares content then replaces via temp + rename — + /// the same single-machine semantics the callers had before this trait, + /// safe under the callers' own lock protocol but not a cross-process + /// barrier by itself. + async fn write_text_if_match( + &self, + uri: &str, + contents: &str, + expected_version: &str, + ) -> Result>; + /// Recursively delete every object under `prefix_uri`. Returns Ok(()) + /// when nothing exists there (idempotent). Local: `remove_dir_all`; + /// S3: list + delete (NOT atomic — callers must tolerate partial + /// prefixes on crash, which the cluster delete protocol does by retry). + async fn delete_prefix(&self, prefix_uri: &str) -> Result<()>; +} + +/// Version token for local files: content identity. ETags are unavailable +/// on the filesystem; sha256 is stable, cheap at these object sizes, and +/// already the cluster ledger's CAS vocabulary. +fn local_version_token(bytes: &[u8]) -> String { + use sha2::{Digest, Sha256}; + let digest = Sha256::digest(bytes); + digest.iter().map(|byte| format!("{byte:02x}")).collect() } #[derive(Debug, Clone, Copy, PartialEq, Eq)] @@ -149,6 +182,49 @@ impl StorageAdapter for LocalStorageAdapter { } Ok(out) } + + async fn read_text_versioned(&self, uri: &str) -> Result<(String, String)> { + let path = local_path_from_uri(uri)?; + let bytes = tokio::fs::read(&path).await?; + let version = local_version_token(&bytes); + let text = String::from_utf8(bytes).map_err(|err| { + OmniError::manifest_internal(format!("storage read failed for '{}': {}", uri, err)) + })?; + Ok((text, version)) + } + + async fn write_text_if_match( + &self, + uri: &str, + contents: &str, + expected_version: &str, + ) -> Result> { + let path = local_path_from_uri(uri)?; + let current = match tokio::fs::read(&path).await { + Ok(bytes) => bytes, + Err(err) if err.kind() == std::io::ErrorKind::NotFound => return Ok(None), + Err(err) => return Err(err.into()), + }; + if local_version_token(¤t) != expected_version { + return Ok(None); + } + let tmp = path.with_extension(format!("tmp.{}", ulid::Ulid::new())); + tokio::fs::write(&tmp, contents.as_bytes()).await?; + if let Err(err) = tokio::fs::rename(&tmp, &path).await { + let _ = tokio::fs::remove_file(&tmp).await; + return Err(err.into()); + } + Ok(Some(local_version_token(contents.as_bytes()))) + } + + async fn delete_prefix(&self, prefix_uri: &str) -> Result<()> { + let path = local_path_from_uri(prefix_uri)?; + match tokio::fs::remove_dir_all(&path).await { + Ok(()) => Ok(()), + Err(err) if err.kind() == std::io::ErrorKind::NotFound => Ok(()), + Err(err) => Err(err.into()), + } + } } #[async_trait] @@ -276,6 +352,84 @@ impl StorageAdapter for S3StorageAdapter { } Ok(out) } + + async fn read_text_versioned(&self, uri: &str) -> Result<(String, String)> { + let location = self.object_path(uri)?; + let result = self + .store + .get(&location) + .await + .map_err(|err| storage_backend_error("read", uri, err))?; + let etag = result.meta.e_tag.clone(); + let bytes = result + .bytes() + .await + .map_err(|err| storage_backend_error("read", uri, err))?; + // Every S3-compatible store we target returns ETags; fall back to a + // content token rather than failing if one ever omits it. + let version = etag.unwrap_or_else(|| local_version_token(&bytes)); + let text = String::from_utf8(bytes.to_vec()).map_err(|err| { + OmniError::manifest_internal(format!("storage read failed for '{}': {}", uri, err)) + })?; + Ok((text, version)) + } + + async fn write_text_if_match( + &self, + uri: &str, + contents: &str, + expected_version: &str, + ) -> Result> { + let location = self.object_path(uri)?; + let mode = PutMode::Update(object_store::UpdateVersion { + e_tag: Some(expected_version.to_string()), + version: None, + }); + match self + .store + .put_opts( + &location, + PutPayload::from(contents.as_bytes().to_vec()), + mode.into(), + ) + .await + { + Ok(result) => Ok(Some( + result + .e_tag + .unwrap_or_else(|| local_version_token(contents.as_bytes())), + )), + Err(object_store::Error::Precondition { .. }) + | Err(object_store::Error::NotFound { .. }) => Ok(None), + Err(err) => Err(storage_backend_error("write_if_match", uri, err)), + } + } + + async fn delete_prefix(&self, prefix_uri: &str) -> Result<()> { + let dir_with_slash = if prefix_uri.ends_with('/') { + prefix_uri.to_string() + } else { + format!("{}/", prefix_uri) + }; + let prefix_loc = self.object_path(&dir_with_slash)?; + let mut entries = self.store.list(Some(&prefix_loc)); + let mut locations = Vec::new(); + while let Some(meta) = entries + .try_next() + .await + .map_err(|err| storage_backend_error("delete_prefix", prefix_uri, err))? + { + locations.push(meta.location); + } + for location in locations { + match self.store.delete(&location).await { + Ok(()) => {} + Err(object_store::Error::NotFound { .. }) => {} + Err(err) => return Err(storage_backend_error("delete_prefix", prefix_uri, err)), + } + } + Ok(()) + } } impl S3StorageAdapter { @@ -444,6 +598,55 @@ fn env_var_truthy(key: &str) -> bool { #[cfg(test)] mod tests { + + #[tokio::test] + async fn local_versioned_cas_roundtrip() { + let dir = tempfile::tempdir().unwrap(); + let uri = format!("{}/state.json", dir.path().display()); + let adapter = LocalStorageAdapter; + adapter.write_text(&uri, "v1").await.unwrap(); + let (text, version) = adapter.read_text_versioned(&uri).await.unwrap(); + assert_eq!(text, "v1"); + + // Matching token replaces and returns the next token. + let next = adapter + .write_text_if_match(&uri, "v2", &version) + .await + .unwrap() + .expect("fresh token must win"); + assert_ne!(next, version); + // The stale token must lose (CAS-lost is Ok(None), never silent). + assert!( + adapter + .write_text_if_match(&uri, "v3", &version) + .await + .unwrap() + .is_none() + ); + let (text, _) = adapter.read_text_versioned(&uri).await.unwrap(); + assert_eq!(text, "v2"); + // Missing object: precondition can't hold. + let missing = format!("{}/absent.json", dir.path().display()); + assert!( + adapter + .write_text_if_match(&missing, "x", &version) + .await + .unwrap() + .is_none() + ); + } + + #[tokio::test] + async fn local_delete_prefix_is_recursive_and_idempotent() { + let dir = tempfile::tempdir().unwrap(); + let root = format!("{}/tree", dir.path().display()); + let adapter = LocalStorageAdapter; + adapter.write_text(&format!("{root}/a.txt"), "a").await.unwrap(); + adapter.write_text(&format!("{root}/sub/b.txt"), "b").await.unwrap(); + adapter.delete_prefix(&root).await.unwrap(); + assert!(!adapter.exists(&format!("{root}/a.txt")).await.unwrap()); + adapter.delete_prefix(&root).await.unwrap(); // absent -> Ok + } use super::*; #[test] diff --git a/crates/omnigraph/tests/s3_storage.rs b/crates/omnigraph/tests/s3_storage.rs index 7e4f0a3..3814600 100644 --- a/crates/omnigraph/tests/s3_storage.rs +++ b/crates/omnigraph/tests/s3_storage.rs @@ -167,3 +167,80 @@ async fn s3_public_load_uses_hidden_run_and_publishes() { .to_rust_json(); assert_eq!(loaded[0]["p.name"], "Loaded-Over-S3"); } + +/// The conditional-write contract the cluster ledger depends on (RFC-006): +/// versioned read -> If-Match replace -> stale token refused. Pins the +/// S3-compatible backend's behavior (RustFS in CI) — turns red if a backend +/// bump regresses conditional puts. +#[tokio::test(flavor = "multi_thread")] +async fn s3_adapter_conditional_writes_contract() { + let Some(uri) = s3_test_graph_uri("adapter-cas") else { + eprintln!("skipping s3 adapter cas test: OMNIGRAPH_S3_TEST_BUCKET is not set"); + return; + }; + use omnigraph::storage::storage_for_uri; + let adapter = storage_for_uri(&uri).unwrap(); + let object = format!("{uri}/cas-probe.json"); + + assert!(adapter.write_text_if_absent(&object, "v1").await.unwrap()); + assert!(!adapter.write_text_if_absent(&object, "v1b").await.unwrap()); + + let (text, version) = adapter.read_text_versioned(&object).await.unwrap(); + assert_eq!(text, "v1"); + let next = adapter + .write_text_if_match(&object, "v2", &version) + .await + .unwrap() + .expect("fresh etag must win"); + assert!( + adapter + .write_text_if_match(&object, "v3", &version) + .await + .unwrap() + .is_none(), + "stale etag must be refused" + ); + let again = adapter + .write_text_if_match(&object, "v3", &next) + .await + .unwrap(); + assert!(again.is_some()); + + // Prefix delete: recursive + idempotent. + adapter + .write_text(&format!("{uri}/tree/a.json"), "a") + .await + .unwrap(); + adapter + .write_text(&format!("{uri}/tree/sub/b.json"), "b") + .await + .unwrap(); + adapter.delete_prefix(&format!("{uri}/tree")).await.unwrap(); + assert!(!adapter.exists(&format!("{uri}/tree/a.json")).await.unwrap()); + adapter.delete_prefix(&format!("{uri}/tree")).await.unwrap(); + adapter.delete(&object).await.unwrap(); +} + +/// Schema apply against an S3 graph — the cluster's schema executor will +/// lean on this; previously untested upstream on object storage. +#[tokio::test(flavor = "multi_thread")] +async fn s3_schema_apply_migrates_live_graph() { + let Some(uri) = s3_test_graph_uri("schema-apply") else { + eprintln!("skipping s3 schema apply test: OMNIGRAPH_S3_TEST_BUCKET is not set"); + return; + }; + let mut db = Omnigraph::init(&uri, TEST_SCHEMA).await.unwrap(); + load_jsonl(&mut db, TEST_DATA, LoadMode::Overwrite) + .await + .unwrap(); + + let desired = format!("{TEST_SCHEMA}\nnode Note {{\n title: String @key\n}}\n"); + let result = db.apply_schema(&desired).await.unwrap(); + assert!(result.applied, "{result:?}"); + + let reopened = Omnigraph::open(&uri).await.unwrap(); + assert!( + reopened.schema_source().contains("Note"), + "live S3 schema must carry the migration" + ); +}