mr-668: POST /graphs runtime create endpoint (PR 7/10)

PR 7 of the MR-668 multi-graph server work. Operators can now add a
graph to a running multi-graph server without restarting:

  curl -X POST http://server/graphs \
    -H "Content-Type: application/json" \
    -d '{
          "graph_id": "beta",
          "uri": "/data/beta.omni",
          "schema": { "source": "node Person { name: String @key }\n" },
          "policy": { "file": "./policies/beta.yaml" }
        }'

DELETE remains deferred (out of v0.7.0 scope per the trimmed plan —
no `delete_prefix`, no tombstones).

Body shape (decision 7):
  - Nested `schema: { source: "..." }` (mirrors the `policy: { file }`
    pattern; leaves room for future fields without breakage).
  - Optional nested `policy: { file: "..." }` for per-graph Cedar.
  - 32 MiB body limit (reuses `INGEST_REQUEST_BODY_LIMIT_BYTES`).
  - Asymmetric with `SchemaApplyRequest`, which keeps the flat
    `schema_source: String` field (documented in api.rs).
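
A minimal, std-only sketch of the nested body shape described above. The
struct and field names follow the request types used in the tests; the serde
derives of the real types are omitted, and `check_non_empty` plus its error
wording are assumptions for illustration, not the handler's actual code.

```rust
// Std-only mirror of the nested request shape. The real types live in
// api.rs; their serde derives are left out of this sketch.
#[allow(dead_code)]
#[derive(Debug, Clone)]
struct GraphSchemaSpec {
    source: String,
}

#[allow(dead_code)]
#[derive(Debug, Clone)]
struct GraphPolicySpec {
    file: String,
}

#[allow(dead_code)]
#[derive(Debug, Clone)]
struct GraphCreateRequest {
    graph_id: String,
    uri: String,
    schema: GraphSchemaSpec,
    policy: Option<GraphPolicySpec>,
}

/// Hypothetical emptiness checks. The messages name the nested field path
/// so a client can tell which field was rejected.
fn check_non_empty(req: &GraphCreateRequest) -> Result<(), String> {
    if req.uri.trim().is_empty() {
        return Err("uri must not be empty".to_string());
    }
    if req.schema.source.trim().is_empty() {
        return Err("schema.source must not be empty".to_string());
    }
    Ok(())
}

fn main() {
    let req = GraphCreateRequest {
        graph_id: "beta".to_string(),
        uri: "/data/beta.omni".to_string(),
        // Whitespace-only source is treated as empty.
        schema: GraphSchemaSpec { source: " \n ".to_string() },
        policy: Some(GraphPolicySpec { file: "./policies/beta.yaml".to_string() }),
    };
    println!("{:?}", check_non_empty(&req));
}
```

Nesting `schema` and `policy` as objects means a later field can be added
next to `source` or `file` without changing the top-level keys.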

Atomic YAML rewrite + drift detection:
  - New `config::rewrite_atomic(path, new_config, expected_hash)`:
    flock → re-read + hash check → serialize → write `.tmp` → fsync
    → rename → fsync parent dir. Returns the new hash for the caller
    to update its in-memory baseline.
  - New `config::hash_config_file(path)` — SHA-256 of the on-disk
    bytes, used at startup and after each rewrite.
  - New `RewriteAtomicError { Drift | Io | Serialize }` enum.
  - `AppState.config_hash: Option<Arc<Mutex<[u8;32]>>>` carries the
    in-memory baseline. Updated after every successful rewrite so
    subsequent POSTs don't false-trigger drift.
  - The mutex is `std::sync::Mutex` (brief critical section, no .await
    inside). The flock itself serializes file access process-wide
    AND across multiple server instances (defense in depth).
  - All sync I/O runs inside `tokio::task::spawn_blocking`, because
    flock blocks the calling thread.
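
The rewrite sequence above can be sketched with the standard library alone.
This is an illustration under stated assumptions, not `config::rewrite_atomic`
itself: the flock step is omitted, `DefaultHasher` stands in for SHA-256, the
payload is raw bytes rather than a serialized config, and all names here
(`rewrite_atomic_sketch`, `hash_bytes`, `RewriteError`) are hypothetical.

```rust
use std::collections::hash_map::DefaultHasher;
use std::fs::{self, File};
use std::hash::Hasher;
use std::io::{self, Write};
use std::path::{Path, PathBuf};

#[derive(Debug)]
enum RewriteError {
    Drift,
    Io(io::Error),
}

impl From<io::Error> for RewriteError {
    fn from(e: io::Error) -> Self {
        RewriteError::Io(e)
    }
}

/// Stand-in for the SHA-256 file hash (assumption: keeps the sketch std-only).
fn hash_bytes(bytes: &[u8]) -> u64 {
    let mut h = DefaultHasher::new();
    h.write(bytes);
    h.finish()
}

fn rewrite_atomic_sketch(
    path: &Path,
    new_bytes: &[u8],
    expected: u64,
) -> Result<u64, RewriteError> {
    // The real code takes an flock before this point; omitted here.
    // 1. Re-read the file and compare against the caller's baseline.
    let on_disk = fs::read(path)?;
    if hash_bytes(&on_disk) != expected {
        return Err(RewriteError::Drift);
    }
    // 2. Write a sibling `.tmp` file and flush it to disk.
    let mut tmp_name = path.as_os_str().to_owned();
    tmp_name.push(".tmp");
    let tmp = PathBuf::from(tmp_name);
    let mut f = File::create(&tmp)?;
    f.write_all(new_bytes)?;
    f.sync_all()?;
    // 3. Rename over the original path.
    fs::rename(&tmp, path)?;
    // 4. fsync the parent directory so the rename itself is durable.
    #[cfg(unix)]
    {
        let parent = match path.parent() {
            Some(p) if !p.as_os_str().is_empty() => p,
            _ => Path::new("."),
        };
        File::open(parent)?.sync_all()?;
    }
    // Return the new baseline for the caller to store.
    Ok(hash_bytes(new_bytes))
}

fn main() -> Result<(), RewriteError> {
    let dir = std::env::temp_dir().join(format!("rewrite-sketch-{}", std::process::id()));
    fs::create_dir_all(&dir)?;
    let path = dir.join("omnigraph.yaml");
    fs::write(&path, b"graphs:\n")?;
    let baseline = hash_bytes(&fs::read(&path)?);
    let next = rewrite_atomic_sketch(&path, b"graphs:\n  beta:\n    uri: /data/beta.omni\n", baseline)?;
    // Reusing the old baseline now fails with Drift.
    let stale = rewrite_atomic_sketch(&path, b"graphs:\n", baseline);
    println!("new baseline {next:x}, stale attempt: {stale:?}");
    fs::remove_dir_all(&dir)?;
    Ok(())
}
```

Returning the new hash is what lets the caller refresh its in-memory baseline,
so a second rewrite does not trip the drift check on the server's own write.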

Handler ordering (the load-bearing sequence):
  1. Mode check: 405 in single mode.
  2. Cedar authorize: `GraphCreate` against `Omnigraph::Server::"root"`.
  3. Validate body: `GraphId::try_from` (regex + reserved-name), empty
     schema/uri checks, per-graph policy file parse.
  4. Pre-check registry for duplicate graph_id / duplicate uri (409).
  5. `Omnigraph::init` the new engine.
  6. Atomic YAML rewrite (drift detection inside).
  7. Publish in registry (atomic re-check via `GraphRegistry::insert`).
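
A simplified model of the ordering above, reduced to the status codes the
tests assert on. Everything here is a sketch: `create_graph_status`, `Mode`,
`CreateInput`, and the `config_drifted` flag are hypothetical; Cedar
authorization and engine init are left as comments; and only one reserved
name (the one used in the tests) is modelled.

```rust
use std::collections::HashMap;

enum Mode {
    Single,
    Multi,
}

struct CreateInput<'a> {
    graph_id: &'a str,
    uri: &'a str,
    schema_source: &'a str,
}

// Only the reserved name exercised by the tests; the full list is not shown.
const RESERVED: &[&str] = &["policies"];

/// Returns the HTTP status this sketch would answer with.
/// `registry` maps graph_id -> uri; `config_drifted` stands in for the
/// hash check performed inside the YAML rewrite.
fn create_graph_status(
    mode: &Mode,
    input: &CreateInput,
    registry: &mut HashMap<String, String>,
    config_drifted: bool,
) -> u16 {
    // 1. Mode check.
    if matches!(mode, Mode::Single) {
        return 405;
    }
    // 2. Cedar authorization would run here (omitted).
    // 3. Body validation: reserved name and emptiness checks only.
    if RESERVED.iter().any(|r| *r == input.graph_id)
        || input.uri.trim().is_empty()
        || input.schema_source.trim().is_empty()
    {
        return 400;
    }
    // 4. Duplicate pre-check on both graph_id and uri.
    if registry.contains_key(input.graph_id)
        || registry.values().any(|u| u.as_str() == input.uri)
    {
        return 409;
    }
    // 5. Engine init would create storage here (omitted).
    // 6. YAML rewrite, which refuses on drift.
    if config_drifted {
        return 503;
    }
    // 7. Publish in the registry.
    registry.insert(input.graph_id.to_string(), input.uri.to_string());
    201
}

fn main() {
    let mut registry = HashMap::new();
    registry.insert("alpha".to_string(), "/data/alpha.omni".to_string());
    let req = CreateInput {
        graph_id: "beta",
        uri: "/data/beta.omni",
        schema_source: "node Person { name: String @key }\n",
    };
    let first = create_graph_status(&Mode::Multi, &req, &mut registry, false);
    let second = create_graph_status(&Mode::Multi, &req, &mut registry, false);
    let single = create_graph_status(&Mode::Single, &req, &mut registry, false);
    println!("{first} {second} {single}"); // prints "201 409 405"
}
```

Putting the cheap rejections (405, 400, 409) before init keeps them free of
side effects; only steps 5 onward can leave anything on disk.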

Failure modes (documented in handler rustdoc):
  - Init fails → orphan storage at `req.uri` (PR 2a cleans up schema
    files; Lance datasets remain orphans until `delete_prefix` lands).
  - YAML rewrite fails (drift, IO) → orphan storage; YAML unchanged.
  - Registry insert fails (race) → YAML has entry but registry doesn't;
    next restart opens it cleanly.
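
The three failure modes can be restated as a small lookup of what each failed
step leaves behind. This is only a restatement of the list above in code form;
`FailedStep`, `Residue`, and `residue_after` are hypothetical names, not part
of the handler.

```rust
#[derive(Debug, Clone, Copy)]
enum FailedStep {
    Init,
    YamlRewrite,
    RegistryInsert,
}

#[derive(Debug, PartialEq)]
struct Residue {
    /// Data may exist at `req.uri` after the failure.
    storage_may_exist: bool,
    /// The config file on disk lists the new graph.
    yaml_has_entry: bool,
    /// The running server's registry serves the new graph.
    registry_has_entry: bool,
}

fn residue_after(step: FailedStep) -> Residue {
    match step {
        // Init failed: possible orphan storage, nothing else touched.
        FailedStep::Init => Residue {
            storage_may_exist: true,
            yaml_has_entry: false,
            registry_has_entry: false,
        },
        // Rewrite failed (drift or I/O): storage exists, YAML unchanged.
        FailedStep::YamlRewrite => Residue {
            storage_may_exist: true,
            yaml_has_entry: false,
            registry_has_entry: false,
        },
        // Insert lost a race: YAML has the entry, the registry does not,
        // so the graph is picked up on the next restart.
        FailedStep::RegistryInsert => Residue {
            storage_may_exist: true,
            yaml_has_entry: true,
            registry_has_entry: false,
        },
    }
}

fn main() {
    for step in [FailedStep::Init, FailedStep::YamlRewrite, FailedStep::RegistryInsert] {
        println!("{step:?} -> {:?}", residue_after(step));
    }
}
```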

New dependency: `fs2 = "0.4"` (workspace + omnigraph-server). POSIX-only
file locking. Linux/macOS deployment supported; Windows out of scope.

Tests (10 new in `tests/server.rs::multi_graph_startup`):
  - `post_graphs_creates_a_new_graph_end_to_end` — happy path, includes
    YAML inspection to confirm the rewrite landed.
  - `post_graphs_baseline_hash_updates_between_rewrites` — two POSTs in
    a row both succeed (drift baseline updates correctly).
  - `post_graphs_duplicate_graph_id_returns_409`
  - `post_graphs_duplicate_uri_returns_409`
  - `post_graphs_invalid_graph_id_returns_400` (reserved name)
  - `post_graphs_empty_schema_source_returns_400`
  - `post_graphs_returns_405_in_single_mode`
  - `post_graphs_yaml_drift_detection_returns_503` — operator hand-edits
    omnigraph.yaml; server refuses to clobber.
  - `hash_config_file_is_deterministic_and_detects_changes`
  - `rewrite_atomic_refuses_when_hash_drifts`

OpenAPI: `server_graphs_create` registered in `ApiDoc::paths(...)`;
openapi.json regenerated.

Result: 225 server tests green (74 lib + 66 openapi + 85 integration),
all MR-731 regressions still pinned.

LOC: ~580 lib.rs net (handler + helpers), ~120 config.rs (rewrite
machinery), +71 api.rs (request/response shapes), +332 tests/server.rs.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Ragnor Comerford 2026-05-25 20:38:58 +02:00
parent 94b6346bdd
commit a4e6cb689a
9 changed files with 1030 additions and 5 deletions

@@ -1008,7 +1008,7 @@ async fn app_for_multi_mode(graph_ids: &[&str]) -> (Vec<tempfile::TempDir>, Rout
dirs.push(dir);
}
let workload = omnigraph_server::workload::WorkloadController::from_env();
-let state = AppState::new_multi(handles, Vec::new(), None, workload, None).unwrap();
+let state = AppState::new_multi(handles, Vec::new(), None, workload, None, None).unwrap();
let app = build_app(state);
(dirs, app)
}

@@ -4360,7 +4360,7 @@ mod multi_graph_startup {
dirs.push(dir);
}
let workload = omnigraph_server::workload::WorkloadController::from_env();
-let state = AppState::new_multi(handles, Vec::new(), None, workload, None).unwrap();
+let state = AppState::new_multi(handles, Vec::new(), None, workload, None, None).unwrap();
let app = build_app(state);
(dirs, app)
}
@@ -4741,7 +4741,7 @@ graphs:
let tokens = vec![("act-andrew".to_string(), "secret-token".to_string())];
let workload = omnigraph_server::workload::WorkloadController::from_env();
let state =
-AppState::new_multi(vec![handle], tokens, None, workload, None).unwrap();
+AppState::new_multi(vec![handle], tokens, None, workload, None, None).unwrap();
let app = build_app(state);
// No Authorization header → 401.
@@ -4822,6 +4822,7 @@ rules:
Some(server_policy),
workload,
None,
+None,
)
.unwrap();
let app = build_app(state);
@@ -4864,6 +4865,333 @@ rules:
);
}
// ─── PR 7 — POST /graphs ──────────────────────────────────────────
use omnigraph_server::api::{GraphCreateRequest, GraphCreateResponse, GraphPolicySpec, GraphSchemaSpec};
use omnigraph_server::config::{OmnigraphConfig, hash_config_file};
/// Spin up a multi-mode server whose `omnigraph.yaml` we control,
/// so PR 7's `POST /graphs` can rewrite it. Returns the config
/// directory (which the caller must keep alive for the duration of
/// the test) and a built `Router`.
async fn multi_mode_app_with_real_config(
initial_graphs: &[&str],
) -> (tempfile::TempDir, Router) {
let cfg_dir = tempfile::tempdir().unwrap();
let schema = fs::read_to_string(fixture("test.pg")).unwrap();
// Init each starting graph at a real URI inside the config dir.
let mut yaml_graphs = String::new();
let mut handles = Vec::new();
for id in initial_graphs {
let graph_uri = cfg_dir.path().join(format!("{id}.omni"));
Omnigraph::init(graph_uri.to_str().unwrap(), &schema)
.await
.unwrap();
yaml_graphs.push_str(&format!(
" {id}:\n uri: {}\n",
graph_uri.display()
));
// Open in-memory engine for the handle.
let engine = Omnigraph::open(graph_uri.to_str().unwrap())
.await
.unwrap();
handles.push(Arc::new(
omnigraph_server::GraphHandle {
key: omnigraph_server::GraphKey::cluster(
omnigraph_server::GraphId::try_from(*id).unwrap(),
),
uri: graph_uri.to_string_lossy().to_string(),
engine: Arc::new(engine),
policy: None,
},
));
}
let config_path = cfg_dir.path().join("omnigraph.yaml");
fs::write(&config_path, format!("graphs:\n{yaml_graphs}")).unwrap();
let config_hash = hash_config_file(&config_path).unwrap();
let workload = omnigraph_server::workload::WorkloadController::from_env();
let state = AppState::new_multi(
handles,
Vec::new(),
None,
workload,
Some(config_path.clone()),
Some(config_hash),
)
.unwrap();
let app = build_app(state);
(cfg_dir, app)
}
async fn post_graph(
app: &Router,
body: &GraphCreateRequest,
auth: Option<&str>,
) -> (StatusCode, Value) {
let json_body = serde_json::to_vec(body).unwrap();
let mut request = Request::builder()
.method(Method::POST)
.uri("/graphs")
.header("content-type", "application/json");
if let Some(token) = auth {
request = request.header("authorization", format!("Bearer {token}"));
}
let req = request.body(Body::from(json_body)).unwrap();
let response = app.clone().oneshot(req).await.unwrap();
let status = response.status();
let body_bytes = to_bytes(response.into_body(), usize::MAX).await.unwrap();
let body_json: Value = if body_bytes.is_empty() {
Value::Null
} else {
serde_json::from_slice(&body_bytes).unwrap_or(Value::Null)
};
(status, body_json)
}
/// Happy path: POST creates a new graph, returns 201, the graph is
/// queryable via cluster routes, and omnigraph.yaml now includes it.
#[tokio::test(flavor = "multi_thread")]
async fn post_graphs_creates_a_new_graph_end_to_end() {
let (cfg_dir, app) = multi_mode_app_with_real_config(&["alpha"]).await;
let schema = fs::read_to_string(fixture("test.pg")).unwrap();
let new_uri = cfg_dir.path().join("beta.omni");
let req = GraphCreateRequest {
graph_id: "beta".to_string(),
uri: new_uri.to_string_lossy().to_string(),
schema: GraphSchemaSpec { source: schema },
policy: None,
};
let (status, body) = post_graph(&app, &req, None).await;
assert_eq!(status, StatusCode::CREATED, "got body: {body}");
let resp: GraphCreateResponse = serde_json::from_value(body).unwrap();
assert_eq!(resp.graph_id, "beta");
// The new graph is reachable via its cluster route.
let snap = app
.clone()
.oneshot(
Request::builder()
.method(Method::GET)
.uri("/graphs/beta/snapshot?branch=main")
.body(Body::empty())
.unwrap(),
)
.await
.unwrap();
assert_eq!(snap.status(), StatusCode::OK);
// The YAML on disk now references the new graph.
let yaml = fs::read_to_string(cfg_dir.path().join("omnigraph.yaml")).unwrap();
assert!(
yaml.contains("beta:"),
"rewritten YAML must include 'beta:'; got:\n{yaml}"
);
assert!(
yaml.contains(new_uri.to_str().unwrap()),
"rewritten YAML must include the new URI; got:\n{yaml}"
);
}
/// Two POSTs in sequence both succeed: the second one's drift
/// check passes because the first POST updates the in-memory
/// baseline hash to the post-rewrite hash.
#[tokio::test(flavor = "multi_thread")]
async fn post_graphs_baseline_hash_updates_between_rewrites() {
let (cfg_dir, app) = multi_mode_app_with_real_config(&["alpha"]).await;
let schema = fs::read_to_string(fixture("test.pg")).unwrap();
for name in ["beta", "gamma"] {
let new_uri = cfg_dir.path().join(format!("{name}.omni"));
let req = GraphCreateRequest {
graph_id: name.to_string(),
uri: new_uri.to_string_lossy().to_string(),
schema: GraphSchemaSpec {
source: schema.clone(),
},
policy: None,
};
let (status, body) = post_graph(&app, &req, None).await;
assert_eq!(status, StatusCode::CREATED, "create {name}: {body}");
}
let yaml = fs::read_to_string(cfg_dir.path().join("omnigraph.yaml")).unwrap();
assert!(yaml.contains("beta:"));
assert!(yaml.contains("gamma:"));
}
/// Duplicate `graph_id` returns 409.
#[tokio::test(flavor = "multi_thread")]
async fn post_graphs_duplicate_graph_id_returns_409() {
let (cfg_dir, app) = multi_mode_app_with_real_config(&["alpha"]).await;
let schema = fs::read_to_string(fixture("test.pg")).unwrap();
let req = GraphCreateRequest {
graph_id: "alpha".to_string(), // already registered
uri: cfg_dir
.path()
.join("alpha-duplicate.omni")
.to_string_lossy()
.to_string(),
schema: GraphSchemaSpec { source: schema },
policy: None,
};
let (status, body) = post_graph(&app, &req, None).await;
assert_eq!(status, StatusCode::CONFLICT, "got body: {body}");
}
/// Duplicate `uri` returns 409.
#[tokio::test(flavor = "multi_thread")]
async fn post_graphs_duplicate_uri_returns_409() {
let (cfg_dir, app) = multi_mode_app_with_real_config(&["alpha"]).await;
let schema = fs::read_to_string(fixture("test.pg")).unwrap();
let alpha_uri = cfg_dir.path().join("alpha.omni");
let req = GraphCreateRequest {
graph_id: "beta".to_string(),
uri: alpha_uri.to_string_lossy().to_string(), // already in use
schema: GraphSchemaSpec { source: schema },
policy: None,
};
let (status, _) = post_graph(&app, &req, None).await;
assert_eq!(status, StatusCode::CONFLICT);
}
/// Invalid `graph_id` (reserved name) returns 400.
#[tokio::test(flavor = "multi_thread")]
async fn post_graphs_invalid_graph_id_returns_400() {
let (cfg_dir, app) = multi_mode_app_with_real_config(&["alpha"]).await;
let schema = fs::read_to_string(fixture("test.pg")).unwrap();
let req = GraphCreateRequest {
graph_id: "policies".to_string(), // reserved
uri: cfg_dir
.path()
.join("policies.omni")
.to_string_lossy()
.to_string(),
schema: GraphSchemaSpec { source: schema },
policy: None,
};
let (status, _) = post_graph(&app, &req, None).await;
assert_eq!(status, StatusCode::BAD_REQUEST);
}
/// Empty schema source returns 400 with a clear message.
#[tokio::test(flavor = "multi_thread")]
async fn post_graphs_empty_schema_source_returns_400() {
let (cfg_dir, app) = multi_mode_app_with_real_config(&["alpha"]).await;
let req = GraphCreateRequest {
graph_id: "beta".to_string(),
uri: cfg_dir
.path()
.join("beta.omni")
.to_string_lossy()
.to_string(),
schema: GraphSchemaSpec {
source: " \n ".to_string(),
},
policy: None,
};
let (status, body) = post_graph(&app, &req, None).await;
assert_eq!(status, StatusCode::BAD_REQUEST);
assert!(
body.to_string().contains("schema.source"),
"expected schema.source rejection in body: {body}"
);
}
/// Single mode rejects `POST /graphs` with 405.
#[tokio::test(flavor = "multi_thread")]
async fn post_graphs_returns_405_in_single_mode() {
let temp = init_loaded_graph().await;
let graph = graph_path(temp.path());
let state = AppState::open(graph.to_string_lossy().to_string())
.await
.unwrap();
let app = build_app(state);
let req = GraphCreateRequest {
graph_id: "beta".to_string(),
uri: "/tmp/beta.omni".to_string(),
schema: GraphSchemaSpec {
source: "node Person { name: String @key }\n".to_string(),
},
policy: None,
};
let (status, _) = post_graph(&app, &req, None).await;
assert_eq!(status, StatusCode::METHOD_NOT_ALLOWED);
}
/// YAML drift detection: operator hand-edits the config file
/// between server start and the POST → 503 Service Unavailable.
#[tokio::test(flavor = "multi_thread")]
async fn post_graphs_yaml_drift_detection_returns_503() {
let (cfg_dir, app) = multi_mode_app_with_real_config(&["alpha"]).await;
// Simulate an operator editing the file out from under the
// running server. This changes the on-disk hash; the server's
// in-memory baseline (computed at startup) no longer matches.
let config_path = cfg_dir.path().join("omnigraph.yaml");
let mut yaml = fs::read_to_string(&config_path).unwrap();
yaml.push_str("\n# operator added a comment after server start\n");
fs::write(&config_path, yaml).unwrap();
let schema = fs::read_to_string(fixture("test.pg")).unwrap();
let req = GraphCreateRequest {
graph_id: "beta".to_string(),
uri: cfg_dir
.path()
.join("beta.omni")
.to_string_lossy()
.to_string(),
schema: GraphSchemaSpec { source: schema },
policy: None,
};
let (status, body) = post_graph(&app, &req, None).await;
assert_eq!(
status,
StatusCode::SERVICE_UNAVAILABLE,
"expected drift detection, got: {body}"
);
assert!(
body.to_string().contains("drift"),
"expected drift message, got: {body}"
);
}
/// hash_config_file is deterministic and detects byte-level changes.
#[test]
fn hash_config_file_is_deterministic_and_detects_changes() {
let dir = tempfile::tempdir().unwrap();
let path = dir.path().join("cfg.yaml");
fs::write(&path, "graphs:\n alpha:\n uri: /tmp/a.omni\n").unwrap();
let h1 = hash_config_file(&path).unwrap();
let h2 = hash_config_file(&path).unwrap();
assert_eq!(h1, h2, "hash must be deterministic");
fs::write(&path, "graphs:\n alpha:\n uri: /tmp/b.omni\n").unwrap();
let h3 = hash_config_file(&path).unwrap();
assert_ne!(h1, h3, "hash must change when content changes");
}
/// rewrite_atomic refuses to rewrite when the baseline doesn't match.
#[test]
fn rewrite_atomic_refuses_when_hash_drifts() {
use omnigraph_server::config::{RewriteAtomicError, rewrite_atomic};
let dir = tempfile::tempdir().unwrap();
let path = dir.path().join("cfg.yaml");
fs::write(&path, "graphs:\n alpha:\n uri: /tmp/a.omni\n").unwrap();
// Pass an obviously-wrong baseline hash.
let wrong_hash = [0u8; 32];
let mut new_config = OmnigraphConfig::default();
new_config.graphs.insert(
"beta".to_string(),
omnigraph_server::config::TargetConfig {
uri: "/tmp/b.omni".to_string(),
bearer_token_env: None,
policy: Default::default(),
},
);
let err = rewrite_atomic(&path, &new_config, &wrong_hash).unwrap_err();
assert!(
matches!(err, RewriteAtomicError::Drift),
"expected Drift, got: {err}"
);
}
/// End-to-end: load an `omnigraph.yaml` with two graphs and serve
/// them. Both graphs must be queryable via cluster routes.
///