diff --git a/Cargo.lock b/Cargo.lock index a3d6d62..53d7709 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2570,6 +2570,16 @@ dependencies = [ "percent-encoding", ] +[[package]] +name = "fs2" +version = "0.4.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9564fc758e15025b46aa6643b1b77d047d1a56a1aea6e01002ac0c7026876213" +dependencies = [ + "libc", + "winapi", +] + [[package]] name = "fs_extra" version = "1.3.0" @@ -4650,6 +4660,7 @@ dependencies = [ "clap", "color-eyre", "dashmap", + "fs2", "futures", "lance", "lance-index", @@ -7093,6 +7104,22 @@ dependencies = [ "rustls-pki-types", ] +[[package]] +name = "winapi" +version = "0.3.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5c839a674fcd7a98952e593242ea400abe93992746761e38641405d28b00f419" +dependencies = [ + "winapi-i686-pc-windows-gnu", + "winapi-x86_64-pc-windows-gnu", +] + +[[package]] +name = "winapi-i686-pc-windows-gnu" +version = "0.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ac3b87c63620426dd9b991e5ce0329eff545bccbbb34f3be09ff6fb6ab51b7b6" + [[package]] name = "winapi-util" version = "0.1.11" @@ -7102,6 +7129,12 @@ dependencies = [ "windows-sys 0.61.2", ] +[[package]] +name = "winapi-x86_64-pc-windows-gnu" +version = "0.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "712e227841d057c1ee1cd2fb22fa7e5a5461ae8e48fa2ca79ec42cfc1931183f" + [[package]] name = "windows-core" version = "0.62.2" diff --git a/Cargo.toml b/Cargo.toml index 66bfc01..ec60b5a 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -71,6 +71,7 @@ url = "2" cedar-policy = "4.9" sha2 = "0.10" subtle = "2" +fs2 = "0.4" [profile.dev] debug = 0 diff --git a/crates/omnigraph-server/Cargo.toml b/crates/omnigraph-server/Cargo.toml index e9a0e46..5ea2524 100644 --- a/crates/omnigraph-server/Cargo.toml +++ b/crates/omnigraph-server/Cargo.toml @@ -39,6 +39,7 @@ subtle = { workspace = true } async-trait = { workspace = true } arc-swap = { workspace = true } dashmap = "6" +fs2 = { workspace = true } regex = { workspace = true } thiserror = { workspace = true } aws-config = { version = "1", optional = true, default-features = false, features = ["rustls", "rt-tokio", "credentials-process", "sso"] } diff --git a/crates/omnigraph-server/src/api.rs b/crates/omnigraph-server/src/api.rs index 5a10152..52c0a33 100644 --- a/crates/omnigraph-server/src/api.rs +++ b/crates/omnigraph-server/src/api.rs @@ -487,3 +487,74 @@ pub struct GraphInfo { pub struct GraphListResponse { pub graphs: Vec, } + +// ─── MR-668 PR 7 — POST /graphs request/response ─────────────────────────── + +/// Schema specification for a new graph in `POST /graphs`. Nested +/// per MR-668 decision 7 — leaves room for future fields without +/// breaking the request shape. Mirrors the `policy: { file }` nesting +/// pattern. +/// +/// Today only `source` (inline `.pg` text) is supported. Future fields +/// might include `schema.allow_data_loss`, `schema.version`, etc. +/// +/// **Asymmetric with `SchemaApplyRequest`**: `POST /schema/apply` still +/// uses a flat `schema_source: String` for backwards compatibility. +/// A follow-up release may migrate that too. +#[derive(Debug, Clone, Serialize, Deserialize, ToSchema)] +pub struct GraphSchemaSpec { + /// Inline `.pg` schema source. + #[schema(example = "node Person {\n name: String @key\n age: I32?\n}")] + pub source: String, +} + +/// Per-graph policy specification in `POST /graphs`. Mirrors the +/// `policy: { file }` shape in `omnigraph.yaml`'s `graphs..policy` +/// section. +#[derive(Debug, Clone, Default, Serialize, Deserialize, ToSchema)] +pub struct GraphPolicySpec { + /// Path to the per-graph Cedar policy file, server-side. + /// Must be readable by the server process at request time. + /// Path is relative to the server's working directory (NOT to the + /// `omnigraph.yaml`'s `base_dir`) — caller-supplied paths are + /// trusted as-is. + pub file: Option, +} + +/// Request body for `POST /graphs` (MR-668 PR 7). +/// +/// Body shape: +/// ```json +/// { +/// "graph_id": "alpha", +/// "uri": "/path/to/alpha.omni", +/// "schema": { "source": "" }, +/// "policy": { "file": "./policies/alpha.yaml" } +/// } +/// ``` +/// +/// 32 MiB body limit (matches `INGEST_REQUEST_BODY_LIMIT_BYTES`). +#[derive(Debug, Clone, Serialize, Deserialize, ToSchema)] +pub struct GraphCreateRequest { + /// New graph's id. Must satisfy `^[a-zA-Z0-9-]{1,64}$`, not start with + /// `_`, and not be a reserved name. See `GraphId::try_from`. + pub graph_id: String, + /// Storage URI (local path or `s3://...`). Must NOT already be in + /// use by another registered graph. Server `Omnigraph::init`s the + /// graph at this URI. + pub uri: String, + /// Inline schema (`{ source }`). Required. + pub schema: GraphSchemaSpec, + /// Per-graph Cedar policy. Optional — `None` means the graph has + /// no per-graph policy enforcement (HTTP auth still applies if + /// configured). + #[serde(default)] + pub policy: Option, +} + +/// Response from `POST /graphs` on success (201 Created). +#[derive(Debug, Clone, Serialize, Deserialize, ToSchema)] +pub struct GraphCreateResponse { + pub graph_id: String, + pub uri: String, +} diff --git a/crates/omnigraph-server/src/config.rs b/crates/omnigraph-server/src/config.rs index 8abe3cd..e233657 100644 --- a/crates/omnigraph-server/src/config.rs +++ b/crates/omnigraph-server/src/config.rs @@ -5,7 +5,9 @@ use std::path::{Path, PathBuf}; use clap::ValueEnum; use color_eyre::eyre::{Result, bail}; +use fs2::FileExt; use serde::{Deserialize, Serialize}; +use sha2::{Digest, Sha256}; pub const DEFAULT_CONFIG_FILE: &str = "omnigraph.yaml"; #[derive(Debug, Clone, Default, Serialize, Deserialize)] @@ -371,6 +373,126 @@ fn absolute_base_dir(cwd: &Path, path: &Path) -> Result { .unwrap_or_else(|| cwd.to_path_buf())) } +/// SHA-256 hash of the file at `path`. Used to baseline `omnigraph.yaml` +/// at server startup; later compared inside `rewrite_atomic` to detect +/// operator hand-edits ("YAML drift") that would otherwise be clobbered +/// silently. Read errors propagate so startup fails loudly if the +/// config file disappears between `load_config` and the hashing. +pub fn hash_config_file(path: &Path) -> std::io::Result<[u8; 32]> { + let bytes = fs::read(path)?; + let digest = Sha256::digest(&bytes); + let mut out = [0u8; 32]; + out.copy_from_slice(&digest); + Ok(out) +} + +/// Why `rewrite_atomic` refused to rewrite. +#[derive(Debug, thiserror::Error)] +pub enum RewriteAtomicError { + /// The on-disk file no longer matches the expected hash — an + /// operator hand-edited `omnigraph.yaml` between server start + /// and now. Rewriting would clobber their changes; instead we + /// refuse loudly. Maps to HTTP 503. + #[error( + "omnigraph.yaml drift detected: on-disk file does not match the server's startup baseline. \ + Stop the server, reconcile the edits, then restart." + )] + Drift, + /// IO failure during the rewrite — couldn't acquire flock, couldn't + /// write the staging file, couldn't rename, etc. The on-disk file + /// is unchanged (rename is atomic on POSIX). Maps to HTTP 500. + #[error("{0}")] + Io(#[from] std::io::Error), + /// Failed to serialize the new `OmnigraphConfig` to YAML. Should + /// not happen in practice — `OmnigraphConfig` has no infallible + /// serde paths in the current types. Maps to HTTP 500. + #[error("serialize config: {0}")] + Serialize(#[from] serde_yaml::Error), +} + +/// Atomically rewrite `omnigraph.yaml` under an exclusive `fcntl::flock` +/// with SHA-256 drift detection (MR-668 PR 7). +/// +/// Returns the new file's hash on success — callers update their +/// in-memory baseline to this value before releasing other request +/// handlers. +/// +/// Sequence (everything inside the flock): +/// 1. Acquire `LOCK_EX` on `path`. +/// 2. Re-read on-disk bytes, hash them. +/// 3. If on-disk hash != `expected_hash` → `RewriteAtomicError::Drift`. +/// 4. Serialize `new_config` to YAML. +/// 5. Write to `path.tmp` and `sync_all` it. +/// 6. `rename(path.tmp, path)` (atomic on POSIX). +/// 7. `sync_all` the parent directory for crash-durability. +/// 8. Release flock (RAII drop on the File). +/// +/// Sync I/O throughout — callers wrap in `tokio::task::spawn_blocking` +/// so the async runtime doesn't stall. +/// +/// **Comments are stripped.** `serde_yaml::to_string` produces canonical +/// YAML without preserving the operator's comments. Decision Q20 in the +/// MR-668 plan accepts this tradeoff for v0.7.0; a future split-file +/// design (`omnigraph.yaml` operator-owned + `omnigraph.runtime.yaml` +/// server-owned) is the escalation path if operators push back. +pub fn rewrite_atomic( + path: &Path, + new_config: &OmnigraphConfig, + expected_hash: &[u8; 32], +) -> std::result::Result<[u8; 32], RewriteAtomicError> { + // 1. flock. Open RW so flock works; we re-read via fs::read below. + let lock_file = fs::OpenOptions::new() + .read(true) + .write(true) + .open(path)?; + lock_file.lock_exclusive()?; + // RAII unlock via `_lock_guard` — the file dropping releases the flock. + let _lock_guard = lock_file; + + // 2. Re-read + hash. + let current_bytes = fs::read(path)?; + let mut current_hash = [0u8; 32]; + current_hash.copy_from_slice(&Sha256::digest(¤t_bytes)); + + // 3. Drift check. + if current_hash != *expected_hash { + return Err(RewriteAtomicError::Drift); + } + + // 4. Serialize new config. + let serialized = serde_yaml::to_string(new_config)?; + + // 5. Write to .tmp + fsync. + let tmp_path = staging_path(path); + fs::write(&tmp_path, &serialized)?; + let tmp_file = fs::File::open(&tmp_path)?; + tmp_file.sync_all()?; + drop(tmp_file); + + // 6. Atomic rename. + fs::rename(&tmp_path, path)?; + + // 7. fsync parent dir for crash-durability (POSIX rename isn't + // durable until the directory entry is synced). + if let Some(parent) = path.parent() { + let dir = fs::File::open(parent)?; + dir.sync_all()?; + } + + // Compute the new file's hash for the caller to update its baseline. + let mut new_hash = [0u8; 32]; + new_hash.copy_from_slice(&Sha256::digest(serialized.as_bytes())); + Ok(new_hash) +} + +/// Staging path used during `rewrite_atomic`: `.tmp` to avoid +/// colliding with any other workflow that might be reading the file. +fn staging_path(path: &Path) -> PathBuf { + let mut s = path.as_os_str().to_owned(); + s.push(".tmp"); + PathBuf::from(s) +} + #[cfg(test)] mod tests { use std::fs; diff --git a/crates/omnigraph-server/src/lib.rs b/crates/omnigraph-server/src/lib.rs index ed05363..4aa6b4a 100644 --- a/crates/omnigraph-server/src/lib.rs +++ b/crates/omnigraph-server/src/lib.rs @@ -30,6 +30,7 @@ pub use auth::{AWS_SECRET_ENV, EnvOrFileTokenSource, TokenSource, resolve_token_ use axum::body::{Body, Bytes}; use axum::extract::DefaultBodyLimit; use axum::extract::{Extension, OriginalUri, Path, Query, Request, State}; +use axum::handler::Handler; use axum::http::StatusCode; use axum::http::header::{AUTHORIZATION, CONTENT_TYPE}; use axum::middleware::{self, Next}; @@ -81,6 +82,7 @@ fn hash_bearer_token(token: &str) -> BearerTokenHash { paths( server_health, server_graphs_list, + server_graphs_create, server_snapshot, server_read, server_export, @@ -230,6 +232,16 @@ pub struct AppState { /// server policy is configured. Per-graph policies live on each /// `GraphHandle.policy`. server_policy: Option>, + /// PR 7: SHA-256 hash of `omnigraph.yaml` at server startup, used + /// by `POST /graphs` to detect operator hand-edits between server + /// start and the rewrite. Wrapped in `Arc>` so the POST + /// handler can update the baseline after a successful rewrite + /// (later POSTs will compare against the post-rewrite hash, not + /// the original startup hash). + /// + /// `None` in single mode (no config file is rewritten there). Some + /// in multi mode when the server was started with `--config`. + config_hash: Option>>, } struct ExportStreamWriter { @@ -340,6 +352,7 @@ impl AppState { workload: self.workload, bearer_tokens: self.bearer_tokens, server_policy: self.server_policy, + config_hash: self.config_hash, } } @@ -430,6 +443,7 @@ impl AppState { workload, bearer_tokens, server_policy: None, + config_hash: None, } } @@ -439,13 +453,16 @@ impl AppState { /// /// Caller supplies the already-opened `GraphHandle`s and (optionally) /// the path to the source config file. `server_policy` is loaded - /// from `server.policy.file` if configured. + /// from `server.policy.file` if configured. `config_hash` is the + /// SHA-256 of `omnigraph.yaml` at startup; `POST /graphs` compares + /// the on-disk file against this baseline before rewriting. pub fn new_multi( handles: Vec>, bearer_tokens: Vec<(String, String)>, server_policy: Option, workload: workload::WorkloadController, config_path: Option, + config_hash: Option<[u8; 32]>, ) -> std::result::Result { let bearer_tokens = hash_bearer_tokens(bearer_tokens); let registry = Arc::new(GraphRegistry::from_handles(handles)?); @@ -455,6 +472,7 @@ impl AppState { workload: Arc::new(workload), bearer_tokens, server_policy: server_policy.map(Arc::new), + config_hash: config_hash.map(|h| Arc::new(std::sync::Mutex::new(h))), }) } @@ -918,7 +936,13 @@ pub fn build_app(state: AppState) -> Router { // single mode the handler returns 405 so clients see "resource // exists, wrong context" rather than 404 "no such resource." let management = Router::new() - .route("/graphs", get(server_graphs_list)) + .route( + "/graphs", + get(server_graphs_list).post( + server_graphs_create + .layer(DefaultBodyLimit::max(INGEST_REQUEST_BODY_LIMIT_BYTES)), + ), + ) .route_layer(middleware::from_fn_with_state( state.clone(), require_bearer_auth, @@ -1053,6 +1077,11 @@ async fn open_multi_graph_state( .into_iter() .collect::>>()?; + // PR 7: SHA-256 the config file so `POST /graphs` can detect + // operator hand-edits later. + let config_hash = config::hash_config_file(&config_path) + .map_err(|err| color_eyre::eyre::eyre!("hash omnigraph.yaml: {err}"))?; + let workload = workload::WorkloadController::from_env(); let state = AppState::new_multi( handles, @@ -1060,6 +1089,7 @@ async fn open_multi_graph_state( server_policy, workload, Some(config_path), + Some(config_hash), ) .map_err(|err| color_eyre::eyre::eyre!("multi-graph registry: {err}"))?; Ok(state) @@ -1196,6 +1226,253 @@ async fn server_graphs_list( Ok(Json(GraphListResponse { graphs })) } +#[utoipa::path( + post, + path = "/graphs", + tag = "management", + operation_id = "createGraph", + request_body = api::GraphCreateRequest, + responses( + (status = 201, description = "Graph created", body = api::GraphCreateResponse), + (status = 400, description = "Invalid request body (graph_id, schema, policy file)", body = ErrorOutput), + (status = 401, description = "Unauthorized", body = ErrorOutput), + (status = 403, description = "Forbidden", body = ErrorOutput), + (status = 405, description = "Method not allowed (single-graph mode)", body = ErrorOutput), + (status = 409, description = "graph_id or uri already registered", body = ErrorOutput), + (status = 413, description = "Request body too large (>32 MiB)", body = ErrorOutput), + (status = 500, description = "Init failure or YAML rewrite failure", body = ErrorOutput), + (status = 503, description = "omnigraph.yaml drift detected (operator edited the file)", body = ErrorOutput), + ), + security(("bearer_token" = [])), +)] +/// Create a new graph at runtime (MR-668 PR 7). +/// +/// Multi-graph mode only. Operators add a graph to the registry +/// without restarting the server. The server `Omnigraph::init`s the +/// new graph at `req.uri`, atomically rewrites `omnigraph.yaml` to +/// include the new entry, then publishes the handle in the registry. +/// +/// Cedar-gated by `PolicyAction::GraphCreate` against +/// `Omnigraph::Server::"root"` (the same server-level policy as +/// `GET /graphs`). +/// +/// Failure modes: +/// * Init fails → orphan storage files at `req.uri` (PR 2a cleans up +/// schema files but not Lance datasets; operator removes manually). +/// * Rewrite fails (`fs2::flock` IO error) → orphan storage; YAML +/// unchanged. +/// * YAML drift (operator edited the file) → 503; YAML and storage +/// both unchanged. +/// * Duplicate `graph_id` or `uri` → 409; storage already in use. +async fn server_graphs_create( + State(state): State, + actor: Option>, + Json(request): Json, +) -> std::result::Result<(StatusCode, Json), ApiError> { + // ─── 1. Mode check: management endpoints don't apply in single mode. + let (config_path, config_hash) = match state.mode() { + ServerMode::Single { .. } => { + return Err(ApiError { + status: StatusCode::METHOD_NOT_ALLOWED, + code: ErrorCode::BadRequest, + message: "POST /graphs is only available in multi-graph mode".to_string(), + merge_conflicts: Vec::new(), + manifest_conflict: None, + }); + } + ServerMode::Multi { config_path } => match (config_path.clone(), state.config_hash.clone()) { + (Some(path), Some(hash)) => (path, hash), + _ => { + return Err(ApiError::internal( + "multi-mode AppState missing config_path or config_hash".to_string(), + )); + } + }, + }; + + // ─── 2. Cedar authorize. Server-level policy gates this. + authorize_request( + actor.as_ref().map(|Extension(actor)| actor), + state.server_policy.as_deref(), + PolicyRequest { + actor_id: actor + .as_ref() + .map(|Extension(actor)| actor.actor_id.as_ref().to_string()) + .unwrap_or_default(), + action: PolicyAction::GraphCreate, + branch: None, + target_branch: None, + }, + )?; + + // ─── 3. Validate request body. + let graph_id = GraphId::try_from(request.graph_id.clone()) + .map_err(|err| ApiError::bad_request(err.to_string()))?; + if request.schema.source.trim().is_empty() { + return Err(ApiError::bad_request( + "schema.source must not be empty".to_string(), + )); + } + if request.uri.trim().is_empty() { + return Err(ApiError::bad_request("uri must not be empty".to_string())); + } + + // Per-graph policy file (optional). Resolved as caller-supplied path. + // Validation: must exist and parse against the Cedar schema. Loading + // here surfaces config errors before we init the graph. + let policy_file_str = request + .policy + .as_ref() + .and_then(|p| p.file.clone()) + .filter(|s| !s.trim().is_empty()); + let policy_engine = if let Some(path) = policy_file_str.as_deref() { + Some( + PolicyEngine::load(std::path::Path::new(path), graph_id.as_str()) + .map_err(|err| ApiError::bad_request(format!("policy.file: {err}")))?, + ) + } else { + None + }; + + // ─── 4. Pre-check duplicates (best-effort — registry.insert is the + // authoritative atomic check, but this returns a clearer error + // when the duplicate is obvious). + let key = GraphKey::cluster(graph_id.clone()); + if matches!(state.registry().get(&key), RegistryLookup::Ready(_)) { + return Err(ApiError::conflict(format!( + "graph '{graph_id}' is already registered" + ))); + } + if state + .registry() + .list() + .iter() + .any(|h| h.uri == request.uri) + { + return Err(ApiError::conflict(format!( + "uri '{}' is already in use by another graph", + request.uri + ))); + } + + // ─── 5. Init the new engine at the requested URI. PR 2a's cleanup + // removes schema files on init failure; Lance directories + // become orphans if `GraphCoordinator::init` partially + // succeeded (documented limitation pending delete_prefix). + let engine = Omnigraph::init(&request.uri, &request.schema.source) + .await + .map_err(|err| ApiError::internal(format!("init: {err}")))?; + + // Apply engine-layer policy enforcement (MR-722). HTTP-layer is the + // first gate; this is the redundant-but-correct backstop. + let (engine, policy_arc): (Omnigraph, Option>) = if let Some(p) = policy_engine + { + let policy_arc: Arc = Arc::new(p); + let checker = Arc::clone(&policy_arc) as Arc; + (engine.with_policy(checker), Some(policy_arc)) + } else { + (engine, None) + }; + + let handle = Arc::new(GraphHandle { + key: key.clone(), + uri: request.uri.clone(), + engine: Arc::new(engine), + policy: policy_arc, + }); + + // ─── 6. Rewrite omnigraph.yaml atomically (drift detection inside). + // Done in a blocking task because `fs2::flock` is sync. + let new_target = config::TargetConfig { + uri: request.uri.clone(), + bearer_token_env: None, + policy: config::PolicySettings { + file: policy_file_str.clone(), + }, + }; + let graph_id_for_yaml = graph_id.as_str().to_string(); + let config_path_for_blocking = config_path.clone(); + let config_hash_for_blocking = Arc::clone(&config_hash); + let rewrite_result = tokio::task::spawn_blocking(move || { + rewrite_yaml_with_new_graph( + &config_path_for_blocking, + &config_hash_for_blocking, + &graph_id_for_yaml, + new_target, + ) + }) + .await + .map_err(|err| ApiError::internal(format!("rewrite join: {err}")))?; + rewrite_result?; + + // ─── 7. Publish in the registry. If this fails (race), the YAML + // already has the entry — on restart it gets opened and + // added cleanly. Operator-visible inconsistency is brief + // (just until next restart). + state + .registry() + .insert(Arc::clone(&handle)) + .await + .map_err(|err| match err { + registry::InsertError::DuplicateKey(_) | registry::InsertError::DuplicateUri(_) => { + ApiError::conflict(err.to_string()) + } + })?; + + Ok(( + StatusCode::CREATED, + Json(api::GraphCreateResponse { + graph_id: graph_id.as_str().to_string(), + uri: request.uri, + }), + )) +} + +/// Load `omnigraph.yaml` from disk, add the new graph entry, write it +/// back via `config::rewrite_atomic`, and update the in-memory baseline +/// hash. Returns an `ApiError` mapped to the appropriate HTTP status +/// (503 for drift, 500 for IO/serialize failures). +/// +/// Runs inside `tokio::task::spawn_blocking` — `fs2::flock` is sync. +fn rewrite_yaml_with_new_graph( + config_path: &std::path::Path, + config_hash: &Arc>, + graph_id: &str, + new_target: config::TargetConfig, +) -> std::result::Result<(), ApiError> { + // Re-read the config file to construct the next state. + let bytes = std::fs::read(config_path) + .map_err(|err| ApiError::internal(format!("read omnigraph.yaml: {err}")))?; + let mut updated: config::OmnigraphConfig = serde_yaml::from_slice(&bytes) + .map_err(|err| ApiError::internal(format!("parse omnigraph.yaml: {err}")))?; + updated.graphs.insert(graph_id.to_string(), new_target); + + // Grab the current baseline hash for the drift check. + let expected = *config_hash + .lock() + .expect("config_hash mutex must not be poisoned"); + let new_hash = config::rewrite_atomic(config_path, &updated, &expected).map_err(|err| { + match err { + config::RewriteAtomicError::Drift => ApiError { + status: StatusCode::SERVICE_UNAVAILABLE, + code: ErrorCode::Conflict, + message: err.to_string(), + merge_conflicts: Vec::new(), + manifest_conflict: None, + }, + other => ApiError::internal(other.to_string()), + } + })?; + + // Update the baseline so the next POST sees this as the new "no + // drift" reference. If we forgot this, every POST after the first + // would 503. + *config_hash + .lock() + .expect("config_hash mutex must not be poisoned") = new_hash; + Ok(()) +} + async fn server_openapi(State(state): State) -> Json { let mut doc = ApiDoc::openapi(); if !state.requires_bearer_auth() { diff --git a/crates/omnigraph-server/tests/openapi.rs b/crates/omnigraph-server/tests/openapi.rs index b4f96df..4451bbc 100644 --- a/crates/omnigraph-server/tests/openapi.rs +++ b/crates/omnigraph-server/tests/openapi.rs @@ -1008,7 +1008,7 @@ async fn app_for_multi_mode(graph_ids: &[&str]) -> (Vec, Rout dirs.push(dir); } let workload = omnigraph_server::workload::WorkloadController::from_env(); - let state = AppState::new_multi(handles, Vec::new(), None, workload, None).unwrap(); + let state = AppState::new_multi(handles, Vec::new(), None, workload, None, None).unwrap(); let app = build_app(state); (dirs, app) } diff --git a/crates/omnigraph-server/tests/server.rs b/crates/omnigraph-server/tests/server.rs index ed5088d..e80d33b 100644 --- a/crates/omnigraph-server/tests/server.rs +++ b/crates/omnigraph-server/tests/server.rs @@ -4360,7 +4360,7 @@ mod multi_graph_startup { dirs.push(dir); } let workload = omnigraph_server::workload::WorkloadController::from_env(); - let state = AppState::new_multi(handles, Vec::new(), None, workload, None).unwrap(); + let state = AppState::new_multi(handles, Vec::new(), None, workload, None, None).unwrap(); let app = build_app(state); (dirs, app) } @@ -4741,7 +4741,7 @@ graphs: let tokens = vec![("act-andrew".to_string(), "secret-token".to_string())]; let workload = omnigraph_server::workload::WorkloadController::from_env(); let state = - AppState::new_multi(vec![handle], tokens, None, workload, None).unwrap(); + AppState::new_multi(vec![handle], tokens, None, workload, None, None).unwrap(); let app = build_app(state); // No Authorization header → 401. @@ -4822,6 +4822,7 @@ rules: Some(server_policy), workload, None, + None, ) .unwrap(); let app = build_app(state); @@ -4864,6 +4865,333 @@ rules: ); } + // ─── PR 7 — POST /graphs ────────────────────────────────────────── + + use omnigraph_server::api::{GraphCreateRequest, GraphCreateResponse, GraphPolicySpec, GraphSchemaSpec}; + use omnigraph_server::config::{OmnigraphConfig, hash_config_file}; + + /// Spin up a multi-mode server whose `omnigraph.yaml` we control, + /// so PR 7's `POST /graphs` can rewrite it. Returns the config + /// directory (to live across the test) and a built `Router`. + async fn multi_mode_app_with_real_config( + initial_graphs: &[&str], + ) -> (tempfile::TempDir, Router) { + let cfg_dir = tempfile::tempdir().unwrap(); + let schema = fs::read_to_string(fixture("test.pg")).unwrap(); + + // Init each starting graph at a real URI inside the config dir. + let mut yaml_graphs = String::new(); + let mut handles = Vec::new(); + for id in initial_graphs { + let graph_uri = cfg_dir.path().join(format!("{id}.omni")); + Omnigraph::init(graph_uri.to_str().unwrap(), &schema) + .await + .unwrap(); + yaml_graphs.push_str(&format!( + " {id}:\n uri: {}\n", + graph_uri.display() + )); + // Open in-memory engine for the handle. + let engine = Omnigraph::open(graph_uri.to_str().unwrap()) + .await + .unwrap(); + handles.push(Arc::new( + omnigraph_server::GraphHandle { + key: omnigraph_server::GraphKey::cluster( + omnigraph_server::GraphId::try_from(*id).unwrap(), + ), + uri: graph_uri.to_string_lossy().to_string(), + engine: Arc::new(engine), + policy: None, + }, + )); + } + let config_path = cfg_dir.path().join("omnigraph.yaml"); + fs::write(&config_path, format!("graphs:\n{yaml_graphs}")).unwrap(); + let config_hash = hash_config_file(&config_path).unwrap(); + + let workload = omnigraph_server::workload::WorkloadController::from_env(); + let state = AppState::new_multi( + handles, + Vec::new(), + None, + workload, + Some(config_path.clone()), + Some(config_hash), + ) + .unwrap(); + let app = build_app(state); + (cfg_dir, app) + } + + async fn post_graph( + app: &Router, + body: &GraphCreateRequest, + auth: Option<&str>, + ) -> (StatusCode, Value) { + let json_body = serde_json::to_vec(body).unwrap(); + let mut request = Request::builder() + .method(Method::POST) + .uri("/graphs") + .header("content-type", "application/json"); + if let Some(token) = auth { + request = request.header("authorization", format!("Bearer {token}")); + } + let req = request.body(Body::from(json_body)).unwrap(); + let response = app.clone().oneshot(req).await.unwrap(); + let status = response.status(); + let body_bytes = to_bytes(response.into_body(), usize::MAX).await.unwrap(); + let body_json: Value = if body_bytes.is_empty() { + Value::Null + } else { + serde_json::from_slice(&body_bytes).unwrap_or(Value::Null) + }; + (status, body_json) + } + + /// Happy path: POST creates a new graph, returns 201, the graph is + /// queryable via cluster routes, and omnigraph.yaml now includes it. + #[tokio::test(flavor = "multi_thread")] + async fn post_graphs_creates_a_new_graph_end_to_end() { + let (cfg_dir, app) = multi_mode_app_with_real_config(&["alpha"]).await; + let schema = fs::read_to_string(fixture("test.pg")).unwrap(); + let new_uri = cfg_dir.path().join("beta.omni"); + let req = GraphCreateRequest { + graph_id: "beta".to_string(), + uri: new_uri.to_string_lossy().to_string(), + schema: GraphSchemaSpec { source: schema }, + policy: None, + }; + let (status, body) = post_graph(&app, &req, None).await; + assert_eq!(status, StatusCode::CREATED, "got body: {body}"); + let resp: GraphCreateResponse = serde_json::from_value(body).unwrap(); + assert_eq!(resp.graph_id, "beta"); + + // The new graph is reachable via its cluster route. + let snap = app + .clone() + .oneshot( + Request::builder() + .method(Method::GET) + .uri("/graphs/beta/snapshot?branch=main") + .body(Body::empty()) + .unwrap(), + ) + .await + .unwrap(); + assert_eq!(snap.status(), StatusCode::OK); + + // The YAML on disk now references the new graph. + let yaml = fs::read_to_string(cfg_dir.path().join("omnigraph.yaml")).unwrap(); + assert!( + yaml.contains("beta:"), + "rewritten YAML must include 'beta:'; got:\n{yaml}" + ); + assert!( + yaml.contains(new_uri.to_str().unwrap()), + "rewritten YAML must include the new URI; got:\n{yaml}" + ); + } + + /// Two POSTs in sequence both succeed: the second one's drift + /// check passes because the first POST updates the in-memory + /// baseline hash to the post-rewrite hash. + #[tokio::test(flavor = "multi_thread")] + async fn post_graphs_baseline_hash_updates_between_rewrites() { + let (cfg_dir, app) = multi_mode_app_with_real_config(&["alpha"]).await; + let schema = fs::read_to_string(fixture("test.pg")).unwrap(); + for name in ["beta", "gamma"] { + let new_uri = cfg_dir.path().join(format!("{name}.omni")); + let req = GraphCreateRequest { + graph_id: name.to_string(), + uri: new_uri.to_string_lossy().to_string(), + schema: GraphSchemaSpec { + source: schema.clone(), + }, + policy: None, + }; + let (status, body) = post_graph(&app, &req, None).await; + assert_eq!(status, StatusCode::CREATED, "create {name}: {body}"); + } + let yaml = fs::read_to_string(cfg_dir.path().join("omnigraph.yaml")).unwrap(); + assert!(yaml.contains("beta:")); + assert!(yaml.contains("gamma:")); + } + + /// Duplicate `graph_id` returns 409. + #[tokio::test(flavor = "multi_thread")] + async fn post_graphs_duplicate_graph_id_returns_409() { + let (cfg_dir, app) = multi_mode_app_with_real_config(&["alpha"]).await; + let schema = fs::read_to_string(fixture("test.pg")).unwrap(); + let req = GraphCreateRequest { + graph_id: "alpha".to_string(), // already registered + uri: cfg_dir + .path() + .join("alpha-duplicate.omni") + .to_string_lossy() + .to_string(), + schema: GraphSchemaSpec { source: schema }, + policy: None, + }; + let (status, body) = post_graph(&app, &req, None).await; + assert_eq!(status, StatusCode::CONFLICT, "got body: {body}"); + } + + /// Duplicate `uri` returns 409. + #[tokio::test(flavor = "multi_thread")] + async fn post_graphs_duplicate_uri_returns_409() { + let (cfg_dir, app) = multi_mode_app_with_real_config(&["alpha"]).await; + let schema = fs::read_to_string(fixture("test.pg")).unwrap(); + let alpha_uri = cfg_dir.path().join("alpha.omni"); + let req = GraphCreateRequest { + graph_id: "beta".to_string(), + uri: alpha_uri.to_string_lossy().to_string(), // already in use + schema: GraphSchemaSpec { source: schema }, + policy: None, + }; + let (status, _) = post_graph(&app, &req, None).await; + assert_eq!(status, StatusCode::CONFLICT); + } + + /// Invalid `graph_id` (reserved name) returns 400. + #[tokio::test(flavor = "multi_thread")] + async fn post_graphs_invalid_graph_id_returns_400() { + let (cfg_dir, app) = multi_mode_app_with_real_config(&["alpha"]).await; + let schema = fs::read_to_string(fixture("test.pg")).unwrap(); + let req = GraphCreateRequest { + graph_id: "policies".to_string(), // reserved + uri: cfg_dir + .path() + .join("policies.omni") + .to_string_lossy() + .to_string(), + schema: GraphSchemaSpec { source: schema }, + policy: None, + }; + let (status, _) = post_graph(&app, &req, None).await; + assert_eq!(status, StatusCode::BAD_REQUEST); + } + + /// Empty schema source returns 400 with a clear message. + #[tokio::test(flavor = "multi_thread")] + async fn post_graphs_empty_schema_source_returns_400() { + let (cfg_dir, app) = multi_mode_app_with_real_config(&["alpha"]).await; + let req = GraphCreateRequest { + graph_id: "beta".to_string(), + uri: cfg_dir + .path() + .join("beta.omni") + .to_string_lossy() + .to_string(), + schema: GraphSchemaSpec { + source: " \n ".to_string(), + }, + policy: None, + }; + let (status, body) = post_graph(&app, &req, None).await; + assert_eq!(status, StatusCode::BAD_REQUEST); + assert!( + body.to_string().contains("schema.source"), + "expected schema.source rejection in body: {body}" + ); + } + + /// Single mode rejects `POST /graphs` with 405. + #[tokio::test(flavor = "multi_thread")] + async fn post_graphs_returns_405_in_single_mode() { + let temp = init_loaded_graph().await; + let graph = graph_path(temp.path()); + let state = AppState::open(graph.to_string_lossy().to_string()) + .await + .unwrap(); + let app = build_app(state); + let req = GraphCreateRequest { + graph_id: "beta".to_string(), + uri: "/tmp/beta.omni".to_string(), + schema: GraphSchemaSpec { + source: "node Person { name: String @key }\n".to_string(), + }, + policy: None, + }; + let (status, _) = post_graph(&app, &req, None).await; + assert_eq!(status, StatusCode::METHOD_NOT_ALLOWED); + } + + /// YAML drift detection: operator hand-edits the config file + /// between server start and the POST → 503 Service Unavailable. + #[tokio::test(flavor = "multi_thread")] + async fn post_graphs_yaml_drift_detection_returns_503() { + let (cfg_dir, app) = multi_mode_app_with_real_config(&["alpha"]).await; + // Simulate an operator editing the file out from under the + // running server. This changes the on-disk hash; the server's + // in-memory baseline (computed at startup) no longer matches. + let config_path = cfg_dir.path().join("omnigraph.yaml"); + let mut yaml = fs::read_to_string(&config_path).unwrap(); + yaml.push_str("\n# operator added a comment after server start\n"); + fs::write(&config_path, yaml).unwrap(); + + let schema = fs::read_to_string(fixture("test.pg")).unwrap(); + let req = GraphCreateRequest { + graph_id: "beta".to_string(), + uri: cfg_dir + .path() + .join("beta.omni") + .to_string_lossy() + .to_string(), + schema: GraphSchemaSpec { source: schema }, + policy: None, + }; + let (status, body) = post_graph(&app, &req, None).await; + assert_eq!( + status, + StatusCode::SERVICE_UNAVAILABLE, + "expected drift detection, got: {body}" + ); + assert!( + body.to_string().contains("drift"), + "expected drift message, got: {body}" + ); + } + + /// hash_config_file is deterministic and detects byte-level changes. + #[test] + fn hash_config_file_is_deterministic_and_detects_changes() { + let dir = tempfile::tempdir().unwrap(); + let path = dir.path().join("cfg.yaml"); + fs::write(&path, "graphs:\n alpha:\n uri: /tmp/a.omni\n").unwrap(); + let h1 = hash_config_file(&path).unwrap(); + let h2 = hash_config_file(&path).unwrap(); + assert_eq!(h1, h2, "hash must be deterministic"); + fs::write(&path, "graphs:\n alpha:\n uri: /tmp/b.omni\n").unwrap(); + let h3 = hash_config_file(&path).unwrap(); + assert_ne!(h1, h3, "hash must change when content changes"); + } + + /// rewrite_atomic refuses to rewrite when the baseline doesn't match. + #[test] + fn rewrite_atomic_refuses_when_hash_drifts() { + use omnigraph_server::config::{RewriteAtomicError, rewrite_atomic}; + let dir = tempfile::tempdir().unwrap(); + let path = dir.path().join("cfg.yaml"); + fs::write(&path, "graphs:\n alpha:\n uri: /tmp/a.omni\n").unwrap(); + // Pass an obviously-wrong baseline hash. + let wrong_hash = [0u8; 32]; + let mut new_config = OmnigraphConfig::default(); + new_config.graphs.insert( + "beta".to_string(), + omnigraph_server::config::TargetConfig { + uri: "/tmp/b.omni".to_string(), + bearer_token_env: None, + policy: Default::default(), + }, + ); + let err = rewrite_atomic(&path, &new_config, &wrong_hash).unwrap_err(); + assert!( + matches!(err, RewriteAtomicError::Drift), + "expected Drift, got: {err}" + ); + } + /// End-to-end: load an `omnigraph.yaml` with two graphs and serve /// them. Both graphs must be queryable via cluster routes. /// diff --git a/openapi.json b/openapi.json index 8c5abc9..0bb9ec5 100644 --- a/openapi.json +++ b/openapi.json @@ -640,6 +640,121 @@ "bearer_token": [] } ] + }, + "post": { + "tags": [ + "management" + ], + "summary": "Create a new graph at runtime (MR-668 PR 7).", + "description": "Multi-graph mode only. Operators add a graph to the registry\nwithout restarting the server. The server `Omnigraph::init`s the\nnew graph at `req.uri`, atomically rewrites `omnigraph.yaml` to\ninclude the new entry, then publishes the handle in the registry.\n\nCedar-gated by `PolicyAction::GraphCreate` against\n`Omnigraph::Server::\"root\"` (the same server-level policy as\n`GET /graphs`).\n\nFailure modes:\n* Init fails → orphan storage files at `req.uri` (PR 2a cleans up\n schema files but not Lance datasets; operator removes manually).\n* Rewrite fails (`fs2::flock` IO error) → orphan storage; YAML\n unchanged.\n* YAML drift (operator edited the file) → 503; YAML and storage\n both unchanged.\n* Duplicate `graph_id` or `uri` → 409; storage already in use.", + "operationId": "createGraph", + "requestBody": { + "content": { + "application/json": { + "schema": { + "$ref": "#/components/schemas/GraphCreateRequest" + } + } + }, + "required": true + }, + "responses": { + "201": { + "description": "Graph created", + "content": { + "application/json": { + "schema": { + "$ref": "#/components/schemas/GraphCreateResponse" + } + } + } + }, + "400": { + "description": "Invalid request body (graph_id, schema, policy file)", + "content": { + "application/json": { + "schema": { + "$ref": "#/components/schemas/ErrorOutput" + } + } + } + }, + "401": { + "description": "Unauthorized", + "content": { + "application/json": { + "schema": { + "$ref": "#/components/schemas/ErrorOutput" + } + } + } + }, + "403": { + "description": "Forbidden", + "content": { + "application/json": { + "schema": { + "$ref": "#/components/schemas/ErrorOutput" + } + } + } + }, + "405": { + "description": "Method not allowed (single-graph mode)", + "content": { + "application/json": { + "schema": { + "$ref": "#/components/schemas/ErrorOutput" + } + } + } + }, + "409": { + "description": "graph_id or uri already registered", + "content": { + "application/json": { + "schema": { + "$ref": "#/components/schemas/ErrorOutput" + } + } + } + }, + "413": { + "description": "Request body too large (>32 MiB)", + "content": { + "application/json": { + "schema": { + "$ref": "#/components/schemas/ErrorOutput" + } + } + } + }, + "500": { + "description": "Init failure or YAML rewrite failure", + "content": { + "application/json": { + "schema": { + "$ref": "#/components/schemas/ErrorOutput" + } + } + } + }, + "503": { + "description": "omnigraph.yaml drift detected (operator edited the file)", + "content": { + "application/json": { + "schema": { + "$ref": "#/components/schemas/ErrorOutput" + } + } + } + } + }, + "security": [ + { + "bearer_token": [] + } + ] } }, "/healthz": { @@ -1325,6 +1440,56 @@ } } }, + "GraphCreateRequest": { + "type": "object", + "description": "Request body for `POST /graphs` (MR-668 PR 7).\n\nBody shape:\n```json\n{\n \"graph_id\": \"alpha\",\n \"uri\": \"/path/to/alpha.omni\",\n \"schema\": { \"source\": \"\" },\n \"policy\": { \"file\": \"./policies/alpha.yaml\" }\n}\n```\n\n32 MiB body limit (matches `INGEST_REQUEST_BODY_LIMIT_BYTES`).", + "required": [ + "graph_id", + "uri", + "schema" + ], + "properties": { + "graph_id": { + "type": "string", + "description": "New graph's id. Must satisfy `^[a-zA-Z0-9-]{1,64}$`, not start with\n`_`, and not be a reserved name. See `GraphId::try_from`." + }, + "policy": { + "oneOf": [ + { + "type": "null" + }, + { + "$ref": "#/components/schemas/GraphPolicySpec", + "description": "Per-graph Cedar policy. Optional — `None` means the graph has\nno per-graph policy enforcement (HTTP auth still applies if\nconfigured)." + } + ] + }, + "schema": { + "$ref": "#/components/schemas/GraphSchemaSpec", + "description": "Inline schema (`{ source }`). Required." + }, + "uri": { + "type": "string", + "description": "Storage URI (local path or `s3://...`). Must NOT already be in\nuse by another registered graph. Server `Omnigraph::init`s the\ngraph at this URI." + } + } + }, + "GraphCreateResponse": { + "type": "object", + "description": "Response from `POST /graphs` on success (201 Created).", + "required": [ + "graph_id", + "uri" + ], + "properties": { + "graph_id": { + "type": "string" + }, + "uri": { + "type": "string" + } + } + }, "GraphInfo": { "type": "object", "description": "One entry in the response from `GET /graphs`. Cluster operators\nconsume this list to discover which graphs the server is currently\nserving. The shape is intentionally minimal — `graph_id` and `uri`\nare the only fields a routing client needs.", @@ -1356,6 +1521,33 @@ } } }, + "GraphPolicySpec": { + "type": "object", + "description": "Per-graph policy specification in `POST /graphs`. Mirrors the\n`policy: { file }` shape in `omnigraph.yaml`'s `graphs..policy`\nsection.", + "properties": { + "file": { + "type": [ + "string", + "null" + ], + "description": "Path to the per-graph Cedar policy file, server-side.\nMust be readable by the server process at request time.\nPath is relative to the server's working directory (NOT to the\n`omnigraph.yaml`'s `base_dir`) — caller-supplied paths are\ntrusted as-is." + } + } + }, + "GraphSchemaSpec": { + "type": "object", + "description": "Schema specification for a new graph in `POST /graphs`. Nested\nper MR-668 decision 7 — leaves room for future fields without\nbreaking the request shape. Mirrors the `policy: { file }` nesting\npattern.\n\nToday only `source` (inline `.pg` text) is supported. Future fields\nmight include `schema.allow_data_loss`, `schema.version`, etc.\n\n**Asymmetric with `SchemaApplyRequest`**: `POST /schema/apply` still\nuses a flat `schema_source: String` for backwards compatibility.\nA follow-up release may migrate that too.", + "required": [ + "source" + ], + "properties": { + "source": { + "type": "string", + "description": "Inline `.pg` schema source.", + "example": "node Person {\n name: String @key\n age: I32?\n}" + } + } + }, "HealthOutput": { "type": "object", "required": [