Wire cluster embedding providers

This commit is contained in:
aaltshuler 2026-06-16 04:02:08 +03:00
parent 8d2128438e
commit 16e4a833c0
16 changed files with 1132 additions and 165 deletions

View file

@ -42,7 +42,12 @@ pub(crate) fn resolve_query_decls(
return (
map.iter()
.map(|(name, config)| {
(name.clone(), QueryConfig { file: config.file.clone() })
(
name.clone(),
QueryConfig {
file: config.file.clone(),
},
)
})
.collect(),
BTreeMap::new(),
@ -66,7 +71,10 @@ pub(crate) fn resolve_query_decls(
diagnostics.push(Diagnostic::error(
"query_dir_unreadable",
format!("graphs.{graph_id}.queries"),
format!("could not list query directory '{}': {err}", resolved.display()),
format!(
"could not list query directory '{}': {err}",
resolved.display()
),
));
continue;
}
@ -76,7 +84,10 @@ pub(crate) fn resolve_query_decls(
diagnostics.push(Diagnostic::warning(
"query_dir_empty",
format!("graphs.{graph_id}.queries"),
format!("query directory '{}' contains no .gq files", resolved.display()),
format!(
"query directory '{}' contains no .gq files",
resolved.display()
),
));
}
for path in entries {
@ -132,7 +143,12 @@ pub(crate) fn resolve_query_decls(
continue;
}
origin.insert(name.clone(), declared.clone());
registry.insert(name, QueryConfig { file: declared.clone() });
registry.insert(
name,
QueryConfig {
file: declared.clone(),
},
);
}
contents.insert(declared, source);
}
@ -269,8 +285,6 @@ pub(crate) fn validate_cluster_header(
}
}
pub(crate) fn state_resource_digests(state: &ClusterState) -> BTreeMap<String, String> {
state
.applied_revision
@ -295,7 +309,6 @@ pub(crate) fn initial_import_state(desired: &DesiredCluster) -> ClusterState {
}
}
pub(crate) async fn observe_declared_graphs(
desired: &DesiredCluster,
backend: &ClusterStore,
@ -350,19 +363,28 @@ pub(crate) async fn observe_declared_graphs(
StateResource {
digest: observation.schema_digest.clone(),
applies_to: None,
embedding_provider: None,
embedding_profile: None,
},
);
let query_digests = state_query_digests_for_graph(state, &graph.id);
let embedding_provider = state_graph_embedding_provider(state, &graph.id);
let embedding_provider_digest =
state_embedding_provider_digest(state, embedding_provider.as_deref());
let graph_digest_value = graph_digest(
&graph.id,
Some(&observation.schema_digest),
Some(&query_digests),
embedding_provider.as_deref(),
embedding_provider_digest.as_ref(),
);
state.applied_revision.resources.insert(
graph_address.clone(),
StateResource {
digest: graph_digest_value,
applies_to: None,
embedding_provider,
embedding_profile: None,
},
);
state.observations.insert(
@ -499,7 +521,6 @@ pub(crate) fn graph_observation_json(observation: GraphObservationJson<'_>) -> s
})
}
pub(crate) fn load_desired(config_dir: &Path) -> LoadOutcome {
let parsed = parse_cluster_config(config_dir);
let config_dir = parsed.config_dir;
@ -519,6 +540,35 @@ pub(crate) fn load_desired(config_dir: &Path) -> LoadOutcome {
let mut dependencies = BTreeSet::new();
let mut graph_query_digests: BTreeMap<String, BTreeMap<String, String>> = BTreeMap::new();
let mut graph_schema_digests: BTreeMap<String, String> = BTreeMap::new();
let mut graph_embedding_providers: BTreeMap<String, String> = BTreeMap::new();
let mut embedding_provider_digests: BTreeMap<String, String> = BTreeMap::new();
let mut embedding_providers: BTreeMap<String, EmbeddingProviderConfig> = BTreeMap::new();
for (provider_name, profile) in &raw.providers.embedding {
validate_id(
"embedding provider name",
&format!("providers.embedding.{provider_name}"),
provider_name,
&mut diagnostics,
);
let address = embedding_provider_address(provider_name);
profile.validate(
format!("providers.embedding.{provider_name}"),
&mut diagnostics,
);
let digest = embedding_provider_digest(profile);
embedding_provider_digests.insert(address.clone(), digest.clone());
embedding_providers.insert(address.clone(), profile.clone());
resources.insert(
address.clone(),
ResourceSummary {
address,
kind: "embedding_provider".to_string(),
digest,
path: None,
},
);
}
for (graph_id, graph) in &raw.graphs {
validate_id(
@ -533,6 +583,35 @@ pub(crate) fn load_desired(config_dir: &Path) -> LoadOutcome {
from: schema_address.clone(),
to: graph_address.clone(),
});
if let Some(provider_ref) = graph.embedding_provider.as_deref() {
match normalize_embedding_provider_target(provider_ref) {
EmbeddingProviderTarget::Provider(provider_name) => {
let provider_address = embedding_provider_address(&provider_name);
if raw.providers.embedding.contains_key(&provider_name) {
dependencies.insert(Dependency {
from: graph_address.clone(),
to: provider_address.clone(),
});
graph_embedding_providers.insert(graph_id.clone(), provider_address);
} else {
diagnostics.push(Diagnostic::error(
"dangling_embedding_provider_reference",
format!("graphs.{graph_id}.embedding_provider"),
format!(
"graph references embedding provider `{provider_name}`, but no providers.embedding.{provider_name} profile is declared"
),
));
}
}
EmbeddingProviderTarget::WrongKind(kind) => diagnostics.push(Diagnostic::error(
"wrong_kind_reference",
format!("graphs.{graph_id}.embedding_provider"),
format!(
"embedding_provider expects a providers.embedding ref or bare provider name, got `{kind}`"
),
)),
}
}
let schema_path = resolve_config_path(&config_dir, &graph.schema);
let schema_source = match fs::read_to_string(&schema_path) {
@ -646,10 +725,15 @@ pub(crate) fn load_desired(config_dir: &Path) -> LoadOutcome {
}
for graph_id in raw.graphs.keys() {
let embedding_provider = graph_embedding_providers.get(graph_id);
let embedding_provider_digest =
embedding_provider.and_then(|address| embedding_provider_digests.get(address));
let digest = graph_digest(
graph_id,
graph_schema_digests.get(graph_id),
graph_query_digests.get(graph_id),
embedding_provider.map(String::as_str),
embedding_provider_digest,
);
resources.insert(
graph_address(graph_id),
@ -754,6 +838,7 @@ pub(crate) fn load_desired(config_dir: &Path) -> LoadOutcome {
.get(graph_id)
.cloned()
.unwrap_or_default(),
embedding_provider: graph_embedding_providers.get(graph_id).cloned(),
})
.collect();
let config_digest = desired_config_digest(&raw, &resource_digests);
@ -769,6 +854,7 @@ pub(crate) fn load_desired(config_dir: &Path) -> LoadOutcome {
resources: resource_list,
dependencies,
policy_bindings,
embedding_providers,
}),
diagnostics,
config_dir,
@ -828,7 +914,6 @@ pub(crate) fn future_field_diagnostics(text: &str) -> Vec<Diagnostic> {
let future_fields = [
"apply",
"env_file",
"providers",
"pipelines",
"embeddings",
"ui",
@ -882,6 +967,21 @@ pub(crate) fn normalize_policy_target(value: &str) -> PolicyTarget {
}
}
enum EmbeddingProviderTarget {
Provider(String),
WrongKind(String),
}
fn normalize_embedding_provider_target(value: &str) -> EmbeddingProviderTarget {
if let Some(name) = value.strip_prefix("provider.embedding.") {
EmbeddingProviderTarget::Provider(name.to_string())
} else if value.contains('.') {
EmbeddingProviderTarget::WrongKind(value.to_string())
} else {
EmbeddingProviderTarget::Provider(value.to_string())
}
}
pub(crate) fn graph_address(graph_id: &str) -> String {
format!("graph.{graph_id}")
}
@ -898,6 +998,10 @@ pub(crate) fn policy_address(policy_name: &str) -> String {
format!("policy.{policy_name}")
}
pub(crate) fn embedding_provider_address(provider_name: &str) -> String {
format!("provider.embedding.{provider_name}")
}
pub(crate) fn resolve_config_path(config_dir: &Path, path: &Path) -> PathBuf {
if path.is_absolute() {
path.to_path_buf()

View file

@ -152,7 +152,9 @@ pub(crate) fn approved_resources(
let candidates: Vec<&ApprovalArtifact> = artifacts
.iter()
.map(|(_, artifact)| artifact)
.filter(|artifact| artifact.consumed_at.is_none() && artifact.resource == change.resource)
.filter(|artifact| {
artifact.consumed_at.is_none() && artifact.resource == change.resource
})
.collect();
if candidates.is_empty() {
continue;
@ -181,6 +183,7 @@ pub(crate) enum ResourceKind {
Schema(String),
Query { graph: String, name: String },
Policy(String),
EmbeddingProvider(String),
Unknown,
}
@ -199,6 +202,8 @@ pub(crate) fn resource_kind(address: &str) -> ResourceKind {
}
} else if let Some(name) = address.strip_prefix("policy.") {
ResourceKind::Policy(name.to_string())
} else if let Some(name) = address.strip_prefix("provider.embedding.") {
ResourceKind::EmbeddingProvider(name.to_string())
} else {
ResourceKind::Unknown
}
@ -261,8 +266,7 @@ pub(crate) fn classify_changes(
let (disposition, reason) = match resource_kind(&change.resource) {
ResourceKind::Schema(graph) => match change.operation {
PlanOperation::Create
if graph_creates.contains(&graph)
&& !pending_recovery.contains(&graph) =>
if graph_creates.contains(&graph) && !pending_recovery.contains(&graph) =>
{
// Applied with the graph create — the init carries it.
(ApplyDisposition::Applied, None)
@ -325,10 +329,7 @@ pub(crate) fn classify_changes(
if pending_recovery.contains(&graph) {
(ApplyDisposition::Blocked, Some("cluster_recovery_pending"))
} else if schema_pending.contains(&graph) {
(
ApplyDisposition::Blocked,
Some("dependency_not_applied"),
)
(ApplyDisposition::Blocked, Some("dependency_not_applied"))
} else {
// A graph create in the same plan no longer blocks:
// creates execute first in the same apply run.
@ -353,9 +354,8 @@ pub(crate) fn classify_changes(
}
}
},
ResourceKind::Unknown => {
(ApplyDisposition::Deferred, Some("apply_unsupported_kind"))
}
ResourceKind::EmbeddingProvider(_) => (ApplyDisposition::Applied, None),
ResourceKind::Unknown => (ApplyDisposition::Deferred, Some("apply_unsupported_kind")),
};
change.disposition = Some(disposition);
change.reason = reason.map(str::to_string);

View file

@ -20,18 +20,34 @@ use ulid::Ulid;
pub mod failpoints;
mod config;
mod types;
mod diff;
mod serve;
mod sweep;
mod store;
mod sweep;
mod types;
use config::{
QueriesDecl, future_field_diagnostics, graph_address, initial_import_state, load_desired,
normalize_policy_target, observe_declared_graphs, observe_live_graph, parse_cluster_config,
policy_address, preview_schema_migration, query_address, resolve_config_path,
resolve_query_decls, schema_address, state_resource_digests, validate_cluster_header,
validate_id, validate_query_source,
};
use diff::{
FailedGraphOrigin, ResourceKind, append_policy_binding_changes, approved_resources,
classify_changes, compute_approvals, compute_blast_radius, demote_dependents_of_failed_graphs,
diff_resources, resource_kind,
};
pub use serve::{
ServingGraph, ServingPolicy, ServingQuery, ServingSnapshot, cluster_root_for_graph_uri,
read_serving_snapshot, read_serving_snapshot_from_storage, resolve_graph_storage_uri,
};
use store::{ClusterStore, StateLockGuard, StateSnapshot};
use sweep::{
mark_approvals_consumed, record_approval_consumed, sweep_recovery_sidecars,
tombstone_graph_subtree, warn_pending_recovery_sidecars,
};
pub use types::*;
use types::*;
pub use serve::{ServingGraph, ServingPolicy, ServingQuery, ServingSnapshot, cluster_root_for_graph_uri, read_serving_snapshot, read_serving_snapshot_from_storage, resolve_graph_storage_uri};
use config::{QueriesDecl, observe_declared_graphs, validate_cluster_header, future_field_diagnostics, initial_import_state, observe_live_graph, preview_schema_migration, state_resource_digests, graph_address, policy_address, query_address, schema_address, load_desired, normalize_policy_target, parse_cluster_config, resolve_config_path, resolve_query_decls, validate_id, validate_query_source};
use diff::{FailedGraphOrigin, ResourceKind, append_policy_binding_changes, approved_resources, classify_changes, compute_approvals, compute_blast_radius, demote_dependents_of_failed_graphs, diff_resources, resource_kind};
use sweep::{mark_approvals_consumed, record_approval_consumed, sweep_recovery_sidecars, tombstone_graph_subtree, warn_pending_recovery_sidecars};
pub const CLUSTER_CONFIG_FILE: &str = "cluster.yaml";
pub const CLUSTER_GRAPHS_DIR: &str = "graphs";
@ -44,10 +60,7 @@ pub const CLUSTER_APPROVALS_DIR: &str = "__cluster/approvals";
/// The store for a load outcome: the declared `storage:` root when present,
/// the config directory itself otherwise. A bad root is a loud error.
fn store_for(
config_dir: &Path,
storage_root: Option<&str>,
) -> Result<ClusterStore, Diagnostic> {
fn store_for(config_dir: &Path, storage_root: Option<&str>) -> Result<ClusterStore, Diagnostic> {
match storage_root {
Some(root) => ClusterStore::for_storage_root(root),
None => Ok(ClusterStore::for_config_dir(config_dir)),
@ -179,7 +192,12 @@ pub async fn plan_config_dir(config_dir: impl AsRef<Path>) -> PlanOutput {
&desired.config_digest,
&mut diagnostics,
);
classify_changes(&mut changes, &desired.dependencies, &BTreeSet::new(), &approved);
classify_changes(
&mut changes,
&desired.dependencies,
&BTreeSet::new(),
&approved,
);
// Embed real migration steps for schema updates so plan is a data-aware
// preview; failures degrade to the digest diff with a warning.
@ -282,9 +300,7 @@ pub async fn apply_config_dir_with_options(
ok: !has_errors(&diagnostics),
config_dir,
actor: actor_for_output.clone(),
desired_revision: DesiredRevision {
config_digest,
},
desired_revision: DesiredRevision { config_digest },
state_observations: observations,
changes,
applied_count: 0,
@ -464,8 +480,7 @@ pub async fn apply_config_dir_with_options(
failed_graphs.insert(graph_id.clone(), FailedGraphOrigin::GraphCreate);
continue;
}
let Some(desired_graph) = desired.graphs.iter().find(|graph| &graph.id == graph_id)
else {
let Some(desired_graph) = desired.graphs.iter().find(|graph| &graph.id == graph_id) else {
continue;
};
let graph_uri = backend.graph_root(graph_id);
@ -604,8 +619,7 @@ pub async fn apply_config_dir_with_options(
failed_graphs.insert(graph_id.clone(), FailedGraphOrigin::SchemaApply);
continue;
}
let Some(desired_graph) = desired.graphs.iter().find(|graph| &graph.id == graph_id)
else {
let Some(desired_graph) = desired.graphs.iter().find(|graph| &graph.id == graph_id) else {
continue;
};
let graph_uri = backend.graph_root(graph_id);
@ -955,8 +969,10 @@ pub async fn apply_config_dir_with_options(
.expect("create/update always carries an after digest"),
// Policies record their applied bindings so the
// ledger is serving-sufficient (RFC-005 §D3).
applies_to: desired
.policy_bindings
applies_to: desired.policy_bindings.get(&change.resource).cloned(),
embedding_provider: None,
embedding_profile: desired
.embedding_providers
.get(&change.resource)
.cloned(),
},
@ -964,7 +980,10 @@ pub async fn apply_config_dir_with_options(
set_resource_status_applied(&mut new_state, &change.resource);
}
PlanOperation::Delete => {
new_state.applied_revision.resources.remove(&change.resource);
new_state
.applied_revision
.resources
.remove(&change.resource);
new_state.resource_statuses.remove(&change.resource);
}
},
@ -1219,7 +1238,6 @@ pub async fn approve_config_dir(
}
}
pub async fn status_config_dir(config_dir: impl AsRef<Path>) -> StatusOutput {
let parsed = parse_cluster_config(config_dir.as_ref());
let mut diagnostics = parsed.diagnostics;
@ -1238,7 +1256,9 @@ pub async fn status_config_dir(config_dir: impl AsRef<Path>) -> StatusOutput {
}
};
let mut observations = backend.observations();
backend.observe_lock(&mut observations, &mut diagnostics).await;
backend
.observe_lock(&mut observations, &mut diagnostics)
.await;
warn_pending_recovery_sidecars(&parsed.config_dir, &mut diagnostics);
let mut resource_digests = BTreeMap::new();
@ -1254,9 +1274,7 @@ pub async fn status_config_dir(config_dir: impl AsRef<Path>) -> StatusOutput {
// Read-only point-in-time catalog check: report the
// findings as diagnostics; persisting Drifted statuses
// is refresh's job. Status never writes state.
for (address, finding) in
verify_catalog_payloads(&backend, &state).await
{
for (address, finding) in verify_catalog_payloads(&backend, &state).await {
diagnostics.push(payload_finding_diagnostic(&address, &finding));
}
resource_digests = state_resource_digests(&state);
@ -1312,7 +1330,10 @@ pub async fn force_unlock_config_dir(
if let Some(raw) = parsed.raw.as_ref() {
let _settings = validate_cluster_header(raw, &mut diagnostics);
if !has_errors(&diagnostics) {
match backend.force_unlock(lock_id.as_ref(), &mut observations).await {
match backend
.force_unlock(lock_id.as_ref(), &mut observations)
.await
{
Ok(()) => lock_removed = true,
Err(diagnostic) => diagnostics.push(diagnostic),
}
@ -1380,7 +1401,10 @@ async fn sync_config_dir(config_dir: &Path, operation: StateSyncOperation) -> St
let operation_label = state_sync_operation_label(operation);
let _lock_guard = if desired.state_lock {
match backend.acquire_lock(operation_label, &mut observations).await {
match backend
.acquire_lock(operation_label, &mut observations)
.await
{
Ok(guard) => Some(guard),
Err(diagnostic) => {
diagnostics.push(diagnostic);
@ -1542,7 +1566,10 @@ async fn sync_config_dir(config_dir: &Path, operation: StateSyncOperation) -> St
state.state_revision = state.state_revision.saturating_add(1);
}
match backend.write_state(&state, expected_cas.as_deref(), &mut observations).await {
match backend
.write_state(&state, expected_cas.as_deref(), &mut observations)
.await
{
Ok(()) => {
// Completed sweep sidecars are deleted only after their outcome
// is durably recorded; on failure they stay and re-sweep.
@ -1569,9 +1596,6 @@ async fn sync_config_dir(config_dir: &Path, operation: StateSyncOperation) -> St
}
}
#[derive(Debug, PartialEq, Eq)]
enum PayloadFinding {
Missing,
@ -1650,7 +1674,10 @@ async fn write_resource_payload(
Diagnostic::error(
"resource_payload_write_error",
resource,
format!("could not read resource source '{}': {err}", source.display()),
format!(
"could not read resource source '{}': {err}",
source.display()
),
)
})?;
if sha256_hex(&bytes) != expected_digest {
@ -1692,7 +1719,11 @@ async fn write_resource_payload(
fn recompute_state_graph_digests(state: &mut ClusterState, desired: &DesiredCluster) {
for graph in &desired.graphs {
let graph_address = graph_address(&graph.id);
if !state.applied_revision.resources.contains_key(&graph_address) {
if !state
.applied_revision
.resources
.contains_key(&graph_address)
{
continue;
}
let schema_digest = state
@ -1701,11 +1732,26 @@ fn recompute_state_graph_digests(state: &mut ClusterState, desired: &DesiredClus
.get(&schema_address(&graph.id))
.map(|resource| resource.digest.clone());
let query_digests = state_query_digests_for_graph(state, &graph.id);
let digest = graph_digest(&graph.id, schema_digest.as_ref(), Some(&query_digests));
state
.applied_revision
.resources
.insert(graph_address, StateResource { digest, applies_to: None });
let embedding_provider = graph.embedding_provider.as_deref();
let embedding_provider_digest = embedding_provider
.and_then(|address| state.applied_revision.resources.get(address))
.map(|resource| resource.digest.clone());
let digest = graph_digest(
&graph.id,
schema_digest.as_ref(),
Some(&query_digests),
embedding_provider,
embedding_provider_digest.as_ref(),
);
state.applied_revision.resources.insert(
graph_address,
StateResource {
digest,
applies_to: None,
embedding_provider: graph.embedding_provider.clone(),
embedding_profile: None,
},
);
}
}
@ -1773,7 +1819,6 @@ fn duplicate_key_diagnostics(text: &str) -> Vec<Diagnostic> {
diagnostics
}
fn strip_comment(line: &str) -> String {
let mut in_single_quote = false;
let mut in_double_quote = false;
@ -1796,7 +1841,6 @@ fn strip_comment(line: &str) -> String {
line.to_string()
}
fn state_query_digests_for_graph(state: &ClusterState, graph_id: &str) -> BTreeMap<String, String> {
let prefix = format!("query.{graph_id}.");
state
@ -1811,6 +1855,23 @@ fn state_query_digests_for_graph(state: &ClusterState, graph_id: &str) -> BTreeM
.collect()
}
fn state_graph_embedding_provider(state: &ClusterState, graph_id: &str) -> Option<String> {
state
.applied_revision
.resources
.get(&graph_address(graph_id))
.and_then(|resource| resource.embedding_provider.clone())
}
fn state_embedding_provider_digest(
state: &ClusterState,
embedding_provider: Option<&str>,
) -> Option<String> {
embedding_provider
.and_then(|address| state.applied_revision.resources.get(address))
.map(|resource| resource.digest.clone())
}
fn set_resource_status_applied(state: &mut ClusterState, address: &str) {
state.resource_statuses.insert(
address.to_string(),
@ -1843,6 +1904,8 @@ fn graph_digest(
graph_id: &str,
schema_digest: Option<&String>,
query_digests: Option<&BTreeMap<String, String>>,
embedding_provider: Option<&str>,
embedding_provider_digest: Option<&String>,
) -> String {
let mut input = format!(
"graph\0{graph_id}\0schema\0{}\0",
@ -1857,6 +1920,21 @@ fn graph_digest(
input.push('\0');
}
}
if let Some(provider) = embedding_provider {
input.push_str("embedding_provider\0");
input.push_str(provider);
input.push('\0');
input.push_str(embedding_provider_digest.map_or("", String::as_str));
input.push('\0');
}
sha256_hex(input.as_bytes())
}
fn embedding_provider_digest(profile: &EmbeddingProviderConfig) -> String {
let mut input = String::from("embedding-provider\0");
let config_semantics =
serde_json::to_string(profile).expect("embedding provider config must serialize");
input.push_str(&config_semantics);
sha256_hex(input.as_bytes())
}
@ -1930,7 +2008,6 @@ fn display_path(path: &Path) -> String {
path.display().to_string()
}
#[cfg(test)]
#[path = "tests.rs"]
mod tests;

View file

@ -8,6 +8,7 @@ use super::*;
pub struct ServingGraph {
pub graph_id: String,
pub root: PathBuf,
pub embedding: Option<EmbeddingProviderConfig>,
}
/// One stored query: its graph binding, registry name, and verified source.
@ -111,7 +112,10 @@ pub async fn cluster_root_for_graph_uri(graph_uri: &str) -> Option<String> {
///
/// `cluster` is a config directory or a storage-root URI (`s3://…`, config-free),
/// mirroring the server's `--cluster` dispatch.
pub async fn resolve_graph_storage_uri(cluster: &str, graph_id: &str) -> Result<String, Diagnostic> {
pub async fn resolve_graph_storage_uri(
cluster: &str,
graph_id: &str,
) -> Result<String, Diagnostic> {
let backend = if cluster.contains("://") {
ClusterStore::for_storage_root(cluster)?
} else {
@ -200,15 +204,73 @@ async fn read_snapshot_with_store(
return Err(diagnostics);
};
let mut embedding_profiles: BTreeMap<String, EmbeddingProviderConfig> = BTreeMap::new();
for (address, entry) in &state.applied_revision.resources {
if !matches!(resource_kind(address), ResourceKind::EmbeddingProvider(_)) {
continue;
}
let Some(profile) = entry.embedding_profile.clone() else {
diagnostics.push(Diagnostic::error(
"embedding_provider_profile_missing",
address.clone(),
"no applied embedding provider profile recorded; re-run `cluster apply` to backfill",
));
continue;
};
let actual_digest = embedding_provider_digest(&profile);
if actual_digest != entry.digest {
diagnostics.push(Diagnostic::error(
"embedding_provider_digest_mismatch",
address.clone(),
format!(
"applied embedding provider profile does not match its recorded digest (actual sha256:{actual_digest}); run `cluster refresh` then `cluster apply`, and restart"
),
));
continue;
}
embedding_profiles.insert(address.clone(), profile);
}
let mut graphs = Vec::new();
let mut queries = Vec::new();
let mut policies = Vec::new();
for (address, entry) in &state.applied_revision.resources {
match resource_kind(address) {
ResourceKind::Graph(graph_id) => {
let embedding = match entry.embedding_provider.as_deref() {
Some(provider_address) => match resource_kind(provider_address) {
ResourceKind::EmbeddingProvider(_) => {
match embedding_profiles.get(provider_address) {
Some(profile) => Some(profile.clone()),
None => {
diagnostics.push(Diagnostic::error(
"embedding_provider_missing",
address.clone(),
format!(
"graph references `{provider_address}`, but no applied embedding provider profile is available; re-run `cluster apply`"
),
));
None
}
}
}
_ => {
diagnostics.push(Diagnostic::error(
"wrong_kind_reference",
address.clone(),
format!(
"graph embedding_provider expects `provider.embedding.<name>`, got `{provider_address}`"
),
));
None
}
},
None => None,
};
graphs.push(ServingGraph {
root: PathBuf::from(backend.graph_root(&graph_id)),
graph_id,
embedding,
});
}
ResourceKind::Schema(_) => {}
@ -216,7 +278,10 @@ async fn read_snapshot_with_store(
let ResourceKind::Query { graph, name } = &kind else {
unreachable!()
};
match backend.read_verified_payload(&kind, &entry.digest, address).await {
match backend
.read_verified_payload(&kind, &entry.digest, address)
.await
{
Ok(source) => queries.push(ServingQuery {
graph_id: graph.clone(),
name: name.clone(),
@ -237,7 +302,10 @@ async fn read_snapshot_with_store(
));
continue;
};
match backend.read_verified_payload(&kind, &entry.digest, address).await {
match backend
.read_verified_payload(&kind, &entry.digest, address)
.await
{
Ok(source) => policies.push(ServingPolicy {
name: name.clone(),
source,
@ -246,6 +314,7 @@ async fn read_snapshot_with_store(
Err(diagnostic) => diagnostics.push(diagnostic),
}
}
ResourceKind::EmbeddingProvider(_) => {}
ResourceKind::Unknown => {}
}
}
@ -313,4 +382,3 @@ mod tests {
);
}
}

View file

@ -19,13 +19,29 @@ pub(crate) async fn sweep_recovery_sidecars(
for (path, sidecar) in backend.list_recovery_sidecars(diagnostics).await {
match sidecar.kind {
RecoverySidecarKind::GraphCreate => {
sweep_graph_create_sidecar(backend, path, sidecar, state, diagnostics, &mut outcome).await;
sweep_graph_create_sidecar(
backend,
path,
sidecar,
state,
diagnostics,
&mut outcome,
)
.await;
}
RecoverySidecarKind::SchemaApply => {
sweep_schema_apply_sidecar(path, sidecar, state, diagnostics, &mut outcome).await;
}
RecoverySidecarKind::GraphDelete => {
sweep_graph_delete_sidecar(backend, path, sidecar, state, diagnostics, &mut outcome).await;
sweep_graph_delete_sidecar(
backend,
path,
sidecar,
state,
diagnostics,
&mut outcome,
)
.await;
}
}
}
@ -71,15 +87,30 @@ pub(crate) async fn sweep_graph_create_sidecar(
StateResource {
digest: live_digest.clone(),
applies_to: None,
embedding_provider: None,
embedding_profile: None,
},
);
let query_digests = state_query_digests_for_graph(state, &sidecar.graph_id);
let composite =
graph_digest(&sidecar.graph_id, Some(&live_digest), Some(&query_digests));
state
.applied_revision
.resources
.insert(graph_address.clone(), StateResource { digest: composite, applies_to: None });
let embedding_provider = state_graph_embedding_provider(state, &sidecar.graph_id);
let embedding_provider_digest =
state_embedding_provider_digest(state, embedding_provider.as_deref());
let composite = graph_digest(
&sidecar.graph_id,
Some(&live_digest),
Some(&query_digests),
embedding_provider.as_deref(),
embedding_provider_digest.as_ref(),
);
state.applied_revision.resources.insert(
graph_address.clone(),
StateResource {
digest: composite,
applies_to: None,
embedding_provider,
embedding_profile: None,
},
);
set_resource_status_applied(state, &graph_address);
set_resource_status_applied(state, &schema_addr);
state.recovery_records.insert(
@ -200,14 +231,30 @@ pub(crate) async fn sweep_schema_apply_sidecar(
StateResource {
digest: live_digest.clone(),
applies_to: None,
embedding_provider: None,
embedding_profile: None,
},
);
let query_digests = state_query_digests_for_graph(state, &sidecar.graph_id);
let composite = graph_digest(&sidecar.graph_id, Some(&live_digest), Some(&query_digests));
state
.applied_revision
.resources
.insert(graph_address.clone(), StateResource { digest: composite, applies_to: None });
let embedding_provider = state_graph_embedding_provider(state, &sidecar.graph_id);
let embedding_provider_digest =
state_embedding_provider_digest(state, embedding_provider.as_deref());
let composite = graph_digest(
&sidecar.graph_id,
Some(&live_digest),
Some(&query_digests),
embedding_provider.as_deref(),
embedding_provider_digest.as_ref(),
);
state.applied_revision.resources.insert(
graph_address.clone(),
StateResource {
digest: composite,
applies_to: None,
embedding_provider,
embedding_profile: None,
},
);
set_resource_status_applied(state, &graph_address);
set_resource_status_applied(state, &schema_addr);
state.recovery_records.insert(
@ -274,7 +321,11 @@ pub(crate) async fn sweep_graph_delete_sidecar(
return;
}
if !state.applied_revision.resources.contains_key(&graph_address) {
if !state
.applied_revision
.resources
.contains_key(&graph_address)
{
// Row 7: already tombstoned (or never recorded); crash fell between
// the state CAS and sidecar delete.
outcome.completed_sidecars.push(path);
@ -283,7 +334,12 @@ pub(crate) async fn sweep_graph_delete_sidecar(
// Row 7b: the root is gone, the ledger is stale — roll forward the
// tombstone, consume the approval the sidecar carries, audit.
tombstone_graph_subtree(state, &sidecar.graph_id, sidecar.approval_id.as_deref(), sidecar.actor.as_deref());
tombstone_graph_subtree(
state,
&sidecar.graph_id,
sidecar.approval_id.as_deref(),
sidecar.actor.as_deref(),
);
state.recovery_records.insert(
sidecar.operation_id.clone(),
json!({
@ -342,7 +398,11 @@ pub(crate) fn tombstone_graph_subtree(
/// Record approval consumption in the state ledger. The artifact FILE is
/// rewritten with consumed_at only after the state write lands, so a failed
/// CAS leaves the approval valid for the retry.
pub(crate) fn record_approval_consumed(state: &mut ClusterState, approval_id: &str, operation_id: &str) {
pub(crate) fn record_approval_consumed(
state: &mut ClusterState,
approval_id: &str,
operation_id: &str,
) {
state.approval_records.insert(
approval_id.to_string(),
json!({

View file

@ -56,6 +56,39 @@ policies:
dir
}
fn write_mock_embedding_cluster(config_dir: &Path, model: &str) {
fs::write(
config_dir.join(CLUSTER_CONFIG_FILE),
format!(
r#"
version: 1
metadata:
name: test
state:
backend: cluster
lock: true
providers:
embedding:
default:
kind: mock
model: {model}
graphs:
knowledge:
schema: ./people.pg
embedding_provider: default
queries:
find_person:
file: ./people.gq
policies:
base:
file: ./base.policy.yaml
applies_to: [knowledge]
"#
),
)
.unwrap();
}
async fn init_derived_graph(root: &Path) {
let graph_dir = root.join(CLUSTER_GRAPHS_DIR);
fs::create_dir_all(&graph_dir).unwrap();
@ -194,6 +227,95 @@ policies:
assert!(codes.contains("dangling_graph_reference"));
}
#[test]
fn embedding_provider_config_accepts_provider_resources_and_graph_refs() {
let dir = fixture();
write_mock_embedding_cluster(dir.path(), "recorded-x");
let out = validate_config_dir(dir.path());
assert!(out.ok, "{:?}", out.diagnostics);
let provider_digest = out
.resource_digests
.get("provider.embedding.default")
.expect("provider resource digest");
assert!(
out.resources
.iter()
.any(|resource| resource.address == "provider.embedding.default"
&& resource.kind == "embedding_provider"
&& resource.path.is_none())
);
assert!(
out.dependencies
.iter()
.any(|dep| dep.from == "graph.knowledge" && dep.to == "provider.embedding.default"),
"{:?}",
out.dependencies
);
let schema_digest = out.resource_digests.get("schema.knowledge").unwrap();
let query_digest = out
.resource_digests
.get("query.knowledge.find_person")
.unwrap();
let expected_graph_digest = graph_digest(
"knowledge",
Some(schema_digest),
Some(
&[("find_person".to_string(), query_digest.clone())]
.into_iter()
.collect(),
),
Some("provider.embedding.default"),
Some(provider_digest),
);
assert_eq!(
out.resource_digests["graph.knowledge"],
expected_graph_digest
);
}
#[test]
fn embedding_provider_config_rejects_bad_refs_and_inline_secrets() {
let dir = fixture();
fs::write(
dir.path().join(CLUSTER_CONFIG_FILE),
r#"
version: 1
providers:
embedding:
default:
kind: openai-compatible
api_key: sk-inline
graphs:
knowledge:
schema: ./people.pg
embedding_provider: provider.policy.default
missing_provider:
schema: ./people.pg
embedding_provider: absent
"#,
)
.unwrap();
let out = validate_config_dir(dir.path());
assert!(!out.ok);
let codes: BTreeSet<_> = out.diagnostics.iter().map(|d| d.code.as_str()).collect();
assert!(
codes.contains("embedding_api_key_inline"),
"{:?}",
out.diagnostics
);
assert!(
codes.contains("wrong_kind_reference"),
"{:?}",
out.diagnostics
);
assert!(
codes.contains("dangling_embedding_provider_reference"),
"{:?}",
out.diagnostics
);
}
#[test]
fn query_key_mismatch_fails() {
let dir = fixture();
@ -1012,8 +1134,13 @@ graphs:
let out = validate_config_dir(config_dir);
assert!(out.ok, "{:?}", out.diagnostics);
let schema_digest = out.resource_digests.get("schema.knowledge").unwrap().clone();
let graph_composite =
graph_digest("knowledge", Some(&schema_digest), Some(&BTreeMap::new()));
let graph_composite = graph_digest(
"knowledge",
Some(&schema_digest),
Some(&BTreeMap::new()),
None,
None,
);
write_state_resources(
config_dir,
&[
@ -1122,6 +1249,8 @@ graphs:
.into_iter()
.collect(),
),
None,
None,
);
assert_eq!(resources["graph.knowledge"]["digest"], expected_composite);
assert_eq!(
@ -1136,6 +1265,117 @@ graphs:
assert!(!dir.path().join(CLUSTER_LOCK_FILE).exists());
}
#[tokio::test]
async fn apply_records_embedding_provider_profile_and_graph_binding() {
let dir = fixture();
write_mock_embedding_cluster(dir.path(), "recorded-x");
write_applyable_state(dir.path());
let desired = validate_config_dir(dir.path());
let query_digest = desired
.resource_digests
.get("query.knowledge.find_person")
.unwrap()
.clone();
let schema_digest = desired
.resource_digests
.get("schema.knowledge")
.unwrap()
.clone();
let provider_digest = desired
.resource_digests
.get("provider.embedding.default")
.unwrap()
.clone();
let out = apply_config_dir(dir.path()).await;
assert!(out.ok, "{:?}", out.diagnostics);
assert!(out.converged, "{out:?}");
let state = read_state_json(dir.path());
let resources = &state["applied_revision"]["resources"];
let provider = resources["provider.embedding.default"]
.as_object()
.expect("provider resource");
assert_eq!(provider["digest"], provider_digest);
assert_eq!(provider["embedding_profile"]["kind"], "mock");
assert_eq!(provider["embedding_profile"]["model"], "recorded-x");
assert!(provider["embedding_profile"].get("api_key").is_none());
assert_eq!(
resources["graph.knowledge"]["embedding_provider"],
"provider.embedding.default"
);
let expected_graph_digest = graph_digest(
"knowledge",
Some(&schema_digest),
Some(
&[("find_person".to_string(), query_digest)]
.into_iter()
.collect(),
),
Some("provider.embedding.default"),
Some(&provider_digest),
);
assert_eq!(resources["graph.knowledge"]["digest"], expected_graph_digest);
}
#[tokio::test]
async fn embedding_provider_changes_update_provider_and_graph_plan() {
let dir = fixture();
write_mock_embedding_cluster(dir.path(), "recorded-x");
write_applyable_state(dir.path());
let first = apply_config_dir(dir.path()).await;
assert!(first.ok && first.converged, "{first:?}");
write_mock_embedding_cluster(dir.path(), "recorded-y");
let plan = plan_config_dir(dir.path()).await;
assert!(plan.ok, "{:?}", plan.diagnostics);
let by_resource: BTreeMap<&str, &PlanChange> = plan
.changes
.iter()
.map(|change| (change.resource.as_str(), change))
.collect();
assert_eq!(
by_resource["provider.embedding.default"].operation,
PlanOperation::Update
);
assert_eq!(
by_resource["provider.embedding.default"].disposition,
Some(ApplyDisposition::Applied)
);
assert_eq!(
by_resource["graph.knowledge"].operation,
PlanOperation::Update
);
assert_eq!(
by_resource["graph.knowledge"].disposition,
Some(ApplyDisposition::Derived)
);
}
#[tokio::test]
async fn embedding_binding_survives_refresh() {
let dir = fixture();
init_derived_graph(dir.path()).await;
write_mock_embedding_cluster(dir.path(), "recorded-x");
write_applyable_state(dir.path());
let apply = apply_config_dir(dir.path()).await;
assert!(apply.ok && apply.converged, "{apply:?}");
let refresh = refresh_config_dir(dir.path()).await;
assert!(refresh.ok, "{:?}", refresh.diagnostics);
let state = read_state_json(dir.path());
let resources = &state["applied_revision"]["resources"];
assert_eq!(
resources["graph.knowledge"]["embedding_provider"],
"provider.embedding.default"
);
assert_eq!(
resources["provider.embedding.default"]["embedding_profile"]["model"],
"recorded-x"
);
}
fn desired_revision_digest(out: &ApplyOutput) -> String {
out.desired_revision.config_digest.clone().unwrap()
}
@ -1150,8 +1390,13 @@ graphs:
.unwrap()
.clone();
let old_digest = "0".repeat(64);
let graph_composite =
graph_digest("knowledge", Some(&schema_digest), Some(&BTreeMap::new()));
let graph_composite = graph_digest(
"knowledge",
Some(&schema_digest),
Some(&BTreeMap::new()),
None,
None,
);
write_state_resources(
dir.path(),
&[
@ -1190,8 +1435,13 @@ graphs:
.clone();
let stale_query_digest = "1".repeat(64);
let stale_policy_digest = "2".repeat(64);
let graph_composite =
graph_digest("knowledge", Some(&schema_digest), Some(&BTreeMap::new()));
let graph_composite = graph_digest(
"knowledge",
Some(&schema_digest),
Some(&BTreeMap::new()),
None,
None,
);
write_state_resources(
dir.path(),
&[
@ -1234,6 +1484,8 @@ graphs:
"knowledge",
Some(&schema_digest),
Some(&[("find_person".to_string(), query_digest)].into_iter().collect()),
None,
None,
);
assert_eq!(resources["graph.knowledge"]["digest"], expected_composite);
}
@ -1494,8 +1746,13 @@ graphs:
.get("schema.knowledge")
.unwrap()
.clone();
let graph_composite =
graph_digest("knowledge", Some(&schema_digest), Some(&BTreeMap::new()));
let graph_composite = graph_digest(
"knowledge",
Some(&schema_digest),
Some(&BTreeMap::new()),
None,
None,
);
write_state_resources(
dir.path(),
&[
@ -2864,6 +3121,54 @@ policies:
assert!(snapshot.policies[0].source.contains("rules:"));
}
#[tokio::test]
async fn serving_snapshot_uses_applied_embedding_provider_profile() {
let dir = fixture();
init_derived_graph(dir.path()).await;
write_mock_embedding_cluster(dir.path(), "recorded-x");
write_applyable_state(dir.path());
let converge = apply_config_dir(dir.path()).await;
assert!(converge.converged, "{converge:?}");
let snapshot = read_serving_snapshot(dir.path()).await.unwrap();
let profile = snapshot.graphs[0].embedding.as_ref().unwrap();
assert_eq!(profile.kind.as_deref(), Some("mock"));
assert_eq!(profile.model.as_deref(), Some("recorded-x"));
}
#[tokio::test]
async fn serving_snapshot_refuses_missing_embedding_provider_metadata() {
let dir = fixture();
init_derived_graph(dir.path()).await;
write_mock_embedding_cluster(dir.path(), "recorded-x");
write_applyable_state(dir.path());
let converge = apply_config_dir(dir.path()).await;
assert!(converge.converged, "{converge:?}");
let mut state = read_state_json(dir.path());
state["applied_revision"]["resources"]["provider.embedding.default"]
.as_object_mut()
.unwrap()
.remove("embedding_profile");
fs::write(
dir.path().join(CLUSTER_STATE_FILE),
serde_json::to_string_pretty(&state).unwrap(),
)
.unwrap();
let err = read_serving_snapshot(dir.path()).await.unwrap_err();
assert!(
err.iter()
.any(|diagnostic| diagnostic.code == "embedding_provider_profile_missing"),
"{err:?}"
);
assert!(
err.iter()
.any(|diagnostic| diagnostic.code == "embedding_provider_missing"),
"{err:?}"
);
}
#[tokio::test]
async fn serving_snapshot_refuses_missing_state() {
let dir = fixture();

View file

@ -325,6 +325,7 @@ pub(crate) struct DesiredCluster {
/// The declared `storage:` root, if any (None ⇒ the config dir itself).
pub(crate) storage_root: Option<String>,
pub(crate) state_lock: bool,
pub(crate) embedding_providers: BTreeMap<String, EmbeddingProviderConfig>,
pub(crate) graphs: Vec<DesiredGraph>,
pub(crate) resource_digests: BTreeMap<String, String>,
pub(crate) resources: Vec<ResourceSummary>,
@ -337,6 +338,7 @@ pub(crate) struct DesiredCluster {
pub(crate) struct DesiredGraph {
pub(crate) id: String,
pub(crate) schema_digest: String,
pub(crate) embedding_provider: Option<String>,
}
#[derive(Debug)]
@ -376,6 +378,8 @@ pub(crate) struct RawClusterConfig {
#[serde(default)]
pub(crate) state: StateConfig,
#[serde(default)]
pub(crate) providers: ProvidersConfig,
#[serde(default)]
pub(crate) graphs: BTreeMap<String, GraphConfig>,
#[serde(default)]
pub(crate) policies: BTreeMap<String, PolicyConfig>,
@ -394,41 +398,99 @@ pub(crate) struct StateConfig {
pub(crate) lock: Option<bool>,
}
#[derive(Debug, Default, Serialize, Deserialize)]
#[serde(deny_unknown_fields)]
pub(crate) struct ProvidersConfig {
#[serde(default)]
pub(crate) embedding: BTreeMap<String, EmbeddingProviderConfig>,
}
#[derive(Debug, Serialize, Deserialize)]
#[serde(deny_unknown_fields)]
pub(crate) struct GraphConfig {
pub(crate) schema: PathBuf,
#[serde(default)]
pub(crate) queries: QueriesDecl,
/// Optional per-graph embedding provider profile (RFC-012 Phase 5).
/// Optional reference to a top-level `providers.embedding.<name>` profile.
#[serde(default)]
pub(crate) embeddings: Option<EmbeddingProfile>,
pub(crate) embedding_provider: Option<String>,
}
/// A graph's embedding provider profile (RFC-012 Phase 5). `provider`/`base_url`/
/// `model` default exactly as the engine's `EmbeddingConfig::from_env` does;
/// `api_key` is a `${NAME}` env reference resolved at serving boot, never an
/// inline secret.
/// A named cluster embedding provider profile (RFC-012 Phase 5). `kind`/`base_url`/
/// `model` default exactly as the engine's `EmbeddingConfig::from_env` does.
/// `api_key`, when required, must be a `${NAME}` env reference resolved at
/// serving boot, never an inline secret.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
#[serde(deny_unknown_fields)]
pub struct EmbeddingProfile {
#[serde(default)]
pub provider: Option<String>,
#[serde(default)]
pub struct EmbeddingProviderConfig {
#[serde(default, alias = "provider", skip_serializing_if = "Option::is_none")]
pub kind: Option<String>,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub base_url: Option<String>,
#[serde(default)]
#[serde(default, skip_serializing_if = "Option::is_none")]
pub model: Option<String>,
pub api_key: String,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub api_key: Option<String>,
}
impl EmbeddingProfile {
impl EmbeddingProviderConfig {
pub(crate) fn validate(&self, path: String, diagnostics: &mut Vec<Diagnostic>) {
if let Err(error) = omnigraph::embedding::EmbeddingConfig::from_parts(
self.kind.as_deref(),
self.base_url.clone(),
self.model.clone(),
"validation-placeholder".to_string(),
) {
diagnostics.push(Diagnostic::error(
"invalid_embedding_provider",
path.clone(),
error.to_string(),
));
}
if self.kind.as_deref() == Some("mock") {
if let Some(api_key) = self.api_key.as_deref() {
if secret_ref_name(api_key).is_err() {
diagnostics.push(Diagnostic::error(
"embedding_api_key_inline",
format!("{path}.api_key"),
"embedding api_key must be a ${NAME} env reference, not an inline secret",
));
}
}
return;
}
match self.api_key.as_deref() {
Some(api_key) if secret_ref_name(api_key).is_err() => diagnostics.push(
Diagnostic::error(
"embedding_api_key_inline",
format!("{path}.api_key"),
"embedding api_key must be a ${NAME} env reference, not an inline secret",
),
),
Some(_) => {}
None => diagnostics.push(Diagnostic::error(
"embedding_api_key_required",
format!("{path}.api_key"),
"non-mock embedding providers must set api_key to a ${NAME} env reference",
)),
}
}
/// Resolve into an engine `EmbeddingConfig`, reading the `${NAME}` api-key
/// reference from process env. Errors if `api_key` is not a `${NAME}`
/// reference or the named var is unset.
/// reference from process env. Mock profiles do not read env and may omit
/// `api_key`; real providers error if the reference is missing or unset.
pub fn resolve(&self) -> Result<omnigraph::embedding::EmbeddingConfig, String> {
let api_key = resolve_secret_ref(&self.api_key)?;
let api_key = if self.kind.as_deref() == Some("mock") {
String::new()
} else {
resolve_secret_ref(self.api_key.as_deref().ok_or_else(|| {
"embedding api_key is required for non-mock providers".to_string()
})?)?
};
omnigraph::embedding::EmbeddingConfig::from_parts(
self.provider.as_deref(),
self.kind.as_deref(),
self.base_url.clone(),
self.model.clone(),
api_key,
@ -437,16 +499,21 @@ impl EmbeddingProfile {
}
}
/// Resolve a `${NAME}` secret reference from process env. Rejects an inline value
/// (anything not wrapped in `${…}`) so secrets never sit in the cluster config.
fn resolve_secret_ref(value: &str) -> Result<String, String> {
let name = value
fn secret_ref_name(value: &str) -> Result<&str, String> {
value
.trim()
.strip_prefix("${")
.and_then(|s| s.strip_suffix('}'))
.filter(|name| !name.trim().is_empty())
.ok_or_else(|| {
format!("embedding api_key must be a ${{NAME}} env reference, got '{}'", value.trim())
})?;
})
}
/// Resolve a `${NAME}` secret reference from process env. Rejects an inline value
/// (anything not wrapped in `${…}`) so secrets never sit in the cluster config.
fn resolve_secret_ref(value: &str) -> Result<String, String> {
let name = secret_ref_name(value)?;
std::env::var(name).map_err(|_| format!("embedding api_key env var '{name}' is not set"))
}
@ -505,6 +572,16 @@ pub(crate) struct StateResource {
/// non-policy resources.
#[serde(default, skip_serializing_if = "Option::is_none")]
pub(crate) applies_to: Option<Vec<String>>,
/// Graph resources only: the applied `provider.embedding.<name>` binding.
/// The provider profile itself is stored on the provider resource so
/// serving can boot without re-reading mutable desired config.
#[serde(default, skip_serializing_if = "Option::is_none")]
pub(crate) embedding_provider: Option<String>,
/// Embedding provider resources only: the applied profile with unresolved
/// `${ENV}` references. The server resolves the referenced env var exactly
/// once at boot and injects the resulting engine config into the graph.
#[serde(default, skip_serializing_if = "Option::is_none")]
pub(crate) embedding_profile: Option<EmbeddingProviderConfig>,
}
#[derive(Debug, Serialize, Deserialize)]
@ -568,18 +645,18 @@ pub(crate) struct SweepOutcome {
}
#[cfg(test)]
mod embedding_profile_tests {
use super::EmbeddingProfile;
mod embedding_provider_config_tests {
use super::EmbeddingProviderConfig;
#[test]
fn resolves_secret_from_env_and_applies_defaults() {
// SAFETY: a unique var name, no concurrent reader.
unsafe { std::env::set_var("OG_TEST_EMBED_KEY_A", "secret-x") };
let profile = EmbeddingProfile {
provider: Some("openai-compatible".to_string()),
let profile = EmbeddingProviderConfig {
kind: Some("openai-compatible".to_string()),
base_url: None,
model: Some("m".to_string()),
api_key: "${OG_TEST_EMBED_KEY_A}".to_string(),
api_key: Some("${OG_TEST_EMBED_KEY_A}".to_string()),
};
let config = profile.resolve().unwrap();
assert_eq!(config.api_key, "secret-x");
@ -589,11 +666,11 @@ mod embedding_profile_tests {
#[test]
fn rejects_inline_api_key() {
let profile = EmbeddingProfile {
provider: None,
let profile = EmbeddingProviderConfig {
kind: None,
base_url: None,
model: None,
api_key: "sk-inline".to_string(),
api_key: Some("sk-inline".to_string()),
};
let err = profile.resolve().unwrap_err();
assert!(err.contains("${NAME}"), "got: {err}");
@ -601,11 +678,11 @@ mod embedding_profile_tests {
#[test]
fn errors_on_unset_secret() {
let profile = EmbeddingProfile {
provider: None,
let profile = EmbeddingProviderConfig {
kind: None,
base_url: None,
model: None,
api_key: "${OG_TEST_DEFINITELY_UNSET_VAR}".to_string(),
api_key: Some("${OG_TEST_DEFINITELY_UNSET_VAR}".to_string()),
};
let err = profile.resolve().unwrap_err();
assert!(err.contains("not set"), "got: {err}");
@ -614,14 +691,26 @@ mod embedding_profile_tests {
#[test]
fn rejects_unknown_provider() {
unsafe { std::env::set_var("OG_TEST_EMBED_KEY_B", "x") };
let profile = EmbeddingProfile {
provider: Some("cohere".to_string()),
let profile = EmbeddingProviderConfig {
kind: Some("cohere".to_string()),
base_url: None,
model: None,
api_key: "${OG_TEST_EMBED_KEY_B}".to_string(),
api_key: Some("${OG_TEST_EMBED_KEY_B}".to_string()),
};
let err = profile.resolve().unwrap_err();
assert!(err.contains("unknown embedding provider"), "got: {err}");
unsafe { std::env::remove_var("OG_TEST_EMBED_KEY_B") };
}
#[test]
fn mock_does_not_require_secret_env() {
let profile = EmbeddingProviderConfig {
kind: Some("mock".to_string()),
base_url: None,
model: Some("cluster-mock".to_string()),
api_key: None,
};
let config = profile.resolve().unwrap();
assert_eq!(config.model, "cluster-mock");
}
}

View file

@ -218,6 +218,9 @@ pub struct GraphStartupConfig {
pub graph_id: String,
pub uri: String,
pub policy: Option<PolicySource>,
/// Pre-resolved embedding config from an applied cluster provider profile.
/// Legacy config paths leave this unset and continue to use env resolution.
pub embedding: Option<omnigraph::embedding::EmbeddingConfig>,
/// Per-graph stored-query registry, loaded and identity-checked at
/// settings-build time; type-checked against the schema when this
/// graph's engine opens.
@ -1156,6 +1159,11 @@ async fn open_single_graph(cfg: GraphStartupConfig) -> Result<Arc<GraphHandle>>
let db = Omnigraph::open(&uri)
.await
.map_err(|err| color_eyre::eyre::eyre!("open graph '{}' at {}: {err}", graph_id, uri))?;
let db = if let Some(embedding) = cfg.embedding {
db.with_embedding_config(Arc::new(embedding))
} else {
db
};
// Validate this graph's stored queries against the live schema and
// resolve them to an attachable handle (refuse boot on breakage).
@ -1190,4 +1198,3 @@ async fn shutdown_signal() {
info!("shutdown signal received");
}

View file

@ -99,6 +99,15 @@ pub(crate) async fn load_cluster_settings(
graph_id: graph.graph_id.clone(),
uri: graph.root.to_string_lossy().to_string(),
policy: graph_policies.get(&graph.graph_id).cloned(),
embedding: graph
.embedding
.as_ref()
.map(|profile| {
profile.resolve().map_err(|err| {
eyre!("embedding provider for graph '{}': {err}", graph.graph_id)
})
})
.transpose()?,
queries: registry,
});
}
@ -245,6 +254,7 @@ pub async fn load_server_settings(
graph_id: name.clone(),
uri,
policy: config.resolve_target_policy_file(name).map(PolicySource::File),
embedding: None,
queries,
});
}
@ -748,6 +758,7 @@ server:
.to_string_lossy()
.into_owned(),
policy: None,
embedding: None,
queries: crate::queries::QueryRegistry::default(),
}],
config_path: temp.path().join("omnigraph.yaml"),

View file

@ -5,9 +5,12 @@ use std::fs;
use axum::body::{Body, to_bytes};
use axum::http::{Method, Request, StatusCode};
use omnigraph_server::api::ErrorOutput;
use omnigraph::db::Omnigraph;
use omnigraph::loader::{LoadMode, load_jsonl};
use omnigraph_server::api::{ErrorOutput, ReadRequest};
use omnigraph_server::{AppState, build_app};
use serde_json::Value;
use serial_test::serial;
use tower::ServiceExt;
@ -457,6 +460,180 @@ async fn cluster_boot_serves_applied_state() {
assert_eq!(status, StatusCode::OK, "{body}");
}
#[tokio::test(flavor = "multi_thread")]
#[serial]
async fn cluster_boot_injects_embedding_provider_config() {
const EMBED_SCHEMA: &str = r#"
node Doc {
slug: String @key
title: String @index
embedding: Vector(4) @embed("title", model="cluster-mock") @index
}
"#;
const EMBED_QUERY: &str = r#"
query vector_search_string($q: String) {
match { $d: Doc }
return { $d.slug, $d.title }
order { nearest($d.embedding, $q) }
limit 3
}
"#;
let alpha = mock_embedding("alpha", 4);
let beta = mock_embedding("beta", 4);
let gamma = mock_embedding("gamma", 4);
let data = format!(
concat!(
r#"{{"type":"Doc","data":{{"slug":"alpha-doc","title":"alpha guide","embedding":[{}]}}}}"#,
"\n",
r#"{{"type":"Doc","data":{{"slug":"beta-doc","title":"beta guide","embedding":[{}]}}}}"#,
"\n",
r#"{{"type":"Doc","data":{{"slug":"gamma-doc","title":"gamma handbook","embedding":[{}]}}}}"#
),
format_vector(&alpha),
format_vector(&beta),
format_vector(&gamma),
);
let temp = tempfile::tempdir().unwrap();
fs::write(temp.path().join("docs.pg"), EMBED_SCHEMA).unwrap();
fs::write(temp.path().join("search.gq"), EMBED_QUERY).unwrap();
fs::write(
temp.path().join("cluster.yaml"),
r#"
version: 1
providers:
embedding:
default:
kind: mock
model: cluster-mock
graphs:
knowledge:
schema: ./docs.pg
embedding_provider: default
queries:
vector_search_string:
file: ./search.gq
"#,
)
.unwrap();
let import = omnigraph_cluster::import_config_dir(temp.path()).await;
assert!(import.ok, "{:?}", import.diagnostics);
let apply = omnigraph_cluster::apply_config_dir(temp.path()).await;
assert!(apply.ok && apply.converged, "{:?}", apply.diagnostics);
let graph_uri = temp
.path()
.join("graphs/knowledge.omni")
.to_string_lossy()
.to_string();
let mut db = Omnigraph::open(&graph_uri).await.unwrap();
load_jsonl(&mut db, &data, LoadMode::Overwrite)
.await
.unwrap();
let _guard = EnvGuard::set(&[
("OMNIGRAPH_EMBEDDINGS_MOCK", None),
("OMNIGRAPH_EMBED_PROVIDER", None),
("OMNIGRAPH_EMBED_BASE_URL", None),
("OMNIGRAPH_EMBED_MODEL", None),
("OPENROUTER_API_KEY", None),
("OPENAI_API_KEY", None),
("GEMINI_API_KEY", None),
]);
let settings = cluster_settings(temp.path()).await.unwrap();
let omnigraph_server::ServerConfigMode::Multi {
graphs,
config_path,
server_policy,
} = settings.mode
else {
panic!("cluster boot must select multi-graph routing");
};
let state = omnigraph_server::open_multi_graph_state(
graphs,
Vec::new(),
server_policy.as_ref(),
config_path,
)
.await
.unwrap();
let app = build_app(state);
let read = ReadRequest {
query_source: EMBED_QUERY.to_string(),
query_name: Some("vector_search_string".to_string()),
params: Some(serde_json::json!({ "q": "alpha" })),
branch: Some("main".to_string()),
snapshot: None,
};
let (status, body) = json_response(
&app,
Request::builder()
.uri("/graphs/knowledge/read")
.method(Method::POST)
.header("content-type", "application/json")
.body(Body::from(serde_json::to_vec(&read).unwrap()))
.unwrap(),
)
.await;
assert_eq!(status, StatusCode::OK, "{body}");
assert_eq!(body["row_count"], 3);
assert_eq!(body["rows"][0]["d.slug"], "alpha-doc");
}
#[tokio::test(flavor = "multi_thread")]
#[serial]
async fn cluster_boot_refuses_missing_embedding_secret_env() {
let temp = tempfile::tempdir().unwrap();
fs::write(
temp.path().join("people.pg"),
"\nnode Person {\n name: String @key\n}\n",
)
.unwrap();
fs::write(
temp.path().join("people.gq"),
"\nquery find_person($name: String) {\n match { $p: Person { name: $name } }\n return { $p.name }\n}\n",
)
.unwrap();
fs::write(
temp.path().join("cluster.yaml"),
r#"
version: 1
providers:
embedding:
default:
kind: openai-compatible
api_key: ${OG_TEST_MISSING_EMBED_KEY}
graphs:
knowledge:
schema: ./people.pg
embedding_provider: default
queries:
find_person:
file: ./people.gq
"#,
)
.unwrap();
let import = omnigraph_cluster::import_config_dir(temp.path()).await;
assert!(import.ok, "{:?}", import.diagnostics);
let apply = omnigraph_cluster::apply_config_dir(temp.path()).await;
assert!(apply.ok && apply.converged, "{:?}", apply.diagnostics);
let _guard = EnvGuard::set(&[
("OG_TEST_MISSING_EMBED_KEY", None),
("OMNIGRAPH_EMBEDDINGS_MOCK", None),
]);
let err = cluster_settings(temp.path()).await.unwrap_err();
let message = err.to_string();
assert!(
message.contains("embedding provider for graph 'knowledge'"),
"{message}"
);
assert!(message.contains("OG_TEST_MISSING_EMBED_KEY"), "{message}");
}
#[tokio::test]
async fn cluster_boot_wires_policy_bindings_into_cedar_slots() {
let temp = tempfile::tempdir().unwrap();

View file

@ -162,8 +162,8 @@ pub struct Omnigraph {
/// avoids the per-query `from_env()` rebuild and keeps the provider HTTP
/// connection pool warm. `OnceCell` guarantees a single initialization.
embedding: Arc<tokio::sync::OnceCell<crate::embedding::EmbeddingClient>>,
/// Optional pre-resolved embedding config (RFC-012 Phase 5), injected from a
/// cluster `graphs.<id>.embeddings` profile via [`Omnigraph::with_embedding_config`].
/// Optional pre-resolved embedding config (RFC-012 Phase 5), injected from an
/// applied cluster `providers.embedding` profile via [`Omnigraph::with_embedding_config`].
/// When set, the embedding cell builds its client from this instead of
/// `EmbeddingClient::from_env()`; `None` keeps the env fallback.
embedding_config: Option<Arc<crate::embedding::EmbeddingConfig>>,
@ -491,12 +491,9 @@ impl Omnigraph {
/// Install a pre-resolved embedding config (RFC-012 Phase 5). Builder-style,
/// mirroring [`Omnigraph::with_policy`]: a graph served from a cluster
/// `embeddings` profile injects it here; an embedded/CLI caller that doesn't
/// embedding provider profile injects it here; an embedded/CLI caller that doesn't
/// call this keeps the `EmbeddingClient::from_env()` fallback.
pub fn with_embedding_config(
mut self,
config: Arc<crate::embedding::EmbeddingConfig>,
) -> Self {
pub fn with_embedding_config(mut self, config: Arc<crate::embedding::EmbeddingConfig>) -> Self {
self.embedding_config = Some(config);
self
}

View file

@ -48,10 +48,10 @@ enum EmbedRole {
}
/// The single source of truth for how embedding text becomes a vector:
/// provider + model + endpoint + key. Resolved once (from env today; from the
/// cluster `providers.embedding` profile in a later RFC-012 phase) and shared by
/// the query path and the offline CLI so stored and query vectors stay
/// same-space by construction.
/// provider + model + endpoint + key. Resolved once (from env for direct
/// engine/CLI callers, or from an applied cluster `providers.embedding` profile
/// at server boot) and shared by the query path and the offline CLI so stored
/// and query vectors stay same-space by construction.
#[derive(Clone, Debug)]
pub struct EmbeddingConfig {
pub provider: Provider,
@ -102,7 +102,7 @@ impl EmbeddingConfig {
})
}
/// Build a config from explicit parts — the cluster `embeddings` profile path
/// Build a config from explicit parts — the cluster `providers.embedding` profile path
/// (RFC-012 Phase 5). `provider`/`base_url`/`model` default exactly as
/// `from_env` does (shared `provider_profile`); `api_key` is already resolved
/// (the cluster path resolves a `${NAME}` ref before calling this).
@ -113,7 +113,7 @@ impl EmbeddingConfig {
api_key: String,
) -> Result<Self> {
if provider == Some("mock") {
// An explicit `model` (e.g. a cluster `embeddings` profile) is
// An explicit `model` (e.g. a cluster `providers.embedding` profile) is
// authoritative — it is what the same-space check compares against —
// so honor it; fall back to `mock()`'s env-based model only when the
// caller supplied none. Without this, a profile's `model` is silently
@ -951,7 +951,7 @@ mod tests {
#[test]
#[serial]
fn from_parts_mock_honors_an_explicit_model() {
// A cluster `embeddings` profile that sets `provider: mock, model: X`
// A cluster `providers.embedding` profile that sets `kind: mock, model: X`
// must resolve to model X — it is what the query-time same-space check
// compares against. Env cleared so the assertion isolates the arg.
let _guard = cleared_env(&[]);

View file

@ -1,9 +1,9 @@
# RFC: Provider-Independent Embedding Configuration
**Status:** Proposed
**Status:** Accepted — Phases 1-5 implemented
**Date:** 2026-06-15
**Builds on:** the engine embedding client (`crates/omnigraph/src/embedding.rs`), the `@embed` catalog
annotation (`omnigraph-compiler/src/catalog`), the reserved cluster `embeddings`/`providers` fields
annotation (`omnigraph-compiler/src/catalog`), the cluster `providers.embedding` surface
([cluster-config-specs.md](cluster-config-specs.md), [rfc-007-operator-config.md](rfc-007-operator-config.md)
for the secret-resolution pattern).
**Target release:** staged — NFR floor first, then the provider-independent config core; ingest-time `@embed`
@ -95,14 +95,18 @@ providers:
kind: openai-compatible # openai-compatible | gemini | mock
base_url: https://openrouter.ai/api/v1
model: google/gemini-embedding-2 # or openai/text-embedding-3-large, mistralai/mistral-embed, …
dimension: 3072
api_key: ${OPENROUTER_API_KEY}
graphs:
knowledge:
schema: knowledge.pg
embedding_provider: default
```
The same `openai-compatible` kind points at OpenAI direct (`base_url: https://api.openai.com/v1`,
`model: text-embedding-3-large`) or a self-hosted endpoint (vLLM/Ollama/LM Studio) by changing `base_url`. Use
`kind: gemini` only to reach Google's `generativelanguage` API directly (it keeps the query/document
task-type asymmetry that the OpenAI-compatible shape does not expose).
task-type asymmetry that the OpenAI-compatible shape does not expose). Dimensions are schema-driven by the
target `Vector(N)` column, not duplicated in the provider profile.
The zero-config tier keeps working with env only (`OMNIGRAPH_EMBED_PROVIDER`, `OMNIGRAPH_EMBED_BASE_URL`,
`OMNIGRAPH_EMBED_MODEL`, and the provider api-key env — `OPENROUTER_API_KEY` / `OPENAI_API_KEY` /
@ -163,11 +167,12 @@ ingest phase needs for throughput, and which removes the open dependency on Gemi
### Config resolution (resolved once, shared)
Precedence, highest first: cluster `providers.embedding.<name>` profile → env (`OMNIGRAPH_EMBED_*`, provider
api-key env) → built-in defaults. The api-key is resolved through the existing operator credential chain
(`${NAME}` → env / `~/.omnigraph/credentials` / server `TokenSource`); it never lives in the schema or any
checked-in file. Resolution happens once; the resolved client is shared by `nearest("string")` and the
offline CLI (replacing the per-query `EmbeddingClient::from_env()` rebuild at `exec/query.rs:238`).
Precedence, highest first for served cluster graphs: applied cluster `providers.embedding.<name>` profile →
env (`OMNIGRAPH_EMBED_*`, provider api-key env) → built-in defaults. The cluster `api_key` value is a
`${NAME}` env reference resolved at server boot; plaintext never lives in the schema, state ledger, or any
checked-in file. Resolution happens once per graph handle; the resolved client is shared by
`nearest("string")`. Direct single-graph serving, embedded callers, and the offline CLI keep the env path
unless they inject an `EmbeddingConfig` directly.
### Identity recorded in the schema IR (not a new store)
@ -215,12 +220,12 @@ the design constraint; deferred to its own RFC/phase.
| **2 — Provider-independent config** | `EmbeddingConfig` + `Provider` enum (OpenAiCompatible covering OpenRouter/OpenAI/local, Gemini, Mock); env-first resolution; client reuse | point `base_url` at OpenRouter, run `nearest("string")`, get correct neighbours vs OpenRouter-stored vectors; CLI shares the config |
| **3 — Record identity in schema IR** | `@embed` args grammar + catalog + IR persistence | `schema show` reflects recorded model/dim |
| **4 — Query-time validation** | compare resolved vs recorded; typed error; planner refusal on identity change | stored model A vs read model B → loud error, never silent garbage |
| **5 — Cluster provider wiring** | un-reserve `providers.embedding`; `${NAME}` resolution | provider profile resolved from `cluster.yaml`; legacy `omnigraph.yaml` untouched |
| **5 — Cluster provider wiring** | `providers.embedding` resources; `graphs.<id>.embedding_provider`; `${NAME}` resolution at server boot | provider profile resolved from applied cluster state; legacy `omnigraph.yaml` untouched |
| later | ingest-time `@embed` (Shape C) | separate RFC |
**Status:** Phases 14 are implemented (`@embed("…", model="…")` is recorded in the schema IR and validated at
query time with a typed same-space error; an unrecorded `@embed` keeps working with no check). Phase 5 (cluster
`providers.embedding` wiring) and ingest-time `@embed` remain.
**Status:** Phases 15 are implemented (`@embed("…", model="…")` is recorded in the schema IR and validated at
query time with a typed same-space error; an unrecorded `@embed` keeps working with no check; cluster-served
graphs can bind an applied `providers.embedding` profile). Ingest-time `@embed` remains.
## Invariants & deny-list check

View file

@ -13,7 +13,8 @@ catalog writes, **graph creation** (a declared graph that does not exist yet
is initialized by apply at the derived root), **schema updates** (soft drops
only), and — behind an explicit, digest-bound **approval** — **graph
deletion**. It does not perform data-loss schema migrations, start servers,
or serve anything it applies: the server still boots from `omnigraph.yaml`.
or run data loads. A server can boot from the applied ledger with
`omnigraph-server --cluster <config-dir | storage-root>`.
## Commands
@ -57,7 +58,7 @@ The exact contract:
## Supported `cluster.yaml`
Stage 3A accepts only this resource subset:
The current config surface accepts this resource subset:
```yaml
version: 1
@ -68,9 +69,18 @@ state:
backend: cluster
lock: true
providers:
embedding:
default:
kind: openai-compatible
base_url: https://openrouter.ai/api/v1
model: openai/text-embedding-3-large
api_key: ${OPENROUTER_API_KEY}
graphs:
knowledge:
schema: knowledge.pg
embedding_provider: default
queries: queries/ # discover every `query <name>` in queries/*.gq
policies:
@ -99,6 +109,17 @@ updates all of its queries together. Paths are relative to the config
directory — the cluster is one explicit folder, so no `./` prefixes are
needed.
`providers.embedding.<name>` defines a query-time embedding provider profile
for cluster-served graphs. A graph opts in with `embedding_provider: <name>`;
bare names normalize to `provider.embedding.<name>`. Supported provider
`kind` values are `openai-compatible` (default/OpenRouter-compatible),
`openai` (OpenAI's own host), `gemini`, and `mock`. Real providers require
`api_key: ${ENV_VAR}`; inline secrets are rejected. The env var is resolved
only when a `--cluster` server boots, so `cluster validate`, `plan`, and
`apply` do not need deployment secrets. `mock` is deterministic and does not
require `api_key`. Vector dimensions stay schema-driven by the target
`Vector(N)` column, not the provider profile.
`storage:` (optional) is the **storage root URI** for everything the cluster
stores — the state ledger, lock, content-addressed catalog, recovery
sidecars, approval artifacts, and the derived graph roots
@ -133,10 +154,12 @@ operation is active.
- stored-query parsing and query-name matching
- stored-query type-checking against the desired schema
- policy `applies_to` graph references
- embedding provider profiles and graph `embedding_provider` references
Fields reserved for later phases, such as `pipelines`, `embeddings`, `ui`,
`aliases`, and `bindings`, fail with a typed diagnostic instead of being
silently ignored.
Fields reserved for later phases, such as `pipelines`, top-level
`embeddings`, `ui`, `aliases`, and `bindings`, fail with a typed diagnostic
instead of being silently ignored. Under `providers`, only `embedding` is
supported today; other provider namespaces fail as unsupported config.
## Planning
@ -156,9 +179,21 @@ resource is planned as a create. If present, the file must use this shape:
"applied_revision": {
"config_digest": "...",
"resources": {
"graph.knowledge": { "digest": "..." },
"schema.knowledge": { "digest": "..." },
"query.knowledge.find_experts": { "digest": "..." },
"provider.embedding.default": {
"digest": "...",
"embedding_profile": {
"kind": "openai-compatible",
"base_url": "https://openrouter.ai/api/v1",
"model": "openai/text-embedding-3-large",
"api_key": "${OPENROUTER_API_KEY}"
}
},
"graph.knowledge": {
"digest": "...",
"embedding_provider": "provider.embedding.default"
},
"policy.base": {
"digest": "...",
"applies_to": ["cluster", "graph.knowledge"]

View file

@ -19,11 +19,13 @@
| Expand mode override | `OMNIGRAPH_TRAVERSAL_MODE` (`indexed`\|`csr`; unset = cost-based auto) | traversal |
| Default body limit | `1 MB` | HTTP server |
| Ingest body limit | `32 MB` | HTTP server |
| Engine embed model | `gemini-embedding-2-preview` | engine embedding |
| Compiler embed model | `text-embedding-3-small` | compiler embedding |
| Embed timeout | `30 000 ms` | both clients |
| Embed retries | `4` | both clients |
| Embed retry backoff | `200 ms` | both clients |
| Default embed provider/model | `openai-compatible` / `openai/text-embedding-3-large` | engine embedding |
| OpenAI-direct embed model | `text-embedding-3-large` | engine embedding |
| Gemini-direct embed model | `gemini-embedding-2` | engine embedding |
| Embed deadline | `OMNIGRAPH_EMBED_DEADLINE_MS=60000` | engine embedding |
| Embed timeout | `OMNIGRAPH_EMBED_TIMEOUT_MS=30000` | engine embedding |
| Embed retries | `OMNIGRAPH_EMBED_RETRY_ATTEMPTS=4` | engine embedding |
| Embed retry backoff | `OMNIGRAPH_EMBED_RETRY_BACKOFF_MS=200` | engine embedding |
| LANCE memory pool default | `1 GB` (raised in v0.3.0) | runtime |
**Expand traversal dispatch.** With `OMNIGRAPH_TRAVERSAL_MODE` unset, the engine

View file

@ -16,6 +16,36 @@ query vectors and document vectors share one model and one vector space.
Vectors are stored L2-normalized as `FixedSizeList(Float32, dim)`; the requested output dimension is driven by
the target column width and sent as Gemini `outputDimensionality` / OpenAI `dimensions`.
## Configuration (cluster)
Cluster-served graphs can pin their query-time embedder in `cluster.yaml`:
```yaml
providers:
embedding:
default:
kind: openai-compatible
base_url: https://openrouter.ai/api/v1
model: openai/text-embedding-3-large
api_key: ${OPENROUTER_API_KEY}
graphs:
knowledge:
schema: knowledge.pg
embedding_provider: default
```
`embedding_provider` references `providers.embedding.<name>`; bare names are
normalized to that typed ref. The server resolves `${ENV_VAR}` only when it
boots from the applied cluster ledger, so `cluster validate`, `plan`, and
`apply` do not need provider secrets. Inline API keys are rejected. `mock`
needs no key. Vector dimensions stay schema-driven by the target `Vector(N)`
column.
Direct single-graph serving, embedded callers, and the offline
`omnigraph embed` pipeline use environment configuration unless they inject an
`EmbeddingConfig` directly.
## Configuration (environment)
| Variable | Meaning |