test(cluster,server): gated object-storage cluster e2e + CI wiring + docs

s3_cluster.rs runs the full control-plane lifecycle against a real
bucket (CI: containerized RustFS; locally the RustFS binary): import →
lock released (pins the drop-time release regression caught on the first
live smoke) → apply (graph roots + catalog on the bucket, nothing local)
→ serving snapshots from both the config dir and the bare URI → schema
evolution → approved delete (prefix removal) → empty-cluster refusal.
The server suite gains the config-free boot test: --cluster s3://… with
zero local files serves a stored query over HTTP.

CI: the rustfs job runs both suites; the classify filter covers the
cluster store/serve modules and the new test files. The server smoke
drops its name filter — every test in the s3 target is bucket-gated, and
a filter matching nothing passes vacuously (which silently ran zero
tests for a while).

Docs: deployment.md gains the Bucket-no-volume shape as the preferred
cloud deployment; cluster.md/server.md document --cluster <uri>;
testing.md maps the new suite.

Co-Authored-By: Claude Fable 5 <noreply@anthropic.com>
This commit is contained in:
aaltshuler 2026-06-11 15:56:40 +03:00
parent 58855c0a7c
commit 8d7aed065f
7 changed files with 311 additions and 15 deletions

View file

@ -89,7 +89,9 @@ jobs:
crates/omnigraph/src/storage.rs) run_rustfs_ci=true ;;
crates/omnigraph/src/db/manifest.rs|crates/omnigraph/src/db/manifest/*) run_rustfs_ci=true ;;
crates/omnigraph/tests/s3_storage.rs|crates/omnigraph/tests/helpers/*) run_rustfs_ci=true ;;
crates/omnigraph-server/tests/server.rs) run_rustfs_ci=true ;;
crates/omnigraph-cluster/src/store.rs|crates/omnigraph-cluster/src/serve.rs) run_rustfs_ci=true ;;
crates/omnigraph-cluster/tests/s3_cluster.rs) run_rustfs_ci=true ;;
crates/omnigraph-server/tests/s3.rs|crates/omnigraph-server/tests/support/*) run_rustfs_ci=true ;;
crates/omnigraph-cli/tests/system_local.rs) run_rustfs_ci=true ;;
esac
done
@ -351,10 +353,14 @@ jobs:
run: cargo test --locked -p omnigraph-engine --test s3_storage -- --nocapture
- name: Run RustFS server smoke
# The exact test name (not a loose substring): a filter that matches
# nothing passes vacuously, which silently ran zero tests here for a
# while (the old filter said s3_repo; the test says s3_graph).
run: cargo test --locked -p omnigraph-server --test s3 server_opens_s3_graph_directly_and_serves_snapshot_and_read -- --nocapture
# No name filter: every test in the s3 target is bucket-gated, and a
# filter matching nothing passes vacuously (which silently ran zero
# tests here for a while — the old filter said s3_repo, the test
# said s3_graph).
run: cargo test --locked -p omnigraph-server --test s3 -- --nocapture
- name: Run RustFS cluster e2e
run: cargo test --locked -p omnigraph-cluster --test s3_cluster -- --nocapture
- name: Run RustFS CLI smoke
run: cargo test --locked -p omnigraph-cli --test system_local local_cli_s3_end_to_end_init_load_read_flow -- --nocapture

View file

@ -0,0 +1,162 @@
//! Cluster-on-object-storage end-to-end (RFC-006): the full control-plane
//! lifecycle with `storage: s3://…` — import, apply (graph roots + catalog
//! on the bucket), serving snapshots from both the config dir and the bare
//! storage URI, schema evolution, and the approved delete (prefix removal).
//!
//! Gated like every S3 suite: skips unless `OMNIGRAPH_S3_TEST_BUCKET` is
//! set (CI runs it against containerized RustFS; locally use the RustFS
//! binary + `AWS_*` env, see docs/dev/testing.md).
//!
//! Runtime flavor is multi_thread on purpose: the state-lock guard's
//! drop-time release uses block_in_place on object stores, which is the
//! production (CLI) runtime shape — and the lock-release regression this
//! suite pins (a spawned delete dying with a short-lived runtime) only
//! reproduces realistically under it.
use std::env;
use std::fs;
use omnigraph_cluster::{
apply_config_dir, import_config_dir, read_serving_snapshot,
read_serving_snapshot_from_storage, status_config_dir, validate_config_dir,
};
use ulid::Ulid;
const SCHEMA_V1: &str = "node Person {\n name: String @key\n}\n";
const SCHEMA_V2: &str = "node Person {\n name: String @key\n title: String?\n}\n";
const FIND_PERSON_GQ: &str = "query find_person($name: String) {\n match { $p: Person { name: $name } }\n return { $p.name }\n}\n";
const POLICY_YAML: &str = r#"
version: 1
actors:
- id: act-admin
roles: [admin]
rules:
- effect: permit
actions: [read, change, schema_apply, branch_create, branch_delete, branch_merge]
roles: [admin]
"#;
/// Unique per-run storage root under the test bucket, or None to skip.
fn s3_storage_root(suite: &str) -> Option<String> {
let bucket = env::var("OMNIGRAPH_S3_TEST_BUCKET").ok()?;
Some(format!("s3://{bucket}/cluster-e2e/{suite}-{}", Ulid::new()))
}
fn write_cluster_fixture(dir: &std::path::Path, storage_root: &str, schema: &str) {
fs::write(dir.join("people.pg"), schema).unwrap();
fs::create_dir_all(dir.join("queries")).unwrap();
fs::write(dir.join("queries/people.gq"), FIND_PERSON_GQ).unwrap();
fs::write(dir.join("intel.policy.yaml"), POLICY_YAML).unwrap();
fs::write(
dir.join("cluster.yaml"),
format!(
r#"version: 1
storage: {storage_root}
graphs:
knowledge:
schema: people.pg
queries: queries/
policies:
intel:
file: intel.policy.yaml
applies_to: [graph.knowledge]
"#
),
)
.unwrap();
}
#[tokio::test(flavor = "multi_thread")]
async fn s3_cluster_full_lifecycle_import_apply_serve_evolve_delete() {
let Some(root) = s3_storage_root("lifecycle") else {
eprintln!("skipping s3 cluster e2e: OMNIGRAPH_S3_TEST_BUCKET is not set");
return;
};
let dir = tempfile::tempdir().unwrap();
write_cluster_fixture(dir.path(), &root, SCHEMA_V1);
// validate is config-only and must pass before any bucket I/O.
let validate = validate_config_dir(dir.path());
assert!(validate.ok, "{:?}", validate.diagnostics);
let import = import_config_dir(dir.path()).await;
assert!(import.ok, "{:?}", import.diagnostics);
// The lock-release regression (caught live on the first smoke): the
// guard's drop must COMPLETE its bucket delete before the command
// returns — a follow-up command finding `state_lock_held` means the
// release was spawned into a dying runtime.
let status = status_config_dir(dir.path()).await;
assert!(status.ok, "{:?}", status.diagnostics);
assert!(
!status.state_observations.locked,
"import leaked the state lock on the bucket: {:?}",
status.state_observations
);
let apply = apply_config_dir(dir.path()).await;
assert!(apply.ok && apply.converged, "{:?}", apply.diagnostics);
// Nothing stored locally: the config dir holds only declared sources.
assert!(!dir.path().join("__cluster").exists());
assert!(!dir.path().join("graphs").exists());
// Serving snapshot resolves through cluster.yaml's storage: key…
let via_config = read_serving_snapshot(dir.path()).await.unwrap();
assert_eq!(via_config.graphs.len(), 1);
let graph_root = via_config.graphs[0].root.to_string_lossy().to_string();
assert!(
graph_root.starts_with("s3://") && graph_root.ends_with("graphs/knowledge.omni"),
"{graph_root}"
);
assert_eq!(via_config.queries.len(), 1);
assert_eq!(via_config.policies.len(), 1);
assert!(
via_config.policies[0].source.contains("act-admin"),
"policy must carry verified content, not a path"
);
// …and config-free, straight from the bucket URI (the deployment
// payoff: a server needs only the URI and credentials).
let via_uri = read_serving_snapshot_from_storage(&root).await.unwrap();
assert_eq!(via_uri.graphs.len(), 1);
assert_eq!(
via_uri.graphs[0].root.to_string_lossy(),
via_config.graphs[0].root.to_string_lossy()
);
assert_eq!(via_uri.policies.len(), 1);
// Schema evolution converges on the bucket.
write_cluster_fixture(dir.path(), &root, SCHEMA_V2);
let evolve = apply_config_dir(dir.path()).await;
assert!(evolve.ok && evolve.converged, "{:?}", evolve.diagnostics);
// Approved delete: drop the graph from the config; the plan demands an
// approval, the approved apply prefix-deletes the bucket root.
fs::write(
dir.path().join("cluster.yaml"),
format!("version: 1\nstorage: {root}\ngraphs: {{}}\n"),
)
.unwrap();
let plan = omnigraph_cluster::plan_config_dir(dir.path()).await;
assert!(plan.ok, "{:?}", plan.diagnostics);
let approval = plan
.approvals_required
.first()
.expect("graph delete requires approval");
let approve = omnigraph_cluster::approve_config_dir(
dir.path(),
&approval.resource,
"e2e-operator",
)
.await;
assert!(approve.ok, "{:?}", approve.diagnostics);
let delete = apply_config_dir(dir.path()).await;
assert!(delete.ok && delete.converged, "{:?}", delete.diagnostics);
let after = read_serving_snapshot_from_storage(&root).await;
assert!(
after.is_err(),
"an empty cluster must refuse to serve, got {after:?}"
);
}

View file

@ -75,3 +75,105 @@ async fn server_opens_s3_graph_directly_and_serves_snapshot_and_read() {
assert_eq!(read_body["row_count"], 1);
assert_eq!(read_body["rows"][0]["p.name"], "Alice");
}
/// Config-free cluster serving (RFC-006): boot `--cluster s3://bucket/prefix`
/// with NO local files at all — the ledger and catalog on the bucket are the
/// whole deployment artifact. The fixture cluster is applied from a temp
/// config dir, which is then dropped before the server boots from the URI.
#[tokio::test(flavor = "multi_thread")]
async fn server_boots_cluster_from_bare_storage_uri_and_serves_query() {
let Some(bucket) = std::env::var("OMNIGRAPH_S3_TEST_BUCKET").ok() else {
eprintln!("skipping s3 cluster-serving test: OMNIGRAPH_S3_TEST_BUCKET is not set");
return;
};
let unique = format!(
"{}-{}",
std::process::id(),
std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.unwrap()
.as_nanos()
);
let root = format!("s3://{bucket}/cluster-serve/{unique}");
// Apply a one-graph cluster onto the bucket, seed it, then DROP the
// config dir — the boot below must need nothing local.
{
let dir = tempfile::tempdir().unwrap();
fs::write(
dir.path().join("people.pg"),
"node Person {\n name: String @key\n}\n",
)
.unwrap();
fs::write(
dir.path().join("people.gq"),
"query find_person($name: String) {\n match { $p: Person { name: $name } }\n return { $p.name }\n}\n",
)
.unwrap();
fs::write(
dir.path().join("cluster.yaml"),
format!(
"version: 1\nstorage: {root}\ngraphs:\n knowledge:\n schema: people.pg\n queries:\n find_person:\n file: people.gq\n"
),
)
.unwrap();
let import = omnigraph_cluster::import_config_dir(dir.path()).await;
assert!(import.ok, "{:?}", import.diagnostics);
let apply = omnigraph_cluster::apply_config_dir(dir.path()).await;
assert!(apply.ok && apply.converged, "{:?}", apply.diagnostics);
let graph_uri = format!("{root}/graphs/knowledge.omni");
let mut db = Omnigraph::open(&graph_uri).await.unwrap();
load_jsonl(
&mut db,
"{\"type\":\"Person\",\"data\":{\"name\":\"Ada\"}}\n",
LoadMode::Overwrite,
)
.await
.unwrap();
}
let settings = omnigraph_server::load_server_settings(
None,
Some(&std::path::PathBuf::from(&root)),
None,
None,
None,
true,
)
.await
.unwrap();
let omnigraph_server::ServerConfigMode::Multi {
graphs,
config_path,
server_policy,
} = settings.mode
else {
panic!("cluster boot must select multi-graph routing");
};
let state = omnigraph_server::open_multi_graph_state(
graphs,
Vec::new(),
server_policy.as_ref(),
config_path,
)
.await
.unwrap();
let app = build_app(state);
let response = tower::ServiceExt::oneshot(
app,
Request::builder()
.method(Method::POST)
.uri("/graphs/knowledge/queries/find_person")
.header("content-type", "application/json")
.body(Body::from(json!({"params": {"name": "Ada"}}).to_string()))
.unwrap(),
)
.await
.unwrap();
assert_eq!(response.status(), StatusCode::OK);
let bytes = axum::body::to_bytes(response.into_body(), usize::MAX).await.unwrap();
let value: serde_json::Value = serde_json::from_slice(&bytes).unwrap();
assert_eq!(value["rows"][0]["p.name"], "Ada", "{value}");
}

View file

@ -8,7 +8,7 @@ This file is the always-on map of the test surface. **Consult it before every ta
|---|---|---|
| `omnigraph` (engine) | `crates/omnigraph/tests/` | Integration tests (21 files), fixture-driven, share `tests/helpers/mod.rs` |
| `omnigraph-cli` | `crates/omnigraph-cli/tests/` | `cli.rs` (unit-ish; includes the `cluster_e2e_*` lifecycle compositions over the spawned binary — lost-state re-import recovery, out-of-band drift, graph-root destruction, multi-graph mixed-disposition convergence), `system_local.rs` (incl. the full-cycle cluster lifecycle with a spawned `--cluster` server — declare→serve→evolve→drift-heal→approved-delete — and applied-policy enforcement over HTTP), `system_remote.rs`, share `tests/support/mod.rs` |
| `omnigraph-cluster` | mostly in-source `#[cfg(test)] mod tests`; `tests/failpoints.rs` (feature-gated) | Cluster config parser, local JSON state diff, state CAS/lock handling/recovery, read-only validate/plan/status plus explicit refresh/import graph observations, config-only apply (content-addressed payload publish, disposition gating, composite-digest convergence, idempotent re-apply), catalog payload verification (status read-only, refresh drift + self-heal), failpoint crash-mid-apply / CAS-race coverage, Stage 4A graph creation (create executor, recovery sidecars + sweep rows, create crash windows), Stage 4B schema apply (migration previews in plan, schema executor, schema-apply sweep classification, schema crash windows), Stage 4C gated deletes (digest-bound approvals, delete executor + tombstones, delete sweep rows, delete crash windows), and 5A policy binding metadata (applies_to in the applied revision, binding-change diffing + convergence, pre-5A backfill), and the 5B serving-snapshot read API (converged read, refusal rows) |
| `omnigraph-cluster` | mostly in-source `#[cfg(test)] mod tests`; `tests/failpoints.rs` (feature-gated); `tests/s3_cluster.rs` (bucket-gated full lifecycle on object storage) | Cluster config parser, local JSON state diff, state CAS/lock handling/recovery, read-only validate/plan/status plus explicit refresh/import graph observations, config-only apply (content-addressed payload publish, disposition gating, composite-digest convergence, idempotent re-apply), catalog payload verification (status read-only, refresh drift + self-heal), failpoint crash-mid-apply / CAS-race coverage, Stage 4A graph creation (create executor, recovery sidecars + sweep rows, create crash windows), Stage 4B schema apply (migration previews in plan, schema executor, schema-apply sweep classification, schema crash windows), Stage 4C gated deletes (digest-bound approvals, delete executor + tombstones, delete sweep rows, delete crash windows), and 5A policy binding metadata (applies_to in the applied revision, binding-change diffing + convergence, pre-5A backfill), and the 5B serving-snapshot read API (converged read, refusal rows) |
| `omnigraph-server` | `crates/omnigraph-server/tests/` | `server.rs` (HTTP-level; incl. cluster-mode boot — converged-dir serving, policy binding wiring, boot refusals), `openapi.rs` (OpenAPI drift / regeneration) |
| `omnigraph-compiler` | mostly in-source `#[cfg(test)] mod tests` | Parser, type-checker, IR lowering, lint |
@ -64,7 +64,8 @@ The engine's `tests/` is the principal coverage surface; most graph-shaped behav
CI runs three S3-backed tests against a containerized RustFS server (`.github/workflows/ci.yml``rustfs_integration` job):
- `cargo test -p omnigraph-engine --test s3_storage`
- `cargo test -p omnigraph-server --test server server_opens_s3_graph_directly_and_serves_snapshot_and_read`
- `cargo test -p omnigraph-server --test s3` (single-graph serving + config-free `--cluster s3://` boot)
- `cargo test -p omnigraph-cluster --test s3_cluster` (full control-plane lifecycle on the bucket)
- `cargo test -p omnigraph-cli --test system_local local_cli_s3_end_to_end_init_load_read_flow`
Locally, set `OMNIGRAPH_S3_TEST_BUCKET` (and the usual `AWS_*` vars including `AWS_ENDPOINT_URL_S3` for non-AWS) before running. Without those, S3 tests skip gracefully.

View file

@ -84,6 +84,12 @@ OMNIGRAPH_SERVER_BEARER_TOKENS_JSON='{"act-reader":"s3cret"}' \
omnigraph-server --cluster company-brain --bind 0.0.0.0:8080
```
`--cluster` accepts either a **config directory** (the storage root resolves
through `cluster.yaml`'s `storage:` key) or a **storage-root URI directly**
(`--cluster s3://bucket/prefix`) — config-free serving: a serving box needs
only the URI and credentials, no checkout of the config repo. The ledger and
catalog on the bucket are the deployment artifact.
`--cluster` is an **exclusive boot source**: it cannot be combined with a
graph URI, `--target`, or `--config`, and `omnigraph.yaml` is never read in
this mode. Routing is always multi-graph:

View file

@ -47,10 +47,31 @@ omnigraph-server s3://my-bucket/graphs/example/releases/2026-04-10-v0.1.0 \
## Cluster Mode in Containers (AWS, Railway)
A cluster-booted deployment serves a **cluster directory** (config + state
ledger + content-addressed catalog + graph data) from a mounted volume — the
one structural difference from the stateless S3 single-graph shape, which
needs no volume at all. The container contract:
A cluster-booted deployment has **two shapes** since the `storage:` root
(RFC-006):
- **Bucket, no volume (preferred for cloud)** — the cluster's ledger,
catalog, and graph data live under an object-storage root
(`storage: s3://bucket/prefix` in `cluster.yaml`). The server boots
**config-free** from the bare URI; the container needs no volume at all:
```bash
docker run -d \
-e OMNIGRAPH_CLUSTER=s3://my-bucket/clusters/company-brain \
-e AWS_ACCESS_KEY_ID=... -e AWS_SECRET_ACCESS_KEY=... \
-e OMNIGRAPH_SERVER_BEARER_TOKEN=... \
-p 8080:8080 <image>
```
Day-2 runs from any operator checkout of the config repo:
`omnigraph cluster apply --config ./company-brain` (the `storage:` key
routes every stored byte to the bucket), then restart the service. The
state lock is genuinely cross-machine on object storage, so CI and
operator shells contend safely.
- **Volume (file-rooted)** — the original shape: the whole cluster
directory on a mounted volume. Still fully supported; the container
contract:
```bash
docker run -d \
@ -102,8 +123,6 @@ above).
### Constraints (current honest list)
- **Cluster directories are local-filesystem** — the volume is mandatory;
S3-hosted cluster dirs are not supported.
- **No hot reload** — applied changes serve on the next restart.
- **Single-writer apply** — run `cluster apply` from one place at a time
(the state lock enforces this; CI or one operator shell, not both).

View file

@ -16,7 +16,7 @@ Axum 0.8 + tokio + utoipa-generated OpenAPI. **Two modes** (v0.6.0+): single-gra
### Cluster-booted multi mode (Phase 5)
`omnigraph-server --cluster <dir>` boots from the cluster catalog's **applied
`omnigraph-server --cluster <dir-or-uri>` boots from the cluster catalog's **applied
revision** (`state.json` + content-addressed blobs) instead of
`omnigraph.yaml` — an exclusive boot source: combining it with `<URI>`,
`--target`, or `--config` is a startup error, and `omnigraph.yaml` is never
@ -27,7 +27,7 @@ for what is read and the fail-fast readiness rules. `--bind`,
Mode inference:
0. CLI `--cluster <dir>` → **multi, cluster-booted** (exclusive)
0. CLI `--cluster <dir | s3://…>` → **multi, cluster-booted** (exclusive; a scheme-qualified argument reads the ledger straight from the storage root, no local config)
1. CLI positional `<URI>` → single
2. CLI `--target <name>` → single
3. `server.graph` in config → single