diff --git a/crates/omnigraph/src/db/omnigraph.rs b/crates/omnigraph/src/db/omnigraph.rs index 6c80117..45b3553 100644 --- a/crates/omnigraph/src/db/omnigraph.rs +++ b/crates/omnigraph/src/db/omnigraph.rs @@ -156,6 +156,12 @@ pub struct Omnigraph { /// `apply_schema_as` consults this field (PR #2 proof-of-concept); /// PR #3 fans the `enforce()` call out to the remaining writers. policy: Option>, + /// Lazily-built, reused-across-queries embedding client. Built on the first + /// `nearest($v, "string")` that needs server-side embedding (so a graph that + /// never embeds needs no provider key), then shared by every later query — + /// avoids the per-query `from_env()` rebuild and keeps the provider HTTP + /// connection pool warm. `OnceCell` guarantees a single initialization. + embedding: Arc>, } /// Whether [`Omnigraph::open`] runs the open-time recovery sweep. @@ -319,6 +325,7 @@ impl Omnigraph { write_queue: Arc::new(crate::db::write_queue::WriteQueueManager::new()), merge_exclusive: Arc::new(tokio::sync::Mutex::new(())), policy: None, + embedding: Arc::new(tokio::sync::OnceCell::new()), }) } @@ -418,6 +425,7 @@ impl Omnigraph { write_queue: Arc::new(crate::db::write_queue::WriteQueueManager::new()), merge_exclusive: Arc::new(tokio::sync::Mutex::new(())), policy: None, + embedding: Arc::new(tokio::sync::OnceCell::new()), }) } @@ -465,6 +473,15 @@ impl Omnigraph { self } + /// The lazily-initialized, reused-across-queries embedding client cell + /// (see the `embedding` field doc). The query executor resolves the client + /// through this on the first `nearest($v, "string")` that needs embedding. + pub(crate) fn embedding_cell( + &self, + ) -> &tokio::sync::OnceCell { + &self.embedding + } + /// Engine-layer policy enforcement gate (MR-722 chassis core). /// /// * If no policy is installed → no-op (returns `Ok(())`). diff --git a/crates/omnigraph/src/exec/query.rs b/crates/omnigraph/src/exec/query.rs index 4c1822f..8411dd3 100644 --- a/crates/omnigraph/src/exec/query.rs +++ b/crates/omnigraph/src/exec/query.rs @@ -31,7 +31,15 @@ impl Omnigraph { GraphIndexHandle::none() }; - execute_query(&ir, params, &resolved.snapshot, &graph_index, &catalog).await + execute_query( + &ir, + params, + &resolved.snapshot, + &graph_index, + &catalog, + self.embedding_cell(), + ) + .await } /// Run a named query against the graph as it existed at a prior manifest version. @@ -72,7 +80,15 @@ impl Omnigraph { GraphIndexHandle::none() }; - execute_query(&ir, params, &snapshot, &graph_index, &catalog).await + execute_query( + &ir, + params, + &snapshot, + &graph_index, + &catalog, + self.embedding_cell(), + ) + .await } } @@ -102,6 +118,7 @@ async fn extract_search_mode( ir: &QueryIR, params: &ParamMap, catalog: &Catalog, + embedding: &tokio::sync::OnceCell, ) -> Result { if ir.order_by.is_empty() { return Ok(SearchMode::default()); @@ -114,7 +131,8 @@ async fn extract_search_mode( query, } => { let vec = - resolve_nearest_query_vec(ir, catalog, variable, property, query, params).await?; + resolve_nearest_query_vec(ir, catalog, variable, property, query, params, embedding) + .await?; let k = ir.limit.ok_or_else(|| { OmniError::manifest("nearest() ordering requires a limit clause".to_string()) })? as usize; @@ -157,9 +175,10 @@ async fn extract_search_mode( .unwrap_or(60) as u32; let primary_mode = - extract_sub_search_mode(ir, primary, params, catalog, ir.limit).await?; + extract_sub_search_mode(ir, primary, params, catalog, ir.limit, embedding).await?; let secondary_mode = - extract_sub_search_mode(ir, secondary, params, catalog, ir.limit).await?; + extract_sub_search_mode(ir, secondary, params, catalog, ir.limit, embedding) + .await?; Ok(SearchMode { rrf: Some(RrfMode { @@ -182,6 +201,7 @@ async fn extract_sub_search_mode( params: &ParamMap, catalog: &Catalog, limit: Option, + embedding: &tokio::sync::OnceCell, ) -> Result { match expr { IRExpr::Nearest { @@ -190,7 +210,8 @@ async fn extract_sub_search_mode( query, } => { let vec = - resolve_nearest_query_vec(ir, catalog, variable, property, query, params).await?; + resolve_nearest_query_vec(ir, catalog, variable, property, query, params, embedding) + .await?; let k = limit.unwrap_or(100) as usize; Ok(SearchMode { nearest: Some((variable.clone(), property.clone(), vec, k)), @@ -229,15 +250,20 @@ async fn resolve_nearest_query_vec( property: &str, expr: &IRExpr, params: &ParamMap, + embedding: &tokio::sync::OnceCell, ) -> Result> { let lit = resolve_literal_or_param(expr, params)?; match lit { Literal::List(_) => literal_to_f32_vec(&lit), Literal::String(text) => { let expected_dim = nearest_property_dimension(ir, catalog, variable, property)?; - EmbeddingClient::from_env()? - .embed_query_text(&text, expected_dim) - .await + // Lazily resolve the per-handle client once, then reuse it across + // queries (keeps the provider connection pool warm); a graph that + // never embeds never builds a client and needs no provider key. + let client = embedding + .get_or_try_init(|| async { EmbeddingClient::from_env() }) + .await?; + client.embed_query_text(&text, expected_dim).await } _ => Err(OmniError::manifest( "nearest query must be a string or list of floats".to_string(), @@ -341,8 +367,9 @@ pub async fn execute_query( snapshot: &Snapshot, graph_index: &GraphIndexHandle<'_>, catalog: &Catalog, + embedding: &tokio::sync::OnceCell, ) -> Result { - let search_mode = extract_search_mode(ir, params, catalog).await?; + let search_mode = extract_search_mode(ir, params, catalog, embedding).await?; // RRF requires forked execution if let Some(ref rrf) = search_mode.rrf {