From 3de93ae7ab33df197fdcf53a2b419bc1604633b5 Mon Sep 17 00:00:00 2001 From: Ragnor Comerford Date: Mon, 15 Jun 2026 21:50:55 +0200 Subject: [PATCH] feat(engine): inject embedding config into the handle (RFC-012 Phase 5) The Omnigraph handle gains an optional embedding_config (Arc<EmbeddingConfig>) injected via with_embedding_config(), mirroring with_policy(). The query path now threads an EmbeddingResolver (bundling the reuse OnceCell + the optional injected config) instead of the bare cell; its lazy resolve() builds the client from the injected config when present, else EmbeddingClient::from_env(). Laziness preserved (a graph that never embeds needs no key). This is the engine half of cluster per-graph embedding wiring; the cluster config + serving injection follow. New search test proves the injected config is used (from_env would error with no keys). Self-contained: no cluster dependency. 25 search tests pass. --- crates/omnigraph/src/db/omnigraph.rs | 24 +++++++++++++++ crates/omnigraph/src/exec/query.rs | 46 ++++++++++++++++++++++------ crates/omnigraph/tests/search.rs | 37 ++++++++++++++++++++++ 3 files changed, 98 insertions(+), 9 deletions(-) diff --git a/crates/omnigraph/src/db/omnigraph.rs b/crates/omnigraph/src/db/omnigraph.rs index 45b3553..dd20efe 100644 --- a/crates/omnigraph/src/db/omnigraph.rs +++ b/crates/omnigraph/src/db/omnigraph.rs @@ -162,6 +162,11 @@ pub struct Omnigraph { /// avoids the per-query `from_env()` rebuild and keeps the provider HTTP /// connection pool warm. `OnceCell` guarantees a single initialization. embedding: Arc<tokio::sync::OnceCell<crate::embedding::EmbeddingClient>>, + /// Optional pre-resolved embedding config (RFC-012 Phase 5), injected from a + /// cluster `graphs.<id>.embeddings` profile via [`Omnigraph::with_embedding_config`]. + /// When set, the embedding cell builds its client from this instead of + /// `EmbeddingClient::from_env()`; `None` keeps the env fallback. + embedding_config: Option<Arc<crate::embedding::EmbeddingConfig>>, } /// Whether [`Omnigraph::open`] runs the open-time recovery sweep.
@@ -326,6 +331,7 @@ impl Omnigraph { merge_exclusive: Arc::new(tokio::sync::Mutex::new(())), policy: None, embedding: Arc::new(tokio::sync::OnceCell::new()), + embedding_config: None, }) } @@ -426,6 +432,7 @@ impl Omnigraph { merge_exclusive: Arc::new(tokio::sync::Mutex::new(())), policy: None, embedding: Arc::new(tokio::sync::OnceCell::new()), + embedding_config: None, }) } @@ -482,6 +489,23 @@ impl Omnigraph { &self.embedding } + /// Install a pre-resolved embedding config (RFC-012 Phase 5). Builder-style, + /// mirroring [`Omnigraph::with_policy`]: a graph served from a cluster + /// `embeddings` profile injects it here; an embedded/CLI caller that doesn't + /// call this keeps the `EmbeddingClient::from_env()` fallback. + pub fn with_embedding_config( + mut self, + config: Arc<crate::embedding::EmbeddingConfig>, + ) -> Self { + self.embedding_config = Some(config); + self + } + + /// The injected embedding config, if any (see the `embedding_config` field). + pub(crate) fn embedding_config_ref(&self) -> Option<&crate::embedding::EmbeddingConfig> { + self.embedding_config.as_deref() + } + /// Engine-layer policy enforcement gate (MR-722 chassis core). /// /// * If no policy is installed → no-op (returns `Ok(())`). diff --git a/crates/omnigraph/src/exec/query.rs b/crates/omnigraph/src/exec/query.rs index 8efadad..b12e26b 100644 --- a/crates/omnigraph/src/exec/query.rs +++ b/crates/omnigraph/src/exec/query.rs @@ -2,6 +2,30 @@ use super::*; use super::projection::{apply_filter, apply_ordering, project_return}; +/// Bundles the per-handle embedding client cell with the optional injected +/// config (RFC-012 Phase 5) so the lazy init uses the injected config when +/// present, else `EmbeddingClient::from_env()`. Threaded through the query path +/// in place of the bare cell, preserving laziness (a graph that never embeds +/// builds no client and needs no key).
+pub(crate) struct EmbeddingResolver<'a> { + cell: &'a tokio::sync::OnceCell<EmbeddingClient>, + config: Option<&'a crate::embedding::EmbeddingConfig>, +} + +impl EmbeddingResolver<'_> { + async fn resolve(&self) -> Result<&EmbeddingClient> { + let config = self.config.cloned(); + self.cell + .get_or_try_init(|| async move { + match config { + Some(cfg) => EmbeddingClient::new(cfg), + None => EmbeddingClient::from_env(), + } + }) + .await + } +} + impl Omnigraph { /// Run a named query against an explicit branch or snapshot target. pub async fn query( @@ -37,7 +61,10 @@ impl Omnigraph { &resolved.snapshot, &graph_index, &catalog, - self.embedding_cell(), + &EmbeddingResolver { + cell: self.embedding_cell(), + config: self.embedding_config_ref(), + }, ) .await } @@ -86,7 +113,10 @@ impl Omnigraph { &snapshot, &graph_index, &catalog, - self.embedding_cell(), + &EmbeddingResolver { + cell: self.embedding_cell(), + config: self.embedding_config_ref(), + }, ) .await } @@ -118,7 +148,7 @@ async fn extract_search_mode( ir: &QueryIR, params: &ParamMap, catalog: &Catalog, - embedding: &tokio::sync::OnceCell<EmbeddingClient>, + embedding: &EmbeddingResolver<'_>, ) -> Result<SearchMode> { if ir.order_by.is_empty() { return Ok(SearchMode::default()); @@ -201,7 +231,7 @@ async fn extract_sub_search_mode( params: &ParamMap, catalog: &Catalog, limit: Option, - embedding: &tokio::sync::OnceCell<EmbeddingClient>, + embedding: &EmbeddingResolver<'_>, ) -> Result { match expr { IRExpr::Nearest { @@ -250,7 +280,7 @@ async fn resolve_nearest_query_vec( property: &str, expr: &IRExpr, params: &ParamMap, - embedding: &tokio::sync::OnceCell<EmbeddingClient>, + embedding: &EmbeddingResolver<'_>, ) -> Result> { let lit = resolve_literal_or_param(expr, params)?; match lit { @@ -261,9 +291,7 @@ async fn resolve_nearest_query_vec( // Lazily resolve the per-handle client once, then reuse it across // queries (keeps the provider connection pool warm); a graph that // never embeds never builds a client and needs no provider key.
- let client = embedding - .get_or_try_init(|| async { EmbeddingClient::from_env() }) - .await?; + let client = embedding.resolve().await?; // Same-space guarantee: if the property recorded the model that // produced its stored vectors (`@embed("…", model="…")`), the query // embedder must resolve to that same model — otherwise the comparison @@ -392,7 +420,7 @@ pub async fn execute_query( snapshot: &Snapshot, graph_index: &GraphIndexHandle<'_>, catalog: &Catalog, - embedding: &tokio::sync::OnceCell<EmbeddingClient>, + embedding: &EmbeddingResolver<'_>, ) -> Result { let search_mode = extract_search_mode(ir, params, catalog, embedding).await?; diff --git a/crates/omnigraph/tests/search.rs b/crates/omnigraph/tests/search.rs index fb6e853..425c51b 100644 --- a/crates/omnigraph/tests/search.rs +++ b/crates/omnigraph/tests/search.rs @@ -615,6 +615,43 @@ async fn nearest_string_errors_when_query_model_differs_from_recorded_model() { assert!(msg.contains("test-model-b"), "got: {msg}"); } +#[tokio::test] +#[serial] +async fn injected_embedding_config_is_used_instead_of_env() { + // No mock flag and no provider keys in env, so `from_env()` would error. + // Injecting a Mock config proves the resolver uses the injected config + // (RFC-012 Phase 5), and its model satisfies the recorded same-space check.
+ let _guard = EnvGuard::set(&[ + ("OMNIGRAPH_EMBEDDINGS_MOCK", None), + ("OMNIGRAPH_EMBED_PROVIDER", None), + ("OMNIGRAPH_EMBED_MODEL", None), + ("OPENROUTER_API_KEY", None), + ("OPENAI_API_KEY", None), + ("GEMINI_API_KEY", None), + ]); + + let dir = tempfile::tempdir().unwrap(); + let mut db = init_model_recorded_search_db(&dir) + .await + .with_embedding_config(std::sync::Arc::new(omnigraph::embedding::EmbeddingConfig { + provider: omnigraph::embedding::Provider::Mock, + model: "test-model-a".to_string(), + base_url: String::new(), + api_key: String::new(), + })); + + let result = query_main( + &mut db, + MOCK_SEARCH_QUERIES, + "vector_search_string", + &params(&[("$q", "alpha")]), + ) + .await + .unwrap(); + + assert_eq!(result_slugs(&result)[0], "alpha-doc"); +} + // ─── BM25 search ──────────────────────────────────────────────────────────── #[tokio::test]