diff --git a/crates/omnigraph/src/embedding.rs b/crates/omnigraph/src/embedding.rs index c141a2b..abb321c 100644 --- a/crates/omnigraph/src/embedding.rs +++ b/crates/omnigraph/src/embedding.rs @@ -17,7 +17,7 @@ const DEFAULT_GEMINI_MODEL: &str = "gemini-embedding-2"; const DEFAULT_TIMEOUT_MS: u64 = 30_000; const DEFAULT_RETRY_ATTEMPTS: usize = 4; const DEFAULT_RETRY_BACKOFF_MS: u64 = 200; -const DEFAULT_QUERY_DEADLINE_MS: u64 = 60_000; +const DEFAULT_DEADLINE_MS: u64 = 60_000; const GEMINI_QUERY_TASK_TYPE: &str = "RETRIEVAL_QUERY"; const GEMINI_DOCUMENT_TASK_TYPE: &str = "RETRIEVAL_DOCUMENT"; @@ -76,21 +76,33 @@ impl EmbeddingConfig { // The default base URL and model depend on the provider *alias*, not just // the wire shape: `openai-compatible` (and the unset default) point at the // OpenRouter gateway, while `openai` points at OpenAI's own host. - let (provider, default_base, default_model) = + // Each provider alias yields its full default profile — base URL, model, + // and the ordered api-key envs to try — so every alias-dependent default + // lives in one place. (Dispatching the key on the `Provider` enum would + // collapse `openai` and `openai-compatible` and send an OpenRouter key to + // OpenAI's host.) `openai` targets OpenAI directly and takes only + // `OPENAI_API_KEY`; the OpenRouter gateway default prefers + // `OPENROUTER_API_KEY`, falling back to `OPENAI_API_KEY`. + let (provider, default_base, default_model, key_envs): (Provider, &str, &str, &[&str]) = match env_string("OMNIGRAPH_EMBED_PROVIDER").as_deref() { None | Some("openai-compatible") => ( Provider::OpenAiCompatible, DEFAULT_OPENROUTER_BASE_URL, DEFAULT_OPENROUTER_MODEL, + &["OPENROUTER_API_KEY", "OPENAI_API_KEY"], ), Some("openai") => ( Provider::OpenAiCompatible, DEFAULT_OPENAI_BASE_URL, DEFAULT_OPENAI_MODEL, + &["OPENAI_API_KEY"], + ), + Some("gemini") => ( + Provider::Gemini, + DEFAULT_GEMINI_BASE_URL, + DEFAULT_GEMINI_MODEL, + &["GEMINI_API_KEY"], ), - Some("gemini") => { - (Provider::Gemini, DEFAULT_GEMINI_BASE_URL, DEFAULT_GEMINI_MODEL) - } Some("mock") => return Ok(Self::mock()), Some(other) => { return Err(OmniError::manifest_internal(format!( @@ -107,21 +119,13 @@ impl EmbeddingConfig { let model = env_string("OMNIGRAPH_EMBED_MODEL").unwrap_or_else(|| default_model.to_string()); - let api_key = match provider { - Provider::OpenAiCompatible => env_string("OPENROUTER_API_KEY") - .or_else(|| env_string("OPENAI_API_KEY")) - .ok_or_else(|| { - OmniError::manifest_internal( - "OPENROUTER_API_KEY or OPENAI_API_KEY is required for the openai-compatible embedding provider", - ) - })?, - Provider::Gemini => env_string("GEMINI_API_KEY").ok_or_else(|| { - OmniError::manifest_internal( - "GEMINI_API_KEY is required for the gemini embedding provider", - ) - })?, - Provider::Mock => unreachable!("mock returns early"), - }; + let api_key = key_envs.iter().copied().find_map(env_string).ok_or_else(|| { + OmniError::manifest_internal(format!( + "{} is required for the {} embedding provider", + key_envs.join(" or "), + env_string("OMNIGRAPH_EMBED_PROVIDER").as_deref().unwrap_or("openai-compatible") + )) + })?; Ok(Self { provider, @@ -150,8 +154,8 @@ pub struct EmbeddingClient { retry_attempts: usize, retry_backoff_ms: u64, /// Total wall-clock budget for one embed call, across all retries - /// (`OMNIGRAPH_EMBED_QUERY_DEADLINE_MS`). `0` = unbounded. - query_deadline_ms: u64, + /// (`OMNIGRAPH_EMBED_DEADLINE_MS`). `0` = unbounded. + deadline_ms: u64, } struct EmbedCallError { @@ -210,8 +214,8 @@ impl EmbeddingClient { parse_env_usize("OMNIGRAPH_EMBED_RETRY_ATTEMPTS", DEFAULT_RETRY_ATTEMPTS); let retry_backoff_ms = parse_env_u64("OMNIGRAPH_EMBED_RETRY_BACKOFF_MS", DEFAULT_RETRY_BACKOFF_MS); - let query_deadline_ms = - parse_env_u64_allow_zero("OMNIGRAPH_EMBED_QUERY_DEADLINE_MS", DEFAULT_QUERY_DEADLINE_MS); + let deadline_ms = + parse_env_u64_allow_zero("OMNIGRAPH_EMBED_DEADLINE_MS", DEFAULT_DEADLINE_MS); let timeout_ms = parse_env_u64("OMNIGRAPH_EMBED_TIMEOUT_MS", DEFAULT_TIMEOUT_MS); let http = Client::builder() .timeout(Duration::from_millis(timeout_ms)) @@ -225,7 +229,7 @@ impl EmbeddingClient { http, retry_attempts, retry_backoff_ms, - query_deadline_ms, + deadline_ms, }) } @@ -288,22 +292,23 @@ impl EmbeddingClient { result } - /// Bound the whole embed operation (all retries + backoff) by - /// `query_deadline_ms`, so a degraded provider can never hang a read for the - /// full retry envelope. `0` = unbounded. Read-path only, so cancelling the + /// Bound the whole embed operation (all retries + backoff) by `deadline_ms`, + /// so a degraded provider can never hang the caller for the full retry + /// envelope. Applies to every embed call (query and document). `0` = + /// unbounded. Embedding has no Lance/manifest side effects, so cancelling the /// in-flight request future on elapse is safe. async fn run_with_deadline(&self, fut: F) -> Result> where F: Future>>, { - if self.query_deadline_ms == 0 { + if self.deadline_ms == 0 { return fut.await; } - match tokio::time::timeout(Duration::from_millis(self.query_deadline_ms), fut).await { + match tokio::time::timeout(Duration::from_millis(self.deadline_ms), fut).await { Ok(res) => res, Err(_elapsed) => Err(OmniError::manifest_internal(format!( "embedding deadline exceeded after {} ms (provider={:?}, model={})", - self.query_deadline_ms, self.config.provider, self.config.model + self.deadline_ms, self.config.provider, self.config.model ))), } } @@ -792,7 +797,7 @@ mod tests { #[tokio::test] async fn run_with_deadline_aborts_slow_future() { let mut client = EmbeddingClient::mock_for_tests(); - client.query_deadline_ms = 20; + client.deadline_ms = 20; let slow = async { tokio::time::sleep(Duration::from_secs(5)).await; Ok(vec![0.0_f32]) @@ -814,7 +819,7 @@ mod tests { #[tokio::test] async fn run_with_deadline_zero_is_unbounded() { let mut client = EmbeddingClient::mock_for_tests(); - client.query_deadline_ms = 0; + client.deadline_ms = 0; let ok = client .run_with_deadline(async { Ok(vec![3.0_f32]) }) .await @@ -847,6 +852,31 @@ mod tests { assert_eq!(config.api_key, "k"); } + #[test] + #[serial] + fn from_env_openai_alias_prefers_openai_key_over_openrouter() { + // `openai` targets api.openai.com, so an OpenRouter key must not be sent there. + let _guard = cleared_env(&[ + ("OMNIGRAPH_EMBED_PROVIDER", Some("openai")), + ("OPENROUTER_API_KEY", Some("router")), + ("OPENAI_API_KEY", Some("openai")), + ]); + let config = EmbeddingConfig::from_env().unwrap(); + assert_eq!(config.base_url, DEFAULT_OPENAI_BASE_URL); + assert_eq!(config.api_key, "openai"); + } + + #[test] + #[serial] + fn from_env_openai_alias_errors_when_only_openrouter_key_is_set() { + let _guard = cleared_env(&[ + ("OMNIGRAPH_EMBED_PROVIDER", Some("openai")), + ("OPENROUTER_API_KEY", Some("router")), + ]); + let err = EmbeddingConfig::from_env().unwrap_err(); + assert!(err.to_string().contains("OPENAI_API_KEY"), "got: {err}"); + } + #[test] #[serial] fn from_env_openai_compatible_prefers_openrouter_key() { diff --git a/docs/dev/rfc-012-embedding-provider-config.md b/docs/dev/rfc-012-embedding-provider-config.md index de523c7..1604a5e 100644 --- a/docs/dev/rfc-012-embedding-provider-config.md +++ b/docs/dev/rfc-012-embedding-provider-config.md @@ -187,8 +187,9 @@ provider). This is the only behaviour that closes P3 by construction. ### NFR floor (independent of the provider work) -- **Deadline:** wrap the query-time embed in a total-operation deadline (`OMNIGRAPH_EMBED_QUERY_DEADLINE_MS`) - so a degraded provider cannot hang a read for the current ~121 s worst case (4 × 30 s timeout + backoff). +- **Deadline:** wrap every embed call (query or document) in a total-operation deadline + (`OMNIGRAPH_EMBED_DEADLINE_MS`) so a degraded provider cannot hang the caller for the current ~121 s worst + case (4 × 30 s timeout + backoff). - **Observability:** `tracing` span per embed call (provider, model, dim, attempts, outcome, elapsed; `warn!` per retry; token usage when the provider returns it). The subsystem has zero instrumentation today. - **Single normalization:** one `normalize_vector` (the dead client carried a divergent second copy; removed diff --git a/docs/user/search/embeddings.md b/docs/user/search/embeddings.md index cd65587..9e3fd55 100644 --- a/docs/user/search/embeddings.md +++ b/docs/user/search/embeddings.md @@ -25,7 +25,7 @@ the target column width and sent as Gemini `outputDimensionality` / OpenAI `dime | `OMNIGRAPH_EMBED_MODEL` | model id; defaults `openai/text-embedding-3-large` (OpenRouter), `text-embedding-3-large` (`openai`), `gemini-embedding-2` (`gemini`) | | `OPENROUTER_API_KEY` / `OPENAI_API_KEY` | api key for `openai-compatible` (OpenRouter preferred) | | `GEMINI_API_KEY` | api key for `gemini` | -| `OMNIGRAPH_EMBED_QUERY_DEADLINE_MS` | total wall-clock budget for one embed call across all retries (default `60000`; `0` = unbounded) | +| `OMNIGRAPH_EMBED_DEADLINE_MS` | total wall-clock budget for one embed call across all retries (default `60000`; `0` = unbounded) | | `OMNIGRAPH_EMBED_TIMEOUT_MS` | per-request HTTP timeout (default `30000`) | | `OMNIGRAPH_EMBED_RETRY_ATTEMPTS` / `OMNIGRAPH_EMBED_RETRY_BACKOFF_MS` | retry policy (defaults `4` / `200`) | | `OMNIGRAPH_EMBEDDINGS_MOCK` | set truthy to force the deterministic mock provider | @@ -35,7 +35,7 @@ The default zero-config path is OpenRouter: set `OPENROUTER_API_KEY` and run. Re ### Behavior notes -- **Bounded latency.** Each embed call is wrapped in `OMNIGRAPH_EMBED_QUERY_DEADLINE_MS`, so a degraded +- **Bounded latency.** Each embed call is wrapped in `OMNIGRAPH_EMBED_DEADLINE_MS`, so a degraded provider cannot hang a read for the full retry envelope. - **Reuse.** The query path builds the client once per graph handle (on the first `nearest($v, "string")` that needs embedding) and reuses it, keeping the provider connection pool warm. A graph that never embeds