Merge pull request #186 from ModernRelay/feat/object-store-primitives

feat(storage,policy): object-store primitives for the cluster port (RFC-006 PR 1/3)
This commit is contained in:
Andrew Altshuler 2026-06-11 05:26:55 +03:00 committed by GitHub
commit 1b1583d897
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
6 changed files with 378 additions and 2 deletions

1
Cargo.lock generated
View file

@ -4636,6 +4636,7 @@ dependencies = [
"serde",
"serde_json",
"serial_test",
"sha2",
"tempfile",
"thiserror",
"time",

View file

@ -277,7 +277,14 @@ pub struct PolicyEngine {
impl PolicyConfig {
pub fn load(path: &Path) -> Result<Self> {
let config: Self = serde_yaml::from_str(&fs::read_to_string(path)?)?;
Self::from_source(&fs::read_to_string(path)?)
}
/// Parse + validate a policy from YAML source. The from-content twin of
/// `load` for callers whose policies don't live on the local filesystem
/// (e.g. a cluster catalog on object storage).
pub fn from_source(source: &str) -> Result<Self> {
let config: Self = serde_yaml::from_str(source)?;
config.validate()?;
Ok(config)
}
@ -465,13 +472,26 @@ impl PolicyEngine {
PolicyCompiler::compile(&config, graph_id)
}
/// `load_graph` from YAML content instead of a file path — for policies
/// that live in a non-filesystem catalog (cluster object storage).
pub fn load_graph_from_source(source: &str, graph_id: &str) -> Result<Self> {
let config = PolicyConfig::from_source(source)?;
validate_kind_alignment(&config, PolicyEngineKind::Graph)?;
PolicyCompiler::compile(&config, graph_id)
}
/// Load a server-level policy file. Rejects rules whose actions
/// are per-graph (e.g. `read`, `change`) — those belong in a
/// per-graph policy file, not the server one. Takes no `graph_id`:
/// server-scoped actions resolve against the singleton
/// `Omnigraph::Server::"root"` entity, never a Graph.
pub fn load_server(path: &Path) -> Result<Self> {
let config = PolicyConfig::load(path)?;
Self::load_server_from_source(&fs::read_to_string(path)?)
}
/// `load_server` from YAML content instead of a file path.
pub fn load_server_from_source(source: &str) -> Result<Self> {
let config = PolicyConfig::from_source(source)?;
validate_kind_alignment(&config, PolicyEngineKind::Server)?;
// The Graph entity created by the compiler is never referenced
// by a server-scoped rule, so the label below is purely a
@ -1002,6 +1022,42 @@ impl PolicyChecker for PolicyEngine {
#[cfg(test)]
mod tests {
#[test]
fn from_source_twins_match_path_loaders() {
let yaml = r#"
version: 1
groups:
readers: ["act-r"]
protected_branches: [main]
rules:
- id: r1
allow:
actors: { group: readers }
actions: [read]
branch_scope: any
"#;
let config = PolicyConfig::from_source(yaml).unwrap();
assert_eq!(config.version, 1);
let engine = PolicyEngine::load_graph_from_source(yaml, "g1").unwrap();
drop(engine);
let server_yaml = r#"
version: 1
kind: server
groups:
admins: ["act-a"]
rules:
- id: s1
allow:
actors: { group: admins }
actions: [graph_list]
"#;
PolicyEngine::load_server_from_source(server_yaml).unwrap();
// Kind misalignment stays loud through the from-source path.
assert!(PolicyEngine::load_graph_from_source(server_yaml, "g1").is_err());
assert!(PolicyEngine::load_server_from_source(yaml).is_err());
}
use super::{
PolicyAction, PolicyCompiler, PolicyConfig, PolicyEngine, PolicyExpectation, PolicyRequest,
PolicyTestCase, PolicyTestConfig,

View file

@ -37,6 +37,7 @@ serde_json = { workspace = true }
reqwest = { workspace = true }
object_store = { workspace = true }
ulid = { workspace = true }
sha2 = { workspace = true }
base64 = { workspace = true }
futures = { workspace = true }
tracing = { workspace = true }

View file

@ -2029,6 +2029,25 @@ edge WorksAt: Person -> Company
async fn list_dir(&self, dir_uri: &str) -> Result<Vec<String>> {
self.inner.list_dir(dir_uri).await
}
async fn read_text_versioned(&self, uri: &str) -> Result<(String, String)> {
self.inner.read_text_versioned(uri).await
}
async fn write_text_if_match(
&self,
uri: &str,
contents: &str,
expected_version: &str,
) -> Result<Option<String>> {
self.inner
.write_text_if_match(uri, contents, expected_version)
.await
}
async fn delete_prefix(&self, prefix_uri: &str) -> Result<()> {
self.inner.delete_prefix(prefix_uri).await
}
}
#[derive(Debug)]
@ -2071,6 +2090,25 @@ edge WorksAt: Person -> Company
async fn list_dir(&self, dir_uri: &str) -> Result<Vec<String>> {
self.inner.list_dir(dir_uri).await
}
async fn read_text_versioned(&self, uri: &str) -> Result<(String, String)> {
self.inner.read_text_versioned(uri).await
}
async fn write_text_if_match(
&self,
uri: &str,
contents: &str,
expected_version: &str,
) -> Result<Option<String>> {
self.inner
.write_text_if_match(uri, contents, expected_version)
.await
}
async fn delete_prefix(&self, prefix_uri: &str) -> Result<()> {
self.inner.delete_prefix(prefix_uri).await
}
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]

View file

@ -39,6 +39,39 @@ pub trait StorageAdapter: Debug + Send + Sync {
/// Returns full URIs (same scheme as `dir_uri`). The result is unordered.
/// Returns Ok(empty) if the directory does not exist or is empty.
async fn list_dir(&self, dir_uri: &str) -> Result<Vec<String>>;
/// Read a text object together with its backend version token (S3: the
/// object's ETag; local: sha256 of the content). The token is opaque —
/// valid only for `write_text_if_match` against the same adapter.
async fn read_text_versioned(&self, uri: &str) -> Result<(String, String)>;
/// Replace the object at `uri` only if its current version still matches
/// `expected_version` (obtained from a prior versioned read/write on this
/// adapter). Returns `Ok(Some(new_version))` on success and `Ok(None)`
/// when the precondition failed (a concurrent writer won — the CAS-lost
/// case callers must surface, never swallow). S3 uses a conditional put
/// (If-Match); local compares content then replaces via temp + rename —
/// the same single-machine semantics the callers had before this trait,
/// safe under the callers' own lock protocol but not a cross-process
/// barrier by itself.
async fn write_text_if_match(
&self,
uri: &str,
contents: &str,
expected_version: &str,
) -> Result<Option<String>>;
/// Recursively delete every object under `prefix_uri`. Returns Ok(())
/// when nothing exists there (idempotent). Local: `remove_dir_all`;
/// S3: list + delete (NOT atomic — callers must tolerate partial
/// prefixes on crash, which the cluster delete protocol does by retry).
async fn delete_prefix(&self, prefix_uri: &str) -> Result<()>;
}
/// Version token for local files: content identity. ETags are unavailable
/// on the filesystem; sha256 is stable, cheap at these object sizes, and
/// already the cluster ledger's CAS vocabulary.
fn local_version_token(bytes: &[u8]) -> String {
use sha2::{Digest, Sha256};
let digest = Sha256::digest(bytes);
digest.iter().map(|byte| format!("{byte:02x}")).collect()
}
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
@ -149,6 +182,49 @@ impl StorageAdapter for LocalStorageAdapter {
}
Ok(out)
}
async fn read_text_versioned(&self, uri: &str) -> Result<(String, String)> {
let path = local_path_from_uri(uri)?;
let bytes = tokio::fs::read(&path).await?;
let version = local_version_token(&bytes);
let text = String::from_utf8(bytes).map_err(|err| {
OmniError::manifest_internal(format!("storage read failed for '{}': {}", uri, err))
})?;
Ok((text, version))
}
async fn write_text_if_match(
&self,
uri: &str,
contents: &str,
expected_version: &str,
) -> Result<Option<String>> {
let path = local_path_from_uri(uri)?;
let current = match tokio::fs::read(&path).await {
Ok(bytes) => bytes,
Err(err) if err.kind() == std::io::ErrorKind::NotFound => return Ok(None),
Err(err) => return Err(err.into()),
};
if local_version_token(&current) != expected_version {
return Ok(None);
}
let tmp = path.with_extension(format!("tmp.{}", ulid::Ulid::new()));
tokio::fs::write(&tmp, contents.as_bytes()).await?;
if let Err(err) = tokio::fs::rename(&tmp, &path).await {
let _ = tokio::fs::remove_file(&tmp).await;
return Err(err.into());
}
Ok(Some(local_version_token(contents.as_bytes())))
}
async fn delete_prefix(&self, prefix_uri: &str) -> Result<()> {
let path = local_path_from_uri(prefix_uri)?;
match tokio::fs::remove_dir_all(&path).await {
Ok(()) => Ok(()),
Err(err) if err.kind() == std::io::ErrorKind::NotFound => Ok(()),
Err(err) => Err(err.into()),
}
}
}
#[async_trait]
@ -276,6 +352,84 @@ impl StorageAdapter for S3StorageAdapter {
}
Ok(out)
}
async fn read_text_versioned(&self, uri: &str) -> Result<(String, String)> {
let location = self.object_path(uri)?;
let result = self
.store
.get(&location)
.await
.map_err(|err| storage_backend_error("read", uri, err))?;
let etag = result.meta.e_tag.clone();
let bytes = result
.bytes()
.await
.map_err(|err| storage_backend_error("read", uri, err))?;
// Every S3-compatible store we target returns ETags; fall back to a
// content token rather than failing if one ever omits it.
let version = etag.unwrap_or_else(|| local_version_token(&bytes));
let text = String::from_utf8(bytes.to_vec()).map_err(|err| {
OmniError::manifest_internal(format!("storage read failed for '{}': {}", uri, err))
})?;
Ok((text, version))
}
async fn write_text_if_match(
&self,
uri: &str,
contents: &str,
expected_version: &str,
) -> Result<Option<String>> {
let location = self.object_path(uri)?;
let mode = PutMode::Update(object_store::UpdateVersion {
e_tag: Some(expected_version.to_string()),
version: None,
});
match self
.store
.put_opts(
&location,
PutPayload::from(contents.as_bytes().to_vec()),
mode.into(),
)
.await
{
Ok(result) => Ok(Some(
result
.e_tag
.unwrap_or_else(|| local_version_token(contents.as_bytes())),
)),
Err(object_store::Error::Precondition { .. })
| Err(object_store::Error::NotFound { .. }) => Ok(None),
Err(err) => Err(storage_backend_error("write_if_match", uri, err)),
}
}
async fn delete_prefix(&self, prefix_uri: &str) -> Result<()> {
let dir_with_slash = if prefix_uri.ends_with('/') {
prefix_uri.to_string()
} else {
format!("{}/", prefix_uri)
};
let prefix_loc = self.object_path(&dir_with_slash)?;
let mut entries = self.store.list(Some(&prefix_loc));
let mut locations = Vec::new();
while let Some(meta) = entries
.try_next()
.await
.map_err(|err| storage_backend_error("delete_prefix", prefix_uri, err))?
{
locations.push(meta.location);
}
for location in locations {
match self.store.delete(&location).await {
Ok(()) => {}
Err(object_store::Error::NotFound { .. }) => {}
Err(err) => return Err(storage_backend_error("delete_prefix", prefix_uri, err)),
}
}
Ok(())
}
}
impl S3StorageAdapter {
@ -444,6 +598,55 @@ fn env_var_truthy(key: &str) -> bool {
#[cfg(test)]
mod tests {
#[tokio::test]
async fn local_versioned_cas_roundtrip() {
let dir = tempfile::tempdir().unwrap();
let uri = format!("{}/state.json", dir.path().display());
let adapter = LocalStorageAdapter;
adapter.write_text(&uri, "v1").await.unwrap();
let (text, version) = adapter.read_text_versioned(&uri).await.unwrap();
assert_eq!(text, "v1");
// Matching token replaces and returns the next token.
let next = adapter
.write_text_if_match(&uri, "v2", &version)
.await
.unwrap()
.expect("fresh token must win");
assert_ne!(next, version);
// The stale token must lose (CAS-lost is Ok(None), never silent).
assert!(
adapter
.write_text_if_match(&uri, "v3", &version)
.await
.unwrap()
.is_none()
);
let (text, _) = adapter.read_text_versioned(&uri).await.unwrap();
assert_eq!(text, "v2");
// Missing object: precondition can't hold.
let missing = format!("{}/absent.json", dir.path().display());
assert!(
adapter
.write_text_if_match(&missing, "x", &version)
.await
.unwrap()
.is_none()
);
}
#[tokio::test]
async fn local_delete_prefix_is_recursive_and_idempotent() {
let dir = tempfile::tempdir().unwrap();
let root = format!("{}/tree", dir.path().display());
let adapter = LocalStorageAdapter;
adapter.write_text(&format!("{root}/a.txt"), "a").await.unwrap();
adapter.write_text(&format!("{root}/sub/b.txt"), "b").await.unwrap();
adapter.delete_prefix(&root).await.unwrap();
assert!(!adapter.exists(&format!("{root}/a.txt")).await.unwrap());
adapter.delete_prefix(&root).await.unwrap(); // absent -> Ok
}
use super::*;
#[test]

View file

@ -167,3 +167,80 @@ async fn s3_public_load_uses_hidden_run_and_publishes() {
.to_rust_json();
assert_eq!(loaded[0]["p.name"], "Loaded-Over-S3");
}
/// The conditional-write contract the cluster ledger depends on (RFC-006):
/// versioned read -> If-Match replace -> stale token refused. Pins the
/// S3-compatible backend's behavior (RustFS in CI) — turns red if a backend
/// bump regresses conditional puts.
#[tokio::test(flavor = "multi_thread")]
async fn s3_adapter_conditional_writes_contract() {
let Some(uri) = s3_test_graph_uri("adapter-cas") else {
eprintln!("skipping s3 adapter cas test: OMNIGRAPH_S3_TEST_BUCKET is not set");
return;
};
use omnigraph::storage::storage_for_uri;
let adapter = storage_for_uri(&uri).unwrap();
let object = format!("{uri}/cas-probe.json");
assert!(adapter.write_text_if_absent(&object, "v1").await.unwrap());
assert!(!adapter.write_text_if_absent(&object, "v1b").await.unwrap());
let (text, version) = adapter.read_text_versioned(&object).await.unwrap();
assert_eq!(text, "v1");
let next = adapter
.write_text_if_match(&object, "v2", &version)
.await
.unwrap()
.expect("fresh etag must win");
assert!(
adapter
.write_text_if_match(&object, "v3", &version)
.await
.unwrap()
.is_none(),
"stale etag must be refused"
);
let again = adapter
.write_text_if_match(&object, "v3", &next)
.await
.unwrap();
assert!(again.is_some());
// Prefix delete: recursive + idempotent.
adapter
.write_text(&format!("{uri}/tree/a.json"), "a")
.await
.unwrap();
adapter
.write_text(&format!("{uri}/tree/sub/b.json"), "b")
.await
.unwrap();
adapter.delete_prefix(&format!("{uri}/tree")).await.unwrap();
assert!(!adapter.exists(&format!("{uri}/tree/a.json")).await.unwrap());
adapter.delete_prefix(&format!("{uri}/tree")).await.unwrap();
adapter.delete(&object).await.unwrap();
}
/// Schema apply against an S3 graph — the cluster's schema executor will
/// lean on this; previously untested upstream on object storage.
#[tokio::test(flavor = "multi_thread")]
async fn s3_schema_apply_migrates_live_graph() {
let Some(uri) = s3_test_graph_uri("schema-apply") else {
eprintln!("skipping s3 schema apply test: OMNIGRAPH_S3_TEST_BUCKET is not set");
return;
};
let mut db = Omnigraph::init(&uri, TEST_SCHEMA).await.unwrap();
load_jsonl(&mut db, TEST_DATA, LoadMode::Overwrite)
.await
.unwrap();
let desired = format!("{TEST_SCHEMA}\nnode Note {{\n title: String @key\n}}\n");
let result = db.apply_schema(&desired).await.unwrap();
assert!(result.applied, "{result:?}");
let reopened = Omnigraph::open(&uri).await.unwrap();
assert!(
reopened.schema_source().contains("Note"),
"live S3 schema must carry the migration"
);
}