mirror of
https://github.com/ModernRelay/omnigraph.git
synced 2026-07-03 02:51:04 +02:00
feat(engine): inject embedding config into the handle (RFC-012 Phase 5)
The Omnigraph handle gains an optional embedding_config (Arc<EmbeddingConfig>) injected via with_embedding_config(), mirroring with_policy(). The query path now threads an EmbeddingResolver (bundling the reuse OnceCell + the optional injected config) instead of the bare cell; its lazy resolve() builds the client from the injected config when present, else EmbeddingClient::from_env(). Laziness preserved (a graph that never embeds needs no key). This is the engine half of cluster per-graph embedding wiring; the cluster config + serving injection follow. New search test proves the injected config is used (from_env would error with no keys). Self-contained: no cluster dependency. 25 search tests pass.
This commit is contained in:
parent
ffb4a2c9ab
commit
3de93ae7ab
3 changed files with 98 additions and 9 deletions
|
|
@ -162,6 +162,11 @@ pub struct Omnigraph {
|
||||||
/// avoids the per-query `from_env()` rebuild and keeps the provider HTTP
|
/// avoids the per-query `from_env()` rebuild and keeps the provider HTTP
|
||||||
/// connection pool warm. `OnceCell` guarantees a single initialization.
|
/// connection pool warm. `OnceCell` guarantees a single initialization.
|
||||||
embedding: Arc<tokio::sync::OnceCell<crate::embedding::EmbeddingClient>>,
|
embedding: Arc<tokio::sync::OnceCell<crate::embedding::EmbeddingClient>>,
|
||||||
|
/// Optional pre-resolved embedding config (RFC-012 Phase 5), injected from a
|
||||||
|
/// cluster `graphs.<id>.embeddings` profile via [`Omnigraph::with_embedding_config`].
|
||||||
|
/// When set, the embedding cell builds its client from this instead of
|
||||||
|
/// `EmbeddingClient::from_env()`; `None` keeps the env fallback.
|
||||||
|
embedding_config: Option<Arc<crate::embedding::EmbeddingConfig>>,
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Whether [`Omnigraph::open`] runs the open-time recovery sweep.
|
/// Whether [`Omnigraph::open`] runs the open-time recovery sweep.
|
||||||
|
|
@ -326,6 +331,7 @@ impl Omnigraph {
|
||||||
merge_exclusive: Arc::new(tokio::sync::Mutex::new(())),
|
merge_exclusive: Arc::new(tokio::sync::Mutex::new(())),
|
||||||
policy: None,
|
policy: None,
|
||||||
embedding: Arc::new(tokio::sync::OnceCell::new()),
|
embedding: Arc::new(tokio::sync::OnceCell::new()),
|
||||||
|
embedding_config: None,
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -426,6 +432,7 @@ impl Omnigraph {
|
||||||
merge_exclusive: Arc::new(tokio::sync::Mutex::new(())),
|
merge_exclusive: Arc::new(tokio::sync::Mutex::new(())),
|
||||||
policy: None,
|
policy: None,
|
||||||
embedding: Arc::new(tokio::sync::OnceCell::new()),
|
embedding: Arc::new(tokio::sync::OnceCell::new()),
|
||||||
|
embedding_config: None,
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -482,6 +489,23 @@ impl Omnigraph {
|
||||||
&self.embedding
|
&self.embedding
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Install a pre-resolved embedding config (RFC-012 Phase 5). Builder-style,
|
||||||
|
/// mirroring [`Omnigraph::with_policy`]: a graph served from a cluster
|
||||||
|
/// `embeddings` profile injects it here; an embedded/CLI caller that doesn't
|
||||||
|
/// call this keeps the `EmbeddingClient::from_env()` fallback.
|
||||||
|
pub fn with_embedding_config(
|
||||||
|
mut self,
|
||||||
|
config: Arc<crate::embedding::EmbeddingConfig>,
|
||||||
|
) -> Self {
|
||||||
|
self.embedding_config = Some(config);
|
||||||
|
self
|
||||||
|
}
|
||||||
|
|
||||||
|
/// The injected embedding config, if any (see the `embedding_config` field).
|
||||||
|
pub(crate) fn embedding_config_ref(&self) -> Option<&crate::embedding::EmbeddingConfig> {
|
||||||
|
self.embedding_config.as_deref()
|
||||||
|
}
|
||||||
|
|
||||||
/// Engine-layer policy enforcement gate (MR-722 chassis core).
|
/// Engine-layer policy enforcement gate (MR-722 chassis core).
|
||||||
///
|
///
|
||||||
/// * If no policy is installed → no-op (returns `Ok(())`).
|
/// * If no policy is installed → no-op (returns `Ok(())`).
|
||||||
|
|
|
||||||
|
|
@ -2,6 +2,30 @@ use super::*;
|
||||||
|
|
||||||
use super::projection::{apply_filter, apply_ordering, project_return};
|
use super::projection::{apply_filter, apply_ordering, project_return};
|
||||||
|
|
||||||
|
/// Bundles the per-handle embedding client cell with the optional injected
|
||||||
|
/// config (RFC-012 Phase 5) so the lazy init uses the injected config when
|
||||||
|
/// present, else `EmbeddingClient::from_env()`. Threaded through the query path
|
||||||
|
/// in place of the bare cell, preserving laziness (a graph that never embeds
|
||||||
|
/// builds no client and needs no key).
|
||||||
|
pub(crate) struct EmbeddingResolver<'a> {
|
||||||
|
cell: &'a tokio::sync::OnceCell<EmbeddingClient>,
|
||||||
|
config: Option<&'a crate::embedding::EmbeddingConfig>,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl EmbeddingResolver<'_> {
|
||||||
|
async fn resolve(&self) -> Result<&EmbeddingClient> {
|
||||||
|
let config = self.config.cloned();
|
||||||
|
self.cell
|
||||||
|
.get_or_try_init(|| async move {
|
||||||
|
match config {
|
||||||
|
Some(cfg) => EmbeddingClient::new(cfg),
|
||||||
|
None => EmbeddingClient::from_env(),
|
||||||
|
}
|
||||||
|
})
|
||||||
|
.await
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
impl Omnigraph {
|
impl Omnigraph {
|
||||||
/// Run a named query against an explicit branch or snapshot target.
|
/// Run a named query against an explicit branch or snapshot target.
|
||||||
pub async fn query(
|
pub async fn query(
|
||||||
|
|
@ -37,7 +61,10 @@ impl Omnigraph {
|
||||||
&resolved.snapshot,
|
&resolved.snapshot,
|
||||||
&graph_index,
|
&graph_index,
|
||||||
&catalog,
|
&catalog,
|
||||||
self.embedding_cell(),
|
&EmbeddingResolver {
|
||||||
|
cell: self.embedding_cell(),
|
||||||
|
config: self.embedding_config_ref(),
|
||||||
|
},
|
||||||
)
|
)
|
||||||
.await
|
.await
|
||||||
}
|
}
|
||||||
|
|
@ -86,7 +113,10 @@ impl Omnigraph {
|
||||||
&snapshot,
|
&snapshot,
|
||||||
&graph_index,
|
&graph_index,
|
||||||
&catalog,
|
&catalog,
|
||||||
self.embedding_cell(),
|
&EmbeddingResolver {
|
||||||
|
cell: self.embedding_cell(),
|
||||||
|
config: self.embedding_config_ref(),
|
||||||
|
},
|
||||||
)
|
)
|
||||||
.await
|
.await
|
||||||
}
|
}
|
||||||
|
|
@ -118,7 +148,7 @@ async fn extract_search_mode(
|
||||||
ir: &QueryIR,
|
ir: &QueryIR,
|
||||||
params: &ParamMap,
|
params: &ParamMap,
|
||||||
catalog: &Catalog,
|
catalog: &Catalog,
|
||||||
embedding: &tokio::sync::OnceCell<EmbeddingClient>,
|
embedding: &EmbeddingResolver<'_>,
|
||||||
) -> Result<SearchMode> {
|
) -> Result<SearchMode> {
|
||||||
if ir.order_by.is_empty() {
|
if ir.order_by.is_empty() {
|
||||||
return Ok(SearchMode::default());
|
return Ok(SearchMode::default());
|
||||||
|
|
@ -201,7 +231,7 @@ async fn extract_sub_search_mode(
|
||||||
params: &ParamMap,
|
params: &ParamMap,
|
||||||
catalog: &Catalog,
|
catalog: &Catalog,
|
||||||
limit: Option<u64>,
|
limit: Option<u64>,
|
||||||
embedding: &tokio::sync::OnceCell<EmbeddingClient>,
|
embedding: &EmbeddingResolver<'_>,
|
||||||
) -> Result<SearchMode> {
|
) -> Result<SearchMode> {
|
||||||
match expr {
|
match expr {
|
||||||
IRExpr::Nearest {
|
IRExpr::Nearest {
|
||||||
|
|
@ -250,7 +280,7 @@ async fn resolve_nearest_query_vec(
|
||||||
property: &str,
|
property: &str,
|
||||||
expr: &IRExpr,
|
expr: &IRExpr,
|
||||||
params: &ParamMap,
|
params: &ParamMap,
|
||||||
embedding: &tokio::sync::OnceCell<EmbeddingClient>,
|
embedding: &EmbeddingResolver<'_>,
|
||||||
) -> Result<Vec<f32>> {
|
) -> Result<Vec<f32>> {
|
||||||
let lit = resolve_literal_or_param(expr, params)?;
|
let lit = resolve_literal_or_param(expr, params)?;
|
||||||
match lit {
|
match lit {
|
||||||
|
|
@ -261,9 +291,7 @@ async fn resolve_nearest_query_vec(
|
||||||
// Lazily resolve the per-handle client once, then reuse it across
|
// Lazily resolve the per-handle client once, then reuse it across
|
||||||
// queries (keeps the provider connection pool warm); a graph that
|
// queries (keeps the provider connection pool warm); a graph that
|
||||||
// never embeds never builds a client and needs no provider key.
|
// never embeds never builds a client and needs no provider key.
|
||||||
let client = embedding
|
let client = embedding.resolve().await?;
|
||||||
.get_or_try_init(|| async { EmbeddingClient::from_env() })
|
|
||||||
.await?;
|
|
||||||
// Same-space guarantee: if the property recorded the model that
|
// Same-space guarantee: if the property recorded the model that
|
||||||
// produced its stored vectors (`@embed("…", model="…")`), the query
|
// produced its stored vectors (`@embed("…", model="…")`), the query
|
||||||
// embedder must resolve to that same model — otherwise the comparison
|
// embedder must resolve to that same model — otherwise the comparison
|
||||||
|
|
@ -392,7 +420,7 @@ pub async fn execute_query(
|
||||||
snapshot: &Snapshot,
|
snapshot: &Snapshot,
|
||||||
graph_index: &GraphIndexHandle<'_>,
|
graph_index: &GraphIndexHandle<'_>,
|
||||||
catalog: &Catalog,
|
catalog: &Catalog,
|
||||||
embedding: &tokio::sync::OnceCell<EmbeddingClient>,
|
embedding: &EmbeddingResolver<'_>,
|
||||||
) -> Result<QueryResult> {
|
) -> Result<QueryResult> {
|
||||||
let search_mode = extract_search_mode(ir, params, catalog, embedding).await?;
|
let search_mode = extract_search_mode(ir, params, catalog, embedding).await?;
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -615,6 +615,43 @@ async fn nearest_string_errors_when_query_model_differs_from_recorded_model() {
|
||||||
assert!(msg.contains("test-model-b"), "got: {msg}");
|
assert!(msg.contains("test-model-b"), "got: {msg}");
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[tokio::test]
|
||||||
|
#[serial]
|
||||||
|
async fn injected_embedding_config_is_used_instead_of_env() {
|
||||||
|
// No mock flag and no provider keys in env, so `from_env()` would error.
|
||||||
|
// Injecting a Mock config proves the resolver uses the injected config
|
||||||
|
// (RFC-012 Phase 5), and its model satisfies the recorded same-space check.
|
||||||
|
let _guard = EnvGuard::set(&[
|
||||||
|
("OMNIGRAPH_EMBEDDINGS_MOCK", None),
|
||||||
|
("OMNIGRAPH_EMBED_PROVIDER", None),
|
||||||
|
("OMNIGRAPH_EMBED_MODEL", None),
|
||||||
|
("OPENROUTER_API_KEY", None),
|
||||||
|
("OPENAI_API_KEY", None),
|
||||||
|
("GEMINI_API_KEY", None),
|
||||||
|
]);
|
||||||
|
|
||||||
|
let dir = tempfile::tempdir().unwrap();
|
||||||
|
let mut db = init_model_recorded_search_db(&dir)
|
||||||
|
.await
|
||||||
|
.with_embedding_config(std::sync::Arc::new(omnigraph::embedding::EmbeddingConfig {
|
||||||
|
provider: omnigraph::embedding::Provider::Mock,
|
||||||
|
model: "test-model-a".to_string(),
|
||||||
|
base_url: String::new(),
|
||||||
|
api_key: String::new(),
|
||||||
|
}));
|
||||||
|
|
||||||
|
let result = query_main(
|
||||||
|
&mut db,
|
||||||
|
MOCK_SEARCH_QUERIES,
|
||||||
|
"vector_search_string",
|
||||||
|
¶ms(&[("$q", "alpha")]),
|
||||||
|
)
|
||||||
|
.await
|
||||||
|
.unwrap();
|
||||||
|
|
||||||
|
assert_eq!(result_slugs(&result)[0], "alpha-doc");
|
||||||
|
}
|
||||||
|
|
||||||
// ─── BM25 search ────────────────────────────────────────────────────────────
|
// ─── BM25 search ────────────────────────────────────────────────────────────
|
||||||
|
|
||||||
#[tokio::test]
|
#[tokio::test]
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue