From 16e4a833c0413971f407274747b71153acd2f376 Mon Sep 17 00:00:00 2001 From: aaltshuler Date: Tue, 16 Jun 2026 04:02:08 +0300 Subject: [PATCH] Wire cluster embedding providers --- crates/omnigraph-cluster/src/config.rs | 122 ++++++- crates/omnigraph-cluster/src/diff.rs | 20 +- crates/omnigraph-cluster/src/lib.rs | 161 ++++++--- crates/omnigraph-cluster/src/serve.rs | 76 ++++- crates/omnigraph-cluster/src/sweep.rs | 92 ++++- crates/omnigraph-cluster/src/tests.rs | 321 +++++++++++++++++- crates/omnigraph-cluster/src/types.rs | 161 +++++++-- crates/omnigraph-server/src/lib.rs | 9 +- crates/omnigraph-server/src/settings.rs | 11 + crates/omnigraph-server/tests/multi_graph.rs | 179 +++++++++- crates/omnigraph/src/db/omnigraph.rs | 11 +- crates/omnigraph/src/embedding.rs | 14 +- docs/dev/rfc-012-embedding-provider-config.md | 31 +- docs/user/clusters/config.md | 47 ++- docs/user/reference/constants.md | 12 +- docs/user/search/embeddings.md | 30 ++ 16 files changed, 1132 insertions(+), 165 deletions(-) diff --git a/crates/omnigraph-cluster/src/config.rs b/crates/omnigraph-cluster/src/config.rs index acc954d..d0e0edd 100644 --- a/crates/omnigraph-cluster/src/config.rs +++ b/crates/omnigraph-cluster/src/config.rs @@ -42,7 +42,12 @@ pub(crate) fn resolve_query_decls( return ( map.iter() .map(|(name, config)| { - (name.clone(), QueryConfig { file: config.file.clone() }) + ( + name.clone(), + QueryConfig { + file: config.file.clone(), + }, + ) }) .collect(), BTreeMap::new(), @@ -66,7 +71,10 @@ pub(crate) fn resolve_query_decls( diagnostics.push(Diagnostic::error( "query_dir_unreadable", format!("graphs.{graph_id}.queries"), - format!("could not list query directory '{}': {err}", resolved.display()), + format!( + "could not list query directory '{}': {err}", + resolved.display() + ), )); continue; } @@ -76,7 +84,10 @@ pub(crate) fn resolve_query_decls( diagnostics.push(Diagnostic::warning( "query_dir_empty", format!("graphs.{graph_id}.queries"), - format!("query directory '{}' contains no .gq files", resolved.display()), + format!( + "query directory '{}' contains no .gq files", + resolved.display() + ), )); } for path in entries { @@ -132,7 +143,12 @@ pub(crate) fn resolve_query_decls( continue; } origin.insert(name.clone(), declared.clone()); - registry.insert(name, QueryConfig { file: declared.clone() }); + registry.insert( + name, + QueryConfig { + file: declared.clone(), + }, + ); } contents.insert(declared, source); } @@ -269,8 +285,6 @@ pub(crate) fn validate_cluster_header( } } - - pub(crate) fn state_resource_digests(state: &ClusterState) -> BTreeMap { state .applied_revision @@ -295,7 +309,6 @@ pub(crate) fn initial_import_state(desired: &DesiredCluster) -> ClusterState { } } - pub(crate) async fn observe_declared_graphs( desired: &DesiredCluster, backend: &ClusterStore, @@ -350,19 +363,28 @@ pub(crate) async fn observe_declared_graphs( StateResource { digest: observation.schema_digest.clone(), applies_to: None, + embedding_provider: None, + embedding_profile: None, }, ); let query_digests = state_query_digests_for_graph(state, &graph.id); + let embedding_provider = state_graph_embedding_provider(state, &graph.id); + let embedding_provider_digest = + state_embedding_provider_digest(state, embedding_provider.as_deref()); let graph_digest_value = graph_digest( &graph.id, Some(&observation.schema_digest), Some(&query_digests), + embedding_provider.as_deref(), + embedding_provider_digest.as_ref(), ); state.applied_revision.resources.insert( graph_address.clone(), StateResource { digest: graph_digest_value, applies_to: None, + embedding_provider, + embedding_profile: None, }, ); state.observations.insert( @@ -499,7 +521,6 @@ pub(crate) fn graph_observation_json(observation: GraphObservationJson<'_>) -> s }) } - pub(crate) fn load_desired(config_dir: &Path) -> LoadOutcome { let parsed = parse_cluster_config(config_dir); let config_dir = parsed.config_dir; @@ -519,6 +540,35 @@ pub(crate) fn load_desired(config_dir: &Path) -> LoadOutcome { let mut dependencies = BTreeSet::new(); let mut graph_query_digests: BTreeMap> = BTreeMap::new(); let mut graph_schema_digests: BTreeMap = BTreeMap::new(); + let mut graph_embedding_providers: BTreeMap = BTreeMap::new(); + let mut embedding_provider_digests: BTreeMap = BTreeMap::new(); + let mut embedding_providers: BTreeMap = BTreeMap::new(); + + for (provider_name, profile) in &raw.providers.embedding { + validate_id( + "embedding provider name", + &format!("providers.embedding.{provider_name}"), + provider_name, + &mut diagnostics, + ); + let address = embedding_provider_address(provider_name); + profile.validate( + format!("providers.embedding.{provider_name}"), + &mut diagnostics, + ); + let digest = embedding_provider_digest(profile); + embedding_provider_digests.insert(address.clone(), digest.clone()); + embedding_providers.insert(address.clone(), profile.clone()); + resources.insert( + address.clone(), + ResourceSummary { + address, + kind: "embedding_provider".to_string(), + digest, + path: None, + }, + ); + } for (graph_id, graph) in &raw.graphs { validate_id( @@ -533,6 +583,35 @@ pub(crate) fn load_desired(config_dir: &Path) -> LoadOutcome { from: schema_address.clone(), to: graph_address.clone(), }); + if let Some(provider_ref) = graph.embedding_provider.as_deref() { + match normalize_embedding_provider_target(provider_ref) { + EmbeddingProviderTarget::Provider(provider_name) => { + let provider_address = embedding_provider_address(&provider_name); + if raw.providers.embedding.contains_key(&provider_name) { + dependencies.insert(Dependency { + from: graph_address.clone(), + to: provider_address.clone(), + }); + graph_embedding_providers.insert(graph_id.clone(), provider_address); + } else { + diagnostics.push(Diagnostic::error( + "dangling_embedding_provider_reference", + format!("graphs.{graph_id}.embedding_provider"), + format!( + "graph references embedding provider `{provider_name}`, but no providers.embedding.{provider_name} profile is declared" + ), + )); + } + } + EmbeddingProviderTarget::WrongKind(kind) => diagnostics.push(Diagnostic::error( + "wrong_kind_reference", + format!("graphs.{graph_id}.embedding_provider"), + format!( + "embedding_provider expects a providers.embedding ref or bare provider name, got `{kind}`" + ), + )), + } + } let schema_path = resolve_config_path(&config_dir, &graph.schema); let schema_source = match fs::read_to_string(&schema_path) { @@ -646,10 +725,15 @@ pub(crate) fn load_desired(config_dir: &Path) -> LoadOutcome { } for graph_id in raw.graphs.keys() { + let embedding_provider = graph_embedding_providers.get(graph_id); + let embedding_provider_digest = + embedding_provider.and_then(|address| embedding_provider_digests.get(address)); let digest = graph_digest( graph_id, graph_schema_digests.get(graph_id), graph_query_digests.get(graph_id), + embedding_provider.map(String::as_str), + embedding_provider_digest, ); resources.insert( graph_address(graph_id), @@ -754,6 +838,7 @@ pub(crate) fn load_desired(config_dir: &Path) -> LoadOutcome { .get(graph_id) .cloned() .unwrap_or_default(), + embedding_provider: graph_embedding_providers.get(graph_id).cloned(), }) .collect(); let config_digest = desired_config_digest(&raw, &resource_digests); @@ -769,6 +854,7 @@ pub(crate) fn load_desired(config_dir: &Path) -> LoadOutcome { resources: resource_list, dependencies, policy_bindings, + embedding_providers, }), diagnostics, config_dir, @@ -828,7 +914,6 @@ pub(crate) fn future_field_diagnostics(text: &str) -> Vec { let future_fields = [ "apply", "env_file", - "providers", "pipelines", "embeddings", "ui", @@ -882,6 +967,21 @@ pub(crate) fn normalize_policy_target(value: &str) -> PolicyTarget { } } +enum EmbeddingProviderTarget { + Provider(String), + WrongKind(String), +} + +fn normalize_embedding_provider_target(value: &str) -> EmbeddingProviderTarget { + if let Some(name) = value.strip_prefix("provider.embedding.") { + EmbeddingProviderTarget::Provider(name.to_string()) + } else if value.contains('.') { + EmbeddingProviderTarget::WrongKind(value.to_string()) + } else { + EmbeddingProviderTarget::Provider(value.to_string()) + } +} + pub(crate) fn graph_address(graph_id: &str) -> String { format!("graph.{graph_id}") } @@ -898,6 +998,10 @@ pub(crate) fn policy_address(policy_name: &str) -> String { format!("policy.{policy_name}") } +pub(crate) fn embedding_provider_address(provider_name: &str) -> String { + format!("provider.embedding.{provider_name}") +} + pub(crate) fn resolve_config_path(config_dir: &Path, path: &Path) -> PathBuf { if path.is_absolute() { path.to_path_buf() diff --git a/crates/omnigraph-cluster/src/diff.rs b/crates/omnigraph-cluster/src/diff.rs index 593b2fa..516a86e 100644 --- a/crates/omnigraph-cluster/src/diff.rs +++ b/crates/omnigraph-cluster/src/diff.rs @@ -152,7 +152,9 @@ pub(crate) fn approved_resources( let candidates: Vec<&ApprovalArtifact> = artifacts .iter() .map(|(_, artifact)| artifact) - .filter(|artifact| artifact.consumed_at.is_none() && artifact.resource == change.resource) + .filter(|artifact| { + artifact.consumed_at.is_none() && artifact.resource == change.resource + }) .collect(); if candidates.is_empty() { continue; @@ -181,6 +183,7 @@ pub(crate) enum ResourceKind { Schema(String), Query { graph: String, name: String }, Policy(String), + EmbeddingProvider(String), Unknown, } @@ -199,6 +202,8 @@ pub(crate) fn resource_kind(address: &str) -> ResourceKind { } } else if let Some(name) = address.strip_prefix("policy.") { ResourceKind::Policy(name.to_string()) + } else if let Some(name) = address.strip_prefix("provider.embedding.") { + ResourceKind::EmbeddingProvider(name.to_string()) } else { ResourceKind::Unknown } @@ -261,8 +266,7 @@ pub(crate) fn classify_changes( let (disposition, reason) = match resource_kind(&change.resource) { ResourceKind::Schema(graph) => match change.operation { PlanOperation::Create - if graph_creates.contains(&graph) - && !pending_recovery.contains(&graph) => + if graph_creates.contains(&graph) && !pending_recovery.contains(&graph) => { // Applied with the graph create — the init carries it. (ApplyDisposition::Applied, None) @@ -325,10 +329,7 @@ pub(crate) fn classify_changes( if pending_recovery.contains(&graph) { (ApplyDisposition::Blocked, Some("cluster_recovery_pending")) } else if schema_pending.contains(&graph) { - ( - ApplyDisposition::Blocked, - Some("dependency_not_applied"), - ) + (ApplyDisposition::Blocked, Some("dependency_not_applied")) } else { // A graph create in the same plan no longer blocks: // creates execute first in the same apply run. @@ -353,9 +354,8 @@ pub(crate) fn classify_changes( } } }, - ResourceKind::Unknown => { - (ApplyDisposition::Deferred, Some("apply_unsupported_kind")) - } + ResourceKind::EmbeddingProvider(_) => (ApplyDisposition::Applied, None), + ResourceKind::Unknown => (ApplyDisposition::Deferred, Some("apply_unsupported_kind")), }; change.disposition = Some(disposition); change.reason = reason.map(str::to_string); diff --git a/crates/omnigraph-cluster/src/lib.rs b/crates/omnigraph-cluster/src/lib.rs index 0c0f4e6..6251913 100644 --- a/crates/omnigraph-cluster/src/lib.rs +++ b/crates/omnigraph-cluster/src/lib.rs @@ -20,18 +20,34 @@ use ulid::Ulid; pub mod failpoints; mod config; -mod types; mod diff; mod serve; -mod sweep; mod store; +mod sweep; +mod types; +use config::{ + QueriesDecl, future_field_diagnostics, graph_address, initial_import_state, load_desired, + normalize_policy_target, observe_declared_graphs, observe_live_graph, parse_cluster_config, + policy_address, preview_schema_migration, query_address, resolve_config_path, + resolve_query_decls, schema_address, state_resource_digests, validate_cluster_header, + validate_id, validate_query_source, +}; +use diff::{ + FailedGraphOrigin, ResourceKind, append_policy_binding_changes, approved_resources, + classify_changes, compute_approvals, compute_blast_radius, demote_dependents_of_failed_graphs, + diff_resources, resource_kind, +}; +pub use serve::{ + ServingGraph, ServingPolicy, ServingQuery, ServingSnapshot, cluster_root_for_graph_uri, + read_serving_snapshot, read_serving_snapshot_from_storage, resolve_graph_storage_uri, +}; use store::{ClusterStore, StateLockGuard, StateSnapshot}; +use sweep::{ + mark_approvals_consumed, record_approval_consumed, sweep_recovery_sidecars, + tombstone_graph_subtree, warn_pending_recovery_sidecars, +}; pub use types::*; use types::*; -pub use serve::{ServingGraph, ServingPolicy, ServingQuery, ServingSnapshot, cluster_root_for_graph_uri, read_serving_snapshot, read_serving_snapshot_from_storage, resolve_graph_storage_uri}; -use config::{QueriesDecl, observe_declared_graphs, validate_cluster_header, future_field_diagnostics, initial_import_state, observe_live_graph, preview_schema_migration, state_resource_digests, graph_address, policy_address, query_address, schema_address, load_desired, normalize_policy_target, parse_cluster_config, resolve_config_path, resolve_query_decls, validate_id, validate_query_source}; -use diff::{FailedGraphOrigin, ResourceKind, append_policy_binding_changes, approved_resources, classify_changes, compute_approvals, compute_blast_radius, demote_dependents_of_failed_graphs, diff_resources, resource_kind}; -use sweep::{mark_approvals_consumed, record_approval_consumed, sweep_recovery_sidecars, tombstone_graph_subtree, warn_pending_recovery_sidecars}; pub const CLUSTER_CONFIG_FILE: &str = "cluster.yaml"; pub const CLUSTER_GRAPHS_DIR: &str = "graphs"; @@ -44,10 +60,7 @@ pub const CLUSTER_APPROVALS_DIR: &str = "__cluster/approvals"; /// The store for a load outcome: the declared `storage:` root when present, /// the config directory itself otherwise. A bad root is a loud error. -fn store_for( - config_dir: &Path, - storage_root: Option<&str>, -) -> Result { +fn store_for(config_dir: &Path, storage_root: Option<&str>) -> Result { match storage_root { Some(root) => ClusterStore::for_storage_root(root), None => Ok(ClusterStore::for_config_dir(config_dir)), @@ -179,7 +192,12 @@ pub async fn plan_config_dir(config_dir: impl AsRef) -> PlanOutput { &desired.config_digest, &mut diagnostics, ); - classify_changes(&mut changes, &desired.dependencies, &BTreeSet::new(), &approved); + classify_changes( + &mut changes, + &desired.dependencies, + &BTreeSet::new(), + &approved, + ); // Embed real migration steps for schema updates so plan is a data-aware // preview; failures degrade to the digest diff with a warning. @@ -282,9 +300,7 @@ pub async fn apply_config_dir_with_options( ok: !has_errors(&diagnostics), config_dir, actor: actor_for_output.clone(), - desired_revision: DesiredRevision { - config_digest, - }, + desired_revision: DesiredRevision { config_digest }, state_observations: observations, changes, applied_count: 0, @@ -464,8 +480,7 @@ pub async fn apply_config_dir_with_options( failed_graphs.insert(graph_id.clone(), FailedGraphOrigin::GraphCreate); continue; } - let Some(desired_graph) = desired.graphs.iter().find(|graph| &graph.id == graph_id) - else { + let Some(desired_graph) = desired.graphs.iter().find(|graph| &graph.id == graph_id) else { continue; }; let graph_uri = backend.graph_root(graph_id); @@ -604,8 +619,7 @@ pub async fn apply_config_dir_with_options( failed_graphs.insert(graph_id.clone(), FailedGraphOrigin::SchemaApply); continue; } - let Some(desired_graph) = desired.graphs.iter().find(|graph| &graph.id == graph_id) - else { + let Some(desired_graph) = desired.graphs.iter().find(|graph| &graph.id == graph_id) else { continue; }; let graph_uri = backend.graph_root(graph_id); @@ -955,8 +969,10 @@ pub async fn apply_config_dir_with_options( .expect("create/update always carries an after digest"), // Policies record their applied bindings so the // ledger is serving-sufficient (RFC-005 §D3). - applies_to: desired - .policy_bindings + applies_to: desired.policy_bindings.get(&change.resource).cloned(), + embedding_provider: None, + embedding_profile: desired + .embedding_providers .get(&change.resource) .cloned(), }, @@ -964,7 +980,10 @@ pub async fn apply_config_dir_with_options( set_resource_status_applied(&mut new_state, &change.resource); } PlanOperation::Delete => { - new_state.applied_revision.resources.remove(&change.resource); + new_state + .applied_revision + .resources + .remove(&change.resource); new_state.resource_statuses.remove(&change.resource); } }, @@ -1219,7 +1238,6 @@ pub async fn approve_config_dir( } } - pub async fn status_config_dir(config_dir: impl AsRef) -> StatusOutput { let parsed = parse_cluster_config(config_dir.as_ref()); let mut diagnostics = parsed.diagnostics; @@ -1238,7 +1256,9 @@ pub async fn status_config_dir(config_dir: impl AsRef) -> StatusOutput { } }; let mut observations = backend.observations(); - backend.observe_lock(&mut observations, &mut diagnostics).await; + backend + .observe_lock(&mut observations, &mut diagnostics) + .await; warn_pending_recovery_sidecars(&parsed.config_dir, &mut diagnostics); let mut resource_digests = BTreeMap::new(); @@ -1254,9 +1274,7 @@ pub async fn status_config_dir(config_dir: impl AsRef) -> StatusOutput { // Read-only point-in-time catalog check: report the // findings as diagnostics; persisting Drifted statuses // is refresh's job. Status never writes state. - for (address, finding) in - verify_catalog_payloads(&backend, &state).await - { + for (address, finding) in verify_catalog_payloads(&backend, &state).await { diagnostics.push(payload_finding_diagnostic(&address, &finding)); } resource_digests = state_resource_digests(&state); @@ -1312,7 +1330,10 @@ pub async fn force_unlock_config_dir( if let Some(raw) = parsed.raw.as_ref() { let _settings = validate_cluster_header(raw, &mut diagnostics); if !has_errors(&diagnostics) { - match backend.force_unlock(lock_id.as_ref(), &mut observations).await { + match backend + .force_unlock(lock_id.as_ref(), &mut observations) + .await + { Ok(()) => lock_removed = true, Err(diagnostic) => diagnostics.push(diagnostic), } @@ -1380,7 +1401,10 @@ async fn sync_config_dir(config_dir: &Path, operation: StateSyncOperation) -> St let operation_label = state_sync_operation_label(operation); let _lock_guard = if desired.state_lock { - match backend.acquire_lock(operation_label, &mut observations).await { + match backend + .acquire_lock(operation_label, &mut observations) + .await + { Ok(guard) => Some(guard), Err(diagnostic) => { diagnostics.push(diagnostic); @@ -1542,7 +1566,10 @@ async fn sync_config_dir(config_dir: &Path, operation: StateSyncOperation) -> St state.state_revision = state.state_revision.saturating_add(1); } - match backend.write_state(&state, expected_cas.as_deref(), &mut observations).await { + match backend + .write_state(&state, expected_cas.as_deref(), &mut observations) + .await + { Ok(()) => { // Completed sweep sidecars are deleted only after their outcome // is durably recorded; on failure they stay and re-sweep. @@ -1569,9 +1596,6 @@ async fn sync_config_dir(config_dir: &Path, operation: StateSyncOperation) -> St } } - - - #[derive(Debug, PartialEq, Eq)] enum PayloadFinding { Missing, @@ -1650,7 +1674,10 @@ async fn write_resource_payload( Diagnostic::error( "resource_payload_write_error", resource, - format!("could not read resource source '{}': {err}", source.display()), + format!( + "could not read resource source '{}': {err}", + source.display() + ), ) })?; if sha256_hex(&bytes) != expected_digest { @@ -1692,7 +1719,11 @@ async fn write_resource_payload( fn recompute_state_graph_digests(state: &mut ClusterState, desired: &DesiredCluster) { for graph in &desired.graphs { let graph_address = graph_address(&graph.id); - if !state.applied_revision.resources.contains_key(&graph_address) { + if !state + .applied_revision + .resources + .contains_key(&graph_address) + { continue; } let schema_digest = state @@ -1701,11 +1732,26 @@ fn recompute_state_graph_digests(state: &mut ClusterState, desired: &DesiredClus .get(&schema_address(&graph.id)) .map(|resource| resource.digest.clone()); let query_digests = state_query_digests_for_graph(state, &graph.id); - let digest = graph_digest(&graph.id, schema_digest.as_ref(), Some(&query_digests)); - state - .applied_revision - .resources - .insert(graph_address, StateResource { digest, applies_to: None }); + let embedding_provider = graph.embedding_provider.as_deref(); + let embedding_provider_digest = embedding_provider + .and_then(|address| state.applied_revision.resources.get(address)) + .map(|resource| resource.digest.clone()); + let digest = graph_digest( + &graph.id, + schema_digest.as_ref(), + Some(&query_digests), + embedding_provider, + embedding_provider_digest.as_ref(), + ); + state.applied_revision.resources.insert( + graph_address, + StateResource { + digest, + applies_to: None, + embedding_provider: graph.embedding_provider.clone(), + embedding_profile: None, + }, + ); } } @@ -1773,7 +1819,6 @@ fn duplicate_key_diagnostics(text: &str) -> Vec { diagnostics } - fn strip_comment(line: &str) -> String { let mut in_single_quote = false; let mut in_double_quote = false; @@ -1796,7 +1841,6 @@ fn strip_comment(line: &str) -> String { line.to_string() } - fn state_query_digests_for_graph(state: &ClusterState, graph_id: &str) -> BTreeMap { let prefix = format!("query.{graph_id}."); state @@ -1811,6 +1855,23 @@ fn state_query_digests_for_graph(state: &ClusterState, graph_id: &str) -> BTreeM .collect() } +fn state_graph_embedding_provider(state: &ClusterState, graph_id: &str) -> Option { + state + .applied_revision + .resources + .get(&graph_address(graph_id)) + .and_then(|resource| resource.embedding_provider.clone()) +} + +fn state_embedding_provider_digest( + state: &ClusterState, + embedding_provider: Option<&str>, +) -> Option { + embedding_provider + .and_then(|address| state.applied_revision.resources.get(address)) + .map(|resource| resource.digest.clone()) +} + fn set_resource_status_applied(state: &mut ClusterState, address: &str) { state.resource_statuses.insert( address.to_string(), @@ -1843,6 +1904,8 @@ fn graph_digest( graph_id: &str, schema_digest: Option<&String>, query_digests: Option<&BTreeMap>, + embedding_provider: Option<&str>, + embedding_provider_digest: Option<&String>, ) -> String { let mut input = format!( "graph\0{graph_id}\0schema\0{}\0", @@ -1857,6 +1920,21 @@ fn graph_digest( input.push('\0'); } } + if let Some(provider) = embedding_provider { + input.push_str("embedding_provider\0"); + input.push_str(provider); + input.push('\0'); + input.push_str(embedding_provider_digest.map_or("", String::as_str)); + input.push('\0'); + } + sha256_hex(input.as_bytes()) +} + +fn embedding_provider_digest(profile: &EmbeddingProviderConfig) -> String { + let mut input = String::from("embedding-provider\0"); + let config_semantics = + serde_json::to_string(profile).expect("embedding provider config must serialize"); + input.push_str(&config_semantics); sha256_hex(input.as_bytes()) } @@ -1930,7 +2008,6 @@ fn display_path(path: &Path) -> String { path.display().to_string() } - #[cfg(test)] #[path = "tests.rs"] mod tests; diff --git a/crates/omnigraph-cluster/src/serve.rs b/crates/omnigraph-cluster/src/serve.rs index 241ab41..a0357b4 100644 --- a/crates/omnigraph-cluster/src/serve.rs +++ b/crates/omnigraph-cluster/src/serve.rs @@ -8,6 +8,7 @@ use super::*; pub struct ServingGraph { pub graph_id: String, pub root: PathBuf, + pub embedding: Option, } /// One stored query: its graph binding, registry name, and verified source. @@ -111,7 +112,10 @@ pub async fn cluster_root_for_graph_uri(graph_uri: &str) -> Option { /// /// `cluster` is a config directory or a storage-root URI (`s3://…`, config-free), /// mirroring the server's `--cluster` dispatch. -pub async fn resolve_graph_storage_uri(cluster: &str, graph_id: &str) -> Result { +pub async fn resolve_graph_storage_uri( + cluster: &str, + graph_id: &str, +) -> Result { let backend = if cluster.contains("://") { ClusterStore::for_storage_root(cluster)? } else { @@ -200,15 +204,73 @@ async fn read_snapshot_with_store( return Err(diagnostics); }; + let mut embedding_profiles: BTreeMap = BTreeMap::new(); + for (address, entry) in &state.applied_revision.resources { + if !matches!(resource_kind(address), ResourceKind::EmbeddingProvider(_)) { + continue; + } + let Some(profile) = entry.embedding_profile.clone() else { + diagnostics.push(Diagnostic::error( + "embedding_provider_profile_missing", + address.clone(), + "no applied embedding provider profile recorded; re-run `cluster apply` to backfill", + )); + continue; + }; + let actual_digest = embedding_provider_digest(&profile); + if actual_digest != entry.digest { + diagnostics.push(Diagnostic::error( + "embedding_provider_digest_mismatch", + address.clone(), + format!( + "applied embedding provider profile does not match its recorded digest (actual sha256:{actual_digest}); run `cluster refresh` then `cluster apply`, and restart" + ), + )); + continue; + } + embedding_profiles.insert(address.clone(), profile); + } + let mut graphs = Vec::new(); let mut queries = Vec::new(); let mut policies = Vec::new(); for (address, entry) in &state.applied_revision.resources { match resource_kind(address) { ResourceKind::Graph(graph_id) => { + let embedding = match entry.embedding_provider.as_deref() { + Some(provider_address) => match resource_kind(provider_address) { + ResourceKind::EmbeddingProvider(_) => { + match embedding_profiles.get(provider_address) { + Some(profile) => Some(profile.clone()), + None => { + diagnostics.push(Diagnostic::error( + "embedding_provider_missing", + address.clone(), + format!( + "graph references `{provider_address}`, but no applied embedding provider profile is available; re-run `cluster apply`" + ), + )); + None + } + } + } + _ => { + diagnostics.push(Diagnostic::error( + "wrong_kind_reference", + address.clone(), + format!( + "graph embedding_provider expects `provider.embedding.`, got `{provider_address}`" + ), + )); + None + } + }, + None => None, + }; graphs.push(ServingGraph { root: PathBuf::from(backend.graph_root(&graph_id)), graph_id, + embedding, }); } ResourceKind::Schema(_) => {} @@ -216,7 +278,10 @@ async fn read_snapshot_with_store( let ResourceKind::Query { graph, name } = &kind else { unreachable!() }; - match backend.read_verified_payload(&kind, &entry.digest, address).await { + match backend + .read_verified_payload(&kind, &entry.digest, address) + .await + { Ok(source) => queries.push(ServingQuery { graph_id: graph.clone(), name: name.clone(), @@ -237,7 +302,10 @@ async fn read_snapshot_with_store( )); continue; }; - match backend.read_verified_payload(&kind, &entry.digest, address).await { + match backend + .read_verified_payload(&kind, &entry.digest, address) + .await + { Ok(source) => policies.push(ServingPolicy { name: name.clone(), source, @@ -246,6 +314,7 @@ async fn read_snapshot_with_store( Err(diagnostic) => diagnostics.push(diagnostic), } } + ResourceKind::EmbeddingProvider(_) => {} ResourceKind::Unknown => {} } } @@ -313,4 +382,3 @@ mod tests { ); } } - diff --git a/crates/omnigraph-cluster/src/sweep.rs b/crates/omnigraph-cluster/src/sweep.rs index 7aecb01..6539cae 100644 --- a/crates/omnigraph-cluster/src/sweep.rs +++ b/crates/omnigraph-cluster/src/sweep.rs @@ -19,13 +19,29 @@ pub(crate) async fn sweep_recovery_sidecars( for (path, sidecar) in backend.list_recovery_sidecars(diagnostics).await { match sidecar.kind { RecoverySidecarKind::GraphCreate => { - sweep_graph_create_sidecar(backend, path, sidecar, state, diagnostics, &mut outcome).await; + sweep_graph_create_sidecar( + backend, + path, + sidecar, + state, + diagnostics, + &mut outcome, + ) + .await; } RecoverySidecarKind::SchemaApply => { sweep_schema_apply_sidecar(path, sidecar, state, diagnostics, &mut outcome).await; } RecoverySidecarKind::GraphDelete => { - sweep_graph_delete_sidecar(backend, path, sidecar, state, diagnostics, &mut outcome).await; + sweep_graph_delete_sidecar( + backend, + path, + sidecar, + state, + diagnostics, + &mut outcome, + ) + .await; } } } @@ -71,15 +87,30 @@ pub(crate) async fn sweep_graph_create_sidecar( StateResource { digest: live_digest.clone(), applies_to: None, + embedding_provider: None, + embedding_profile: None, }, ); let query_digests = state_query_digests_for_graph(state, &sidecar.graph_id); - let composite = - graph_digest(&sidecar.graph_id, Some(&live_digest), Some(&query_digests)); - state - .applied_revision - .resources - .insert(graph_address.clone(), StateResource { digest: composite, applies_to: None }); + let embedding_provider = state_graph_embedding_provider(state, &sidecar.graph_id); + let embedding_provider_digest = + state_embedding_provider_digest(state, embedding_provider.as_deref()); + let composite = graph_digest( + &sidecar.graph_id, + Some(&live_digest), + Some(&query_digests), + embedding_provider.as_deref(), + embedding_provider_digest.as_ref(), + ); + state.applied_revision.resources.insert( + graph_address.clone(), + StateResource { + digest: composite, + applies_to: None, + embedding_provider, + embedding_profile: None, + }, + ); set_resource_status_applied(state, &graph_address); set_resource_status_applied(state, &schema_addr); state.recovery_records.insert( @@ -200,14 +231,30 @@ pub(crate) async fn sweep_schema_apply_sidecar( StateResource { digest: live_digest.clone(), applies_to: None, + embedding_provider: None, + embedding_profile: None, }, ); let query_digests = state_query_digests_for_graph(state, &sidecar.graph_id); - let composite = graph_digest(&sidecar.graph_id, Some(&live_digest), Some(&query_digests)); - state - .applied_revision - .resources - .insert(graph_address.clone(), StateResource { digest: composite, applies_to: None }); + let embedding_provider = state_graph_embedding_provider(state, &sidecar.graph_id); + let embedding_provider_digest = + state_embedding_provider_digest(state, embedding_provider.as_deref()); + let composite = graph_digest( + &sidecar.graph_id, + Some(&live_digest), + Some(&query_digests), + embedding_provider.as_deref(), + embedding_provider_digest.as_ref(), + ); + state.applied_revision.resources.insert( + graph_address.clone(), + StateResource { + digest: composite, + applies_to: None, + embedding_provider, + embedding_profile: None, + }, + ); set_resource_status_applied(state, &graph_address); set_resource_status_applied(state, &schema_addr); state.recovery_records.insert( @@ -274,7 +321,11 @@ pub(crate) async fn sweep_graph_delete_sidecar( return; } - if !state.applied_revision.resources.contains_key(&graph_address) { + if !state + .applied_revision + .resources + .contains_key(&graph_address) + { // Row 7: already tombstoned (or never recorded); crash fell between // the state CAS and sidecar delete. outcome.completed_sidecars.push(path); @@ -283,7 +334,12 @@ pub(crate) async fn sweep_graph_delete_sidecar( // Row 7b: the root is gone, the ledger is stale — roll forward the // tombstone, consume the approval the sidecar carries, audit. - tombstone_graph_subtree(state, &sidecar.graph_id, sidecar.approval_id.as_deref(), sidecar.actor.as_deref()); + tombstone_graph_subtree( + state, + &sidecar.graph_id, + sidecar.approval_id.as_deref(), + sidecar.actor.as_deref(), + ); state.recovery_records.insert( sidecar.operation_id.clone(), json!({ @@ -342,7 +398,11 @@ pub(crate) fn tombstone_graph_subtree( /// Record approval consumption in the state ledger. The artifact FILE is /// rewritten with consumed_at only after the state write lands, so a failed /// CAS leaves the approval valid for the retry. -pub(crate) fn record_approval_consumed(state: &mut ClusterState, approval_id: &str, operation_id: &str) { +pub(crate) fn record_approval_consumed( + state: &mut ClusterState, + approval_id: &str, + operation_id: &str, +) { state.approval_records.insert( approval_id.to_string(), json!({ diff --git a/crates/omnigraph-cluster/src/tests.rs b/crates/omnigraph-cluster/src/tests.rs index 805ecda..ac448cf 100644 --- a/crates/omnigraph-cluster/src/tests.rs +++ b/crates/omnigraph-cluster/src/tests.rs @@ -56,6 +56,39 @@ policies: dir } + fn write_mock_embedding_cluster(config_dir: &Path, model: &str) { + fs::write( + config_dir.join(CLUSTER_CONFIG_FILE), + format!( + r#" +version: 1 +metadata: + name: test +state: + backend: cluster + lock: true +providers: + embedding: + default: + kind: mock + model: {model} +graphs: + knowledge: + schema: ./people.pg + embedding_provider: default + queries: + find_person: + file: ./people.gq +policies: + base: + file: ./base.policy.yaml + applies_to: [knowledge] +"# + ), + ) + .unwrap(); + } + async fn init_derived_graph(root: &Path) { let graph_dir = root.join(CLUSTER_GRAPHS_DIR); fs::create_dir_all(&graph_dir).unwrap(); @@ -194,6 +227,95 @@ policies: assert!(codes.contains("dangling_graph_reference")); } + #[test] + fn embedding_provider_config_accepts_provider_resources_and_graph_refs() { + let dir = fixture(); + write_mock_embedding_cluster(dir.path(), "recorded-x"); + + let out = validate_config_dir(dir.path()); + assert!(out.ok, "{:?}", out.diagnostics); + let provider_digest = out + .resource_digests + .get("provider.embedding.default") + .expect("provider resource digest"); + assert!( + out.resources + .iter() + .any(|resource| resource.address == "provider.embedding.default" + && resource.kind == "embedding_provider" + && resource.path.is_none()) + ); + assert!( + out.dependencies + .iter() + .any(|dep| dep.from == "graph.knowledge" && dep.to == "provider.embedding.default"), + "{:?}", + out.dependencies + ); + let schema_digest = out.resource_digests.get("schema.knowledge").unwrap(); + let query_digest = out + .resource_digests + .get("query.knowledge.find_person") + .unwrap(); + let expected_graph_digest = graph_digest( + "knowledge", + Some(schema_digest), + Some( + &[("find_person".to_string(), query_digest.clone())] + .into_iter() + .collect(), + ), + Some("provider.embedding.default"), + Some(provider_digest), + ); + assert_eq!( + out.resource_digests["graph.knowledge"], + expected_graph_digest + ); + } + + #[test] + fn embedding_provider_config_rejects_bad_refs_and_inline_secrets() { + let dir = fixture(); + fs::write( + dir.path().join(CLUSTER_CONFIG_FILE), + r#" +version: 1 +providers: + embedding: + default: + kind: openai-compatible + api_key: sk-inline +graphs: + knowledge: + schema: ./people.pg + embedding_provider: provider.policy.default + missing_provider: + schema: ./people.pg + embedding_provider: absent +"#, + ) + .unwrap(); + let out = validate_config_dir(dir.path()); + assert!(!out.ok); + let codes: BTreeSet<_> = out.diagnostics.iter().map(|d| d.code.as_str()).collect(); + assert!( + codes.contains("embedding_api_key_inline"), + "{:?}", + out.diagnostics + ); + assert!( + codes.contains("wrong_kind_reference"), + "{:?}", + out.diagnostics + ); + assert!( + codes.contains("dangling_embedding_provider_reference"), + "{:?}", + out.diagnostics + ); + } + #[test] fn query_key_mismatch_fails() { let dir = fixture(); @@ -1012,8 +1134,13 @@ graphs: let out = validate_config_dir(config_dir); assert!(out.ok, "{:?}", out.diagnostics); let schema_digest = out.resource_digests.get("schema.knowledge").unwrap().clone(); - let graph_composite = - graph_digest("knowledge", Some(&schema_digest), Some(&BTreeMap::new())); + let graph_composite = graph_digest( + "knowledge", + Some(&schema_digest), + Some(&BTreeMap::new()), + None, + None, + ); write_state_resources( config_dir, &[ @@ -1122,6 +1249,8 @@ graphs: .into_iter() .collect(), ), + None, + None, ); assert_eq!(resources["graph.knowledge"]["digest"], expected_composite); assert_eq!( @@ -1136,6 +1265,117 @@ graphs: assert!(!dir.path().join(CLUSTER_LOCK_FILE).exists()); } + #[tokio::test] + async fn apply_records_embedding_provider_profile_and_graph_binding() { + let dir = fixture(); + write_mock_embedding_cluster(dir.path(), "recorded-x"); + write_applyable_state(dir.path()); + let desired = validate_config_dir(dir.path()); + let query_digest = desired + .resource_digests + .get("query.knowledge.find_person") + .unwrap() + .clone(); + let schema_digest = desired + .resource_digests + .get("schema.knowledge") + .unwrap() + .clone(); + let provider_digest = desired + .resource_digests + .get("provider.embedding.default") + .unwrap() + .clone(); + + let out = apply_config_dir(dir.path()).await; + assert!(out.ok, "{:?}", out.diagnostics); + assert!(out.converged, "{out:?}"); + + let state = read_state_json(dir.path()); + let resources = &state["applied_revision"]["resources"]; + let provider = resources["provider.embedding.default"] + .as_object() + .expect("provider resource"); + assert_eq!(provider["digest"], provider_digest); + assert_eq!(provider["embedding_profile"]["kind"], "mock"); + assert_eq!(provider["embedding_profile"]["model"], "recorded-x"); + assert!(provider["embedding_profile"].get("api_key").is_none()); + assert_eq!( + resources["graph.knowledge"]["embedding_provider"], + "provider.embedding.default" + ); + let expected_graph_digest = graph_digest( + "knowledge", + Some(&schema_digest), + Some( + &[("find_person".to_string(), query_digest)] + .into_iter() + .collect(), + ), + Some("provider.embedding.default"), + Some(&provider_digest), + ); + assert_eq!(resources["graph.knowledge"]["digest"], expected_graph_digest); + } + + #[tokio::test] + async fn embedding_provider_changes_update_provider_and_graph_plan() { + let dir = fixture(); + write_mock_embedding_cluster(dir.path(), "recorded-x"); + write_applyable_state(dir.path()); + let first = apply_config_dir(dir.path()).await; + assert!(first.ok && first.converged, "{first:?}"); + + write_mock_embedding_cluster(dir.path(), "recorded-y"); + let plan = plan_config_dir(dir.path()).await; + assert!(plan.ok, "{:?}", plan.diagnostics); + let by_resource: BTreeMap<&str, &PlanChange> = plan + .changes + .iter() + .map(|change| (change.resource.as_str(), change)) + .collect(); + assert_eq!( + by_resource["provider.embedding.default"].operation, + PlanOperation::Update + ); + assert_eq!( + by_resource["provider.embedding.default"].disposition, + Some(ApplyDisposition::Applied) + ); + assert_eq!( + by_resource["graph.knowledge"].operation, + PlanOperation::Update + ); + assert_eq!( + by_resource["graph.knowledge"].disposition, + Some(ApplyDisposition::Derived) + ); + } + + #[tokio::test] + async fn embedding_binding_survives_refresh() { + let dir = fixture(); + init_derived_graph(dir.path()).await; + write_mock_embedding_cluster(dir.path(), "recorded-x"); + write_applyable_state(dir.path()); + let apply = apply_config_dir(dir.path()).await; + assert!(apply.ok && apply.converged, "{apply:?}"); + + let refresh = refresh_config_dir(dir.path()).await; + assert!(refresh.ok, "{:?}", refresh.diagnostics); + + let state = read_state_json(dir.path()); + let resources = &state["applied_revision"]["resources"]; + assert_eq!( + resources["graph.knowledge"]["embedding_provider"], + "provider.embedding.default" + ); + assert_eq!( + resources["provider.embedding.default"]["embedding_profile"]["model"], + "recorded-x" + ); + } + fn desired_revision_digest(out: &ApplyOutput) -> String { out.desired_revision.config_digest.clone().unwrap() } @@ -1150,8 +1390,13 @@ graphs: .unwrap() .clone(); let old_digest = "0".repeat(64); - let graph_composite = - graph_digest("knowledge", Some(&schema_digest), Some(&BTreeMap::new())); + let graph_composite = graph_digest( + "knowledge", + Some(&schema_digest), + Some(&BTreeMap::new()), + None, + None, + ); write_state_resources( dir.path(), &[ @@ -1190,8 +1435,13 @@ graphs: .clone(); let stale_query_digest = "1".repeat(64); let stale_policy_digest = "2".repeat(64); - let graph_composite = - graph_digest("knowledge", Some(&schema_digest), Some(&BTreeMap::new())); + let graph_composite = graph_digest( + "knowledge", + Some(&schema_digest), + Some(&BTreeMap::new()), + None, + None, + ); write_state_resources( dir.path(), &[ @@ -1234,6 +1484,8 @@ graphs: "knowledge", Some(&schema_digest), Some(&[("find_person".to_string(), query_digest)].into_iter().collect()), + None, + None, ); assert_eq!(resources["graph.knowledge"]["digest"], expected_composite); } @@ -1494,8 +1746,13 @@ graphs: .get("schema.knowledge") .unwrap() .clone(); - let graph_composite = - graph_digest("knowledge", Some(&schema_digest), Some(&BTreeMap::new())); + let graph_composite = graph_digest( + "knowledge", + Some(&schema_digest), + Some(&BTreeMap::new()), + None, + None, + ); write_state_resources( dir.path(), &[ @@ -2864,6 +3121,54 @@ policies: assert!(snapshot.policies[0].source.contains("rules:")); } + #[tokio::test] + async fn serving_snapshot_uses_applied_embedding_provider_profile() { + let dir = fixture(); + init_derived_graph(dir.path()).await; + write_mock_embedding_cluster(dir.path(), "recorded-x"); + write_applyable_state(dir.path()); + let converge = apply_config_dir(dir.path()).await; + assert!(converge.converged, "{converge:?}"); + + let snapshot = read_serving_snapshot(dir.path()).await.unwrap(); + let profile = snapshot.graphs[0].embedding.as_ref().unwrap(); + assert_eq!(profile.kind.as_deref(), Some("mock")); + assert_eq!(profile.model.as_deref(), Some("recorded-x")); + } + + #[tokio::test] + async fn serving_snapshot_refuses_missing_embedding_provider_metadata() { + let dir = fixture(); + init_derived_graph(dir.path()).await; + write_mock_embedding_cluster(dir.path(), "recorded-x"); + write_applyable_state(dir.path()); + let converge = apply_config_dir(dir.path()).await; + assert!(converge.converged, "{converge:?}"); + + let mut state = read_state_json(dir.path()); + state["applied_revision"]["resources"]["provider.embedding.default"] + .as_object_mut() + .unwrap() + .remove("embedding_profile"); + fs::write( + dir.path().join(CLUSTER_STATE_FILE), + serde_json::to_string_pretty(&state).unwrap(), + ) + .unwrap(); + + let err = read_serving_snapshot(dir.path()).await.unwrap_err(); + assert!( + err.iter() + .any(|diagnostic| diagnostic.code == "embedding_provider_profile_missing"), + "{err:?}" + ); + assert!( + err.iter() + .any(|diagnostic| diagnostic.code == "embedding_provider_missing"), + "{err:?}" + ); + } + #[tokio::test] async fn serving_snapshot_refuses_missing_state() { let dir = fixture(); diff --git a/crates/omnigraph-cluster/src/types.rs b/crates/omnigraph-cluster/src/types.rs index 9560673..97ad406 100644 --- a/crates/omnigraph-cluster/src/types.rs +++ b/crates/omnigraph-cluster/src/types.rs @@ -325,6 +325,7 @@ pub(crate) struct DesiredCluster { /// The declared `storage:` root, if any (None ⇒ the config dir itself). pub(crate) storage_root: Option, pub(crate) state_lock: bool, + pub(crate) embedding_providers: BTreeMap, pub(crate) graphs: Vec, pub(crate) resource_digests: BTreeMap, pub(crate) resources: Vec, @@ -337,6 +338,7 @@ pub(crate) struct DesiredCluster { pub(crate) struct DesiredGraph { pub(crate) id: String, pub(crate) schema_digest: String, + pub(crate) embedding_provider: Option, } #[derive(Debug)] @@ -376,6 +378,8 @@ pub(crate) struct RawClusterConfig { #[serde(default)] pub(crate) state: StateConfig, #[serde(default)] + pub(crate) providers: ProvidersConfig, + #[serde(default)] pub(crate) graphs: BTreeMap, #[serde(default)] pub(crate) policies: BTreeMap, @@ -394,41 +398,99 @@ pub(crate) struct StateConfig { pub(crate) lock: Option, } +#[derive(Debug, Default, Serialize, Deserialize)] +#[serde(deny_unknown_fields)] +pub(crate) struct ProvidersConfig { + #[serde(default)] + pub(crate) embedding: BTreeMap, +} + #[derive(Debug, Serialize, Deserialize)] #[serde(deny_unknown_fields)] pub(crate) struct GraphConfig { pub(crate) schema: PathBuf, #[serde(default)] pub(crate) queries: QueriesDecl, - /// Optional per-graph embedding provider profile (RFC-012 Phase 5). + /// Optional reference to a top-level `providers.embedding.` profile. #[serde(default)] - pub(crate) embeddings: Option, + pub(crate) embedding_provider: Option, } -/// A graph's embedding provider profile (RFC-012 Phase 5). `provider`/`base_url`/ -/// `model` default exactly as the engine's `EmbeddingConfig::from_env` does; -/// `api_key` is a `${NAME}` env reference resolved at serving boot, never an -/// inline secret. +/// A named cluster embedding provider profile (RFC-012 Phase 5). `kind`/`base_url`/ +/// `model` default exactly as the engine's `EmbeddingConfig::from_env` does. +/// `api_key`, when required, must be a `${NAME}` env reference resolved at +/// serving boot, never an inline secret. #[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)] #[serde(deny_unknown_fields)] -pub struct EmbeddingProfile { - #[serde(default)] - pub provider: Option, - #[serde(default)] +pub struct EmbeddingProviderConfig { + #[serde(default, alias = "provider", skip_serializing_if = "Option::is_none")] + pub kind: Option, + #[serde(default, skip_serializing_if = "Option::is_none")] pub base_url: Option, - #[serde(default)] + #[serde(default, skip_serializing_if = "Option::is_none")] pub model: Option, - pub api_key: String, + #[serde(default, skip_serializing_if = "Option::is_none")] + pub api_key: Option, } -impl EmbeddingProfile { +impl EmbeddingProviderConfig { + pub(crate) fn validate(&self, path: String, diagnostics: &mut Vec) { + if let Err(error) = omnigraph::embedding::EmbeddingConfig::from_parts( + self.kind.as_deref(), + self.base_url.clone(), + self.model.clone(), + "validation-placeholder".to_string(), + ) { + diagnostics.push(Diagnostic::error( + "invalid_embedding_provider", + path.clone(), + error.to_string(), + )); + } + + if self.kind.as_deref() == Some("mock") { + if let Some(api_key) = self.api_key.as_deref() { + if secret_ref_name(api_key).is_err() { + diagnostics.push(Diagnostic::error( + "embedding_api_key_inline", + format!("{path}.api_key"), + "embedding api_key must be a ${NAME} env reference, not an inline secret", + )); + } + } + return; + } + + match self.api_key.as_deref() { + Some(api_key) if secret_ref_name(api_key).is_err() => diagnostics.push( + Diagnostic::error( + "embedding_api_key_inline", + format!("{path}.api_key"), + "embedding api_key must be a ${NAME} env reference, not an inline secret", + ), + ), + Some(_) => {} + None => diagnostics.push(Diagnostic::error( + "embedding_api_key_required", + format!("{path}.api_key"), + "non-mock embedding providers must set api_key to a ${NAME} env reference", + )), + } + } + /// Resolve into an engine `EmbeddingConfig`, reading the `${NAME}` api-key - /// reference from process env. Errors if `api_key` is not a `${NAME}` - /// reference or the named var is unset. + /// reference from process env. Mock profiles do not read env and may omit + /// `api_key`; real providers error if the reference is missing or unset. pub fn resolve(&self) -> Result { - let api_key = resolve_secret_ref(&self.api_key)?; + let api_key = if self.kind.as_deref() == Some("mock") { + String::new() + } else { + resolve_secret_ref(self.api_key.as_deref().ok_or_else(|| { + "embedding api_key is required for non-mock providers".to_string() + })?)? + }; omnigraph::embedding::EmbeddingConfig::from_parts( - self.provider.as_deref(), + self.kind.as_deref(), self.base_url.clone(), self.model.clone(), api_key, @@ -437,16 +499,21 @@ impl EmbeddingProfile { } } -/// Resolve a `${NAME}` secret reference from process env. Rejects an inline value -/// (anything not wrapped in `${…}`) so secrets never sit in the cluster config. -fn resolve_secret_ref(value: &str) -> Result { - let name = value +fn secret_ref_name(value: &str) -> Result<&str, String> { + value .trim() .strip_prefix("${") .and_then(|s| s.strip_suffix('}')) + .filter(|name| !name.trim().is_empty()) .ok_or_else(|| { format!("embedding api_key must be a ${{NAME}} env reference, got '{}'", value.trim()) - })?; + }) +} + +/// Resolve a `${NAME}` secret reference from process env. Rejects an inline value +/// (anything not wrapped in `${…}`) so secrets never sit in the cluster config. +fn resolve_secret_ref(value: &str) -> Result { + let name = secret_ref_name(value)?; std::env::var(name).map_err(|_| format!("embedding api_key env var '{name}' is not set")) } @@ -505,6 +572,16 @@ pub(crate) struct StateResource { /// non-policy resources. #[serde(default, skip_serializing_if = "Option::is_none")] pub(crate) applies_to: Option>, + /// Graph resources only: the applied `provider.embedding.` binding. + /// The provider profile itself is stored on the provider resource so + /// serving can boot without re-reading mutable desired config. + #[serde(default, skip_serializing_if = "Option::is_none")] + pub(crate) embedding_provider: Option, + /// Embedding provider resources only: the applied profile with unresolved + /// `${ENV}` references. The server resolves the referenced env var exactly + /// once at boot and injects the resulting engine config into the graph. + #[serde(default, skip_serializing_if = "Option::is_none")] + pub(crate) embedding_profile: Option, } #[derive(Debug, Serialize, Deserialize)] @@ -568,18 +645,18 @@ pub(crate) struct SweepOutcome { } #[cfg(test)] -mod embedding_profile_tests { - use super::EmbeddingProfile; +mod embedding_provider_config_tests { + use super::EmbeddingProviderConfig; #[test] fn resolves_secret_from_env_and_applies_defaults() { // SAFETY: a unique var name, no concurrent reader. unsafe { std::env::set_var("OG_TEST_EMBED_KEY_A", "secret-x") }; - let profile = EmbeddingProfile { - provider: Some("openai-compatible".to_string()), + let profile = EmbeddingProviderConfig { + kind: Some("openai-compatible".to_string()), base_url: None, model: Some("m".to_string()), - api_key: "${OG_TEST_EMBED_KEY_A}".to_string(), + api_key: Some("${OG_TEST_EMBED_KEY_A}".to_string()), }; let config = profile.resolve().unwrap(); assert_eq!(config.api_key, "secret-x"); @@ -589,11 +666,11 @@ mod embedding_profile_tests { #[test] fn rejects_inline_api_key() { - let profile = EmbeddingProfile { - provider: None, + let profile = EmbeddingProviderConfig { + kind: None, base_url: None, model: None, - api_key: "sk-inline".to_string(), + api_key: Some("sk-inline".to_string()), }; let err = profile.resolve().unwrap_err(); assert!(err.contains("${NAME}"), "got: {err}"); @@ -601,11 +678,11 @@ mod embedding_profile_tests { #[test] fn errors_on_unset_secret() { - let profile = EmbeddingProfile { - provider: None, + let profile = EmbeddingProviderConfig { + kind: None, base_url: None, model: None, - api_key: "${OG_TEST_DEFINITELY_UNSET_VAR}".to_string(), + api_key: Some("${OG_TEST_DEFINITELY_UNSET_VAR}".to_string()), }; let err = profile.resolve().unwrap_err(); assert!(err.contains("not set"), "got: {err}"); @@ -614,14 +691,26 @@ mod embedding_profile_tests { #[test] fn rejects_unknown_provider() { unsafe { std::env::set_var("OG_TEST_EMBED_KEY_B", "x") }; - let profile = EmbeddingProfile { - provider: Some("cohere".to_string()), + let profile = EmbeddingProviderConfig { + kind: Some("cohere".to_string()), base_url: None, model: None, - api_key: "${OG_TEST_EMBED_KEY_B}".to_string(), + api_key: Some("${OG_TEST_EMBED_KEY_B}".to_string()), }; let err = profile.resolve().unwrap_err(); assert!(err.contains("unknown embedding provider"), "got: {err}"); unsafe { std::env::remove_var("OG_TEST_EMBED_KEY_B") }; } + + #[test] + fn mock_does_not_require_secret_env() { + let profile = EmbeddingProviderConfig { + kind: Some("mock".to_string()), + base_url: None, + model: Some("cluster-mock".to_string()), + api_key: None, + }; + let config = profile.resolve().unwrap(); + assert_eq!(config.model, "cluster-mock"); + } } diff --git a/crates/omnigraph-server/src/lib.rs b/crates/omnigraph-server/src/lib.rs index 3761e91..d45c74d 100644 --- a/crates/omnigraph-server/src/lib.rs +++ b/crates/omnigraph-server/src/lib.rs @@ -218,6 +218,9 @@ pub struct GraphStartupConfig { pub graph_id: String, pub uri: String, pub policy: Option, + /// Pre-resolved embedding config from an applied cluster provider profile. + /// Legacy config paths leave this unset and continue to use env resolution. + pub embedding: Option, /// Per-graph stored-query registry, loaded and identity-checked at /// settings-build time; type-checked against the schema when this /// graph's engine opens. @@ -1156,6 +1159,11 @@ async fn open_single_graph(cfg: GraphStartupConfig) -> Result> let db = Omnigraph::open(&uri) .await .map_err(|err| color_eyre::eyre::eyre!("open graph '{}' at {}: {err}", graph_id, uri))?; + let db = if let Some(embedding) = cfg.embedding { + db.with_embedding_config(Arc::new(embedding)) + } else { + db + }; // Validate this graph's stored queries against the live schema and // resolve them to an attachable handle (refuse boot on breakage). @@ -1190,4 +1198,3 @@ async fn shutdown_signal() { info!("shutdown signal received"); } - diff --git a/crates/omnigraph-server/src/settings.rs b/crates/omnigraph-server/src/settings.rs index 890c5da..0054e03 100644 --- a/crates/omnigraph-server/src/settings.rs +++ b/crates/omnigraph-server/src/settings.rs @@ -99,6 +99,15 @@ pub(crate) async fn load_cluster_settings( graph_id: graph.graph_id.clone(), uri: graph.root.to_string_lossy().to_string(), policy: graph_policies.get(&graph.graph_id).cloned(), + embedding: graph + .embedding + .as_ref() + .map(|profile| { + profile.resolve().map_err(|err| { + eyre!("embedding provider for graph '{}': {err}", graph.graph_id) + }) + }) + .transpose()?, queries: registry, }); } @@ -245,6 +254,7 @@ pub async fn load_server_settings( graph_id: name.clone(), uri, policy: config.resolve_target_policy_file(name).map(PolicySource::File), + embedding: None, queries, }); } @@ -748,6 +758,7 @@ server: .to_string_lossy() .into_owned(), policy: None, + embedding: None, queries: crate::queries::QueryRegistry::default(), }], config_path: temp.path().join("omnigraph.yaml"), diff --git a/crates/omnigraph-server/tests/multi_graph.rs b/crates/omnigraph-server/tests/multi_graph.rs index 5ad847f..6410719 100644 --- a/crates/omnigraph-server/tests/multi_graph.rs +++ b/crates/omnigraph-server/tests/multi_graph.rs @@ -5,9 +5,12 @@ use std::fs; use axum::body::{Body, to_bytes}; use axum::http::{Method, Request, StatusCode}; -use omnigraph_server::api::ErrorOutput; +use omnigraph::db::Omnigraph; +use omnigraph::loader::{LoadMode, load_jsonl}; +use omnigraph_server::api::{ErrorOutput, ReadRequest}; use omnigraph_server::{AppState, build_app}; use serde_json::Value; +use serial_test::serial; use tower::ServiceExt; @@ -457,6 +460,180 @@ async fn cluster_boot_serves_applied_state() { assert_eq!(status, StatusCode::OK, "{body}"); } +#[tokio::test(flavor = "multi_thread")] +#[serial] +async fn cluster_boot_injects_embedding_provider_config() { + const EMBED_SCHEMA: &str = r#" +node Doc { + slug: String @key + title: String @index + embedding: Vector(4) @embed("title", model="cluster-mock") @index +} +"#; + const EMBED_QUERY: &str = r#" +query vector_search_string($q: String) { + match { $d: Doc } + return { $d.slug, $d.title } + order { nearest($d.embedding, $q) } + limit 3 +} +"#; + + let alpha = mock_embedding("alpha", 4); + let beta = mock_embedding("beta", 4); + let gamma = mock_embedding("gamma", 4); + let data = format!( + concat!( + r#"{{"type":"Doc","data":{{"slug":"alpha-doc","title":"alpha guide","embedding":[{}]}}}}"#, + "\n", + r#"{{"type":"Doc","data":{{"slug":"beta-doc","title":"beta guide","embedding":[{}]}}}}"#, + "\n", + r#"{{"type":"Doc","data":{{"slug":"gamma-doc","title":"gamma handbook","embedding":[{}]}}}}"# + ), + format_vector(&alpha), + format_vector(&beta), + format_vector(&gamma), + ); + + let temp = tempfile::tempdir().unwrap(); + fs::write(temp.path().join("docs.pg"), EMBED_SCHEMA).unwrap(); + fs::write(temp.path().join("search.gq"), EMBED_QUERY).unwrap(); + fs::write( + temp.path().join("cluster.yaml"), + r#" +version: 1 +providers: + embedding: + default: + kind: mock + model: cluster-mock +graphs: + knowledge: + schema: ./docs.pg + embedding_provider: default + queries: + vector_search_string: + file: ./search.gq +"#, + ) + .unwrap(); + let import = omnigraph_cluster::import_config_dir(temp.path()).await; + assert!(import.ok, "{:?}", import.diagnostics); + let apply = omnigraph_cluster::apply_config_dir(temp.path()).await; + assert!(apply.ok && apply.converged, "{:?}", apply.diagnostics); + + let graph_uri = temp + .path() + .join("graphs/knowledge.omni") + .to_string_lossy() + .to_string(); + let mut db = Omnigraph::open(&graph_uri).await.unwrap(); + load_jsonl(&mut db, &data, LoadMode::Overwrite) + .await + .unwrap(); + + let _guard = EnvGuard::set(&[ + ("OMNIGRAPH_EMBEDDINGS_MOCK", None), + ("OMNIGRAPH_EMBED_PROVIDER", None), + ("OMNIGRAPH_EMBED_BASE_URL", None), + ("OMNIGRAPH_EMBED_MODEL", None), + ("OPENROUTER_API_KEY", None), + ("OPENAI_API_KEY", None), + ("GEMINI_API_KEY", None), + ]); + let settings = cluster_settings(temp.path()).await.unwrap(); + let omnigraph_server::ServerConfigMode::Multi { + graphs, + config_path, + server_policy, + } = settings.mode + else { + panic!("cluster boot must select multi-graph routing"); + }; + let state = omnigraph_server::open_multi_graph_state( + graphs, + Vec::new(), + server_policy.as_ref(), + config_path, + ) + .await + .unwrap(); + let app = build_app(state); + + let read = ReadRequest { + query_source: EMBED_QUERY.to_string(), + query_name: Some("vector_search_string".to_string()), + params: Some(serde_json::json!({ "q": "alpha" })), + branch: Some("main".to_string()), + snapshot: None, + }; + let (status, body) = json_response( + &app, + Request::builder() + .uri("/graphs/knowledge/read") + .method(Method::POST) + .header("content-type", "application/json") + .body(Body::from(serde_json::to_vec(&read).unwrap())) + .unwrap(), + ) + .await; + + assert_eq!(status, StatusCode::OK, "{body}"); + assert_eq!(body["row_count"], 3); + assert_eq!(body["rows"][0]["d.slug"], "alpha-doc"); +} + +#[tokio::test(flavor = "multi_thread")] +#[serial] +async fn cluster_boot_refuses_missing_embedding_secret_env() { + let temp = tempfile::tempdir().unwrap(); + fs::write( + temp.path().join("people.pg"), + "\nnode Person {\n name: String @key\n}\n", + ) + .unwrap(); + fs::write( + temp.path().join("people.gq"), + "\nquery find_person($name: String) {\n match { $p: Person { name: $name } }\n return { $p.name }\n}\n", + ) + .unwrap(); + fs::write( + temp.path().join("cluster.yaml"), + r#" +version: 1 +providers: + embedding: + default: + kind: openai-compatible + api_key: ${OG_TEST_MISSING_EMBED_KEY} +graphs: + knowledge: + schema: ./people.pg + embedding_provider: default + queries: + find_person: + file: ./people.gq +"#, + ) + .unwrap(); + let import = omnigraph_cluster::import_config_dir(temp.path()).await; + assert!(import.ok, "{:?}", import.diagnostics); + let apply = omnigraph_cluster::apply_config_dir(temp.path()).await; + assert!(apply.ok && apply.converged, "{:?}", apply.diagnostics); + + let _guard = EnvGuard::set(&[ + ("OG_TEST_MISSING_EMBED_KEY", None), + ("OMNIGRAPH_EMBEDDINGS_MOCK", None), + ]); + let err = cluster_settings(temp.path()).await.unwrap_err(); + let message = err.to_string(); + assert!( + message.contains("embedding provider for graph 'knowledge'"), + "{message}" + ); + assert!(message.contains("OG_TEST_MISSING_EMBED_KEY"), "{message}"); +} + #[tokio::test] async fn cluster_boot_wires_policy_bindings_into_cedar_slots() { let temp = tempfile::tempdir().unwrap(); diff --git a/crates/omnigraph/src/db/omnigraph.rs b/crates/omnigraph/src/db/omnigraph.rs index dd20efe..a1779a9 100644 --- a/crates/omnigraph/src/db/omnigraph.rs +++ b/crates/omnigraph/src/db/omnigraph.rs @@ -162,8 +162,8 @@ pub struct Omnigraph { /// avoids the per-query `from_env()` rebuild and keeps the provider HTTP /// connection pool warm. `OnceCell` guarantees a single initialization. embedding: Arc>, - /// Optional pre-resolved embedding config (RFC-012 Phase 5), injected from a - /// cluster `graphs..embeddings` profile via [`Omnigraph::with_embedding_config`]. + /// Optional pre-resolved embedding config (RFC-012 Phase 5), injected from an + /// applied cluster `providers.embedding` profile via [`Omnigraph::with_embedding_config`]. /// When set, the embedding cell builds its client from this instead of /// `EmbeddingClient::from_env()`; `None` keeps the env fallback. embedding_config: Option>, @@ -491,12 +491,9 @@ impl Omnigraph { /// Install a pre-resolved embedding config (RFC-012 Phase 5). Builder-style, /// mirroring [`Omnigraph::with_policy`]: a graph served from a cluster - /// `embeddings` profile injects it here; an embedded/CLI caller that doesn't + /// embedding provider profile injects it here; an embedded/CLI caller that doesn't /// call this keeps the `EmbeddingClient::from_env()` fallback. - pub fn with_embedding_config( - mut self, - config: Arc, - ) -> Self { + pub fn with_embedding_config(mut self, config: Arc) -> Self { self.embedding_config = Some(config); self } diff --git a/crates/omnigraph/src/embedding.rs b/crates/omnigraph/src/embedding.rs index 2af6167..246836c 100644 --- a/crates/omnigraph/src/embedding.rs +++ b/crates/omnigraph/src/embedding.rs @@ -48,10 +48,10 @@ enum EmbedRole { } /// The single source of truth for how embedding text becomes a vector: -/// provider + model + endpoint + key. Resolved once (from env today; from the -/// cluster `providers.embedding` profile in a later RFC-012 phase) and shared by -/// the query path and the offline CLI so stored and query vectors stay -/// same-space by construction. +/// provider + model + endpoint + key. Resolved once (from env for direct +/// engine/CLI callers, or from an applied cluster `providers.embedding` profile +/// at server boot) and shared by the query path and the offline CLI so stored +/// and query vectors stay same-space by construction. #[derive(Clone, Debug)] pub struct EmbeddingConfig { pub provider: Provider, @@ -102,7 +102,7 @@ impl EmbeddingConfig { }) } - /// Build a config from explicit parts — the cluster `embeddings` profile path + /// Build a config from explicit parts — the cluster `providers.embedding` profile path /// (RFC-012 Phase 5). `provider`/`base_url`/`model` default exactly as /// `from_env` does (shared `provider_profile`); `api_key` is already resolved /// (the cluster path resolves a `${NAME}` ref before calling this). @@ -113,7 +113,7 @@ impl EmbeddingConfig { api_key: String, ) -> Result { if provider == Some("mock") { - // An explicit `model` (e.g. a cluster `embeddings` profile) is + // An explicit `model` (e.g. a cluster `providers.embedding` profile) is // authoritative — it is what the same-space check compares against — // so honor it; fall back to `mock()`'s env-based model only when the // caller supplied none. Without this, a profile's `model` is silently @@ -951,7 +951,7 @@ mod tests { #[test] #[serial] fn from_parts_mock_honors_an_explicit_model() { - // A cluster `embeddings` profile that sets `provider: mock, model: X` + // A cluster `providers.embedding` profile that sets `kind: mock, model: X` // must resolve to model X — it is what the query-time same-space check // compares against. Env cleared so the assertion isolates the arg. let _guard = cleared_env(&[]); diff --git a/docs/dev/rfc-012-embedding-provider-config.md b/docs/dev/rfc-012-embedding-provider-config.md index 1604a5e..45083a2 100644 --- a/docs/dev/rfc-012-embedding-provider-config.md +++ b/docs/dev/rfc-012-embedding-provider-config.md @@ -1,9 +1,9 @@ # RFC: Provider-Independent Embedding Configuration -**Status:** Proposed +**Status:** Accepted — Phases 1-5 implemented **Date:** 2026-06-15 **Builds on:** the engine embedding client (`crates/omnigraph/src/embedding.rs`), the `@embed` catalog -annotation (`omnigraph-compiler/src/catalog`), the reserved cluster `embeddings`/`providers` fields +annotation (`omnigraph-compiler/src/catalog`), the cluster `providers.embedding` surface ([cluster-config-specs.md](cluster-config-specs.md), [rfc-007-operator-config.md](rfc-007-operator-config.md) for the secret-resolution pattern). **Target release:** staged — NFR floor first, then the provider-independent config core; ingest-time `@embed` @@ -95,14 +95,18 @@ providers: kind: openai-compatible # openai-compatible | gemini | mock base_url: https://openrouter.ai/api/v1 model: google/gemini-embedding-2 # or openai/text-embedding-3-large, mistralai/mistral-embed, … - dimension: 3072 api_key: ${OPENROUTER_API_KEY} +graphs: + knowledge: + schema: knowledge.pg + embedding_provider: default ``` The same `openai-compatible` kind points at OpenAI direct (`base_url: https://api.openai.com/v1`, `model: text-embedding-3-large`) or a self-hosted endpoint (vLLM/Ollama/LM Studio) by changing `base_url`. Use `kind: gemini` only to reach Google's `generativelanguage` API directly (it keeps the query/document -task-type asymmetry that the OpenAI-compatible shape does not expose). +task-type asymmetry that the OpenAI-compatible shape does not expose). Dimensions are schema-driven by the +target `Vector(N)` column, not duplicated in the provider profile. The zero-config tier keeps working with env only (`OMNIGRAPH_EMBED_PROVIDER`, `OMNIGRAPH_EMBED_BASE_URL`, `OMNIGRAPH_EMBED_MODEL`, and the provider api-key env — `OPENROUTER_API_KEY` / `OPENAI_API_KEY` / @@ -163,11 +167,12 @@ ingest phase needs for throughput, and which removes the open dependency on Gemi ### Config resolution (resolved once, shared) -Precedence, highest first: cluster `providers.embedding.` profile → env (`OMNIGRAPH_EMBED_*`, provider -api-key env) → built-in defaults. The api-key is resolved through the existing operator credential chain -(`${NAME}` → env / `~/.omnigraph/credentials` / server `TokenSource`); it never lives in the schema or any -checked-in file. Resolution happens once; the resolved client is shared by `nearest("string")` and the -offline CLI (replacing the per-query `EmbeddingClient::from_env()` rebuild at `exec/query.rs:238`). +Precedence, highest first for served cluster graphs: applied cluster `providers.embedding.` profile → +env (`OMNIGRAPH_EMBED_*`, provider api-key env) → built-in defaults. The cluster `api_key` value is a +`${NAME}` env reference resolved at server boot; plaintext never lives in the schema, state ledger, or any +checked-in file. Resolution happens once per graph handle; the resolved client is shared by +`nearest("string")`. Direct single-graph serving, embedded callers, and the offline CLI keep the env path +unless they inject an `EmbeddingConfig` directly. ### Identity recorded in the schema IR (not a new store) @@ -215,12 +220,12 @@ the design constraint; deferred to its own RFC/phase. | **2 — Provider-independent config** | `EmbeddingConfig` + `Provider` enum (OpenAiCompatible covering OpenRouter/OpenAI/local, Gemini, Mock); env-first resolution; client reuse | point `base_url` at OpenRouter, run `nearest("string")`, get correct neighbours vs OpenRouter-stored vectors; CLI shares the config | | **3 — Record identity in schema IR** | `@embed` args grammar + catalog + IR persistence | `schema show` reflects recorded model/dim | | **4 — Query-time validation** | compare resolved vs recorded; typed error; planner refusal on identity change | stored model A vs read model B → loud error, never silent garbage | -| **5 — Cluster provider wiring** | un-reserve `providers.embedding`; `${NAME}` resolution | provider profile resolved from `cluster.yaml`; legacy `omnigraph.yaml` untouched | +| **5 — Cluster provider wiring** | `providers.embedding` resources; `graphs..embedding_provider`; `${NAME}` resolution at server boot | provider profile resolved from applied cluster state; legacy `omnigraph.yaml` untouched | | later | ingest-time `@embed` (Shape C) | separate RFC | -**Status:** Phases 1–4 are implemented (`@embed("…", model="…")` is recorded in the schema IR and validated at -query time with a typed same-space error; an unrecorded `@embed` keeps working with no check). Phase 5 (cluster -`providers.embedding` wiring) and ingest-time `@embed` remain. +**Status:** Phases 1–5 are implemented (`@embed("…", model="…")` is recorded in the schema IR and validated at +query time with a typed same-space error; an unrecorded `@embed` keeps working with no check; cluster-served +graphs can bind an applied `providers.embedding` profile). Ingest-time `@embed` remains. ## Invariants & deny-list check diff --git a/docs/user/clusters/config.md b/docs/user/clusters/config.md index df2b236..348d3a4 100644 --- a/docs/user/clusters/config.md +++ b/docs/user/clusters/config.md @@ -13,7 +13,8 @@ catalog writes, **graph creation** (a declared graph that does not exist yet is initialized by apply at the derived root), **schema updates** (soft drops only), and — behind an explicit, digest-bound **approval** — **graph deletion**. It does not perform data-loss schema migrations, start servers, -or serve anything it applies: the server still boots from `omnigraph.yaml`. +or run data loads. A server can boot from the applied ledger with +`omnigraph-server --cluster `. ## Commands @@ -57,7 +58,7 @@ The exact contract: ## Supported `cluster.yaml` -Stage 3A accepts only this resource subset: +The current config surface accepts this resource subset: ```yaml version: 1 @@ -68,9 +69,18 @@ state: backend: cluster lock: true +providers: + embedding: + default: + kind: openai-compatible + base_url: https://openrouter.ai/api/v1 + model: openai/text-embedding-3-large + api_key: ${OPENROUTER_API_KEY} + graphs: knowledge: schema: knowledge.pg + embedding_provider: default queries: queries/ # discover every `query ` in queries/*.gq policies: @@ -99,6 +109,17 @@ updates all of its queries together. Paths are relative to the config directory — the cluster is one explicit folder, so no `./` prefixes are needed. +`providers.embedding.` defines a query-time embedding provider profile +for cluster-served graphs. A graph opts in with `embedding_provider: `; +bare names normalize to `provider.embedding.`. Supported provider +`kind` values are `openai-compatible` (default/OpenRouter-compatible), +`openai` (OpenAI's own host), `gemini`, and `mock`. Real providers require +`api_key: ${ENV_VAR}`; inline secrets are rejected. The env var is resolved +only when a `--cluster` server boots, so `cluster validate`, `plan`, and +`apply` do not need deployment secrets. `mock` is deterministic and does not +require `api_key`. Vector dimensions stay schema-driven by the target +`Vector(N)` column, not the provider profile. + `storage:` (optional) is the **storage root URI** for everything the cluster stores — the state ledger, lock, content-addressed catalog, recovery sidecars, approval artifacts, and the derived graph roots @@ -133,10 +154,12 @@ operation is active. - stored-query parsing and query-name matching - stored-query type-checking against the desired schema - policy `applies_to` graph references +- embedding provider profiles and graph `embedding_provider` references -Fields reserved for later phases, such as `pipelines`, `embeddings`, `ui`, -`aliases`, and `bindings`, fail with a typed diagnostic instead of being -silently ignored. +Fields reserved for later phases, such as `pipelines`, top-level +`embeddings`, `ui`, `aliases`, and `bindings`, fail with a typed diagnostic +instead of being silently ignored. Under `providers`, only `embedding` is +supported today; other provider namespaces fail as unsupported config. ## Planning @@ -156,9 +179,21 @@ resource is planned as a create. If present, the file must use this shape: "applied_revision": { "config_digest": "...", "resources": { - "graph.knowledge": { "digest": "..." }, "schema.knowledge": { "digest": "..." }, "query.knowledge.find_experts": { "digest": "..." }, + "provider.embedding.default": { + "digest": "...", + "embedding_profile": { + "kind": "openai-compatible", + "base_url": "https://openrouter.ai/api/v1", + "model": "openai/text-embedding-3-large", + "api_key": "${OPENROUTER_API_KEY}" + } + }, + "graph.knowledge": { + "digest": "...", + "embedding_provider": "provider.embedding.default" + }, "policy.base": { "digest": "...", "applies_to": ["cluster", "graph.knowledge"] diff --git a/docs/user/reference/constants.md b/docs/user/reference/constants.md index 2cad0d1..ec19f4d 100644 --- a/docs/user/reference/constants.md +++ b/docs/user/reference/constants.md @@ -19,11 +19,13 @@ | Expand mode override | `OMNIGRAPH_TRAVERSAL_MODE` (`indexed`\|`csr`; unset = cost-based auto) | traversal | | Default body limit | `1 MB` | HTTP server | | Ingest body limit | `32 MB` | HTTP server | -| Engine embed model | `gemini-embedding-2-preview` | engine embedding | -| Compiler embed model | `text-embedding-3-small` | compiler embedding | -| Embed timeout | `30 000 ms` | both clients | -| Embed retries | `4` | both clients | -| Embed retry backoff | `200 ms` | both clients | +| Default embed provider/model | `openai-compatible` / `openai/text-embedding-3-large` | engine embedding | +| OpenAI-direct embed model | `text-embedding-3-large` | engine embedding | +| Gemini-direct embed model | `gemini-embedding-2` | engine embedding | +| Embed deadline | `OMNIGRAPH_EMBED_DEADLINE_MS=60000` | engine embedding | +| Embed timeout | `OMNIGRAPH_EMBED_TIMEOUT_MS=30000` | engine embedding | +| Embed retries | `OMNIGRAPH_EMBED_RETRY_ATTEMPTS=4` | engine embedding | +| Embed retry backoff | `OMNIGRAPH_EMBED_RETRY_BACKOFF_MS=200` | engine embedding | | LANCE memory pool default | `1 GB` (raised in v0.3.0) | runtime | **Expand traversal dispatch.** With `OMNIGRAPH_TRAVERSAL_MODE` unset, the engine diff --git a/docs/user/search/embeddings.md b/docs/user/search/embeddings.md index 9e3fd55..e69d928 100644 --- a/docs/user/search/embeddings.md +++ b/docs/user/search/embeddings.md @@ -16,6 +16,36 @@ query vectors and document vectors share one model and one vector space. Vectors are stored L2-normalized as `FixedSizeList(Float32, dim)`; the requested output dimension is driven by the target column width and sent as Gemini `outputDimensionality` / OpenAI `dimensions`. +## Configuration (cluster) + +Cluster-served graphs can pin their query-time embedder in `cluster.yaml`: + +```yaml +providers: + embedding: + default: + kind: openai-compatible + base_url: https://openrouter.ai/api/v1 + model: openai/text-embedding-3-large + api_key: ${OPENROUTER_API_KEY} + +graphs: + knowledge: + schema: knowledge.pg + embedding_provider: default +``` + +`embedding_provider` references `providers.embedding.`; bare names are +normalized to that typed ref. The server resolves `${ENV_VAR}` only when it +boots from the applied cluster ledger, so `cluster validate`, `plan`, and +`apply` do not need provider secrets. Inline API keys are rejected. `mock` +needs no key. Vector dimensions stay schema-driven by the target `Vector(N)` +column. + +Direct single-graph serving, embedded callers, and the offline +`omnigraph embed` pipeline use environment configuration unless they inject an +`EmbeddingConfig` directly. + ## Configuration (environment) | Variable | Meaning |