mirror of
https://github.com/ModernRelay/omnigraph.git
synced 2026-06-18 02:24:27 +02:00
refactor(compiler): remove dead OpenAI embedding client (RFC-012 Phase 1)
The compiler-side EmbeddingClient (OpenAI/`text-embedding-3-small`) was pub(crate), #![allow(dead_code)], and had zero callers anywhere in the workspace; the live nearest("string") path and the offline `omnigraph embed` CLI both use the engine Gemini client. It carried the only NANOGRAPH_* env vars (vestigial 'nanograph os' naming) and was the sole user of reqwest+tokio in omnigraph-compiler — dropping them removes an HTTP client and async runtime from a crate that advertises 'Zero Lance dependency' (invariant 11).
Rewrites docs/user/search/embeddings.md to the single-client reality and corrects the false 'engine embeds @embed at ingest' claim. Verified: build green, 238 compiler tests pass, `rg NANOGRAPH_` empty.
This commit is contained in:
parent
5b83e6b56d
commit
7c916f5b98
6 changed files with 17 additions and 401 deletions
|
|
@ -80,7 +80,7 @@ Full diagram and concurrency model: [docs/dev/architecture.md](docs/dev/architec
|
|||
| Mutations — insert/update/delete, D2, atomicity | [docs/user/mutations/index.md](docs/user/mutations/index.md) |
|
||||
| Search funcs (`nearest`/`bm25`/`rrf`), hybrid ranking | [docs/user/search/index.md](docs/user/search/index.md) |
|
||||
| Indexes (BTREE / inverted / vector / graph topology) | [docs/user/search/indexes.md](docs/user/search/indexes.md) |
|
||||
| Embeddings (compiler + engine clients, env vars, `@embed`) | [docs/user/search/embeddings.md](docs/user/search/embeddings.md) |
|
||||
| Embeddings (engine client, env vars, `@embed`) | [docs/user/search/embeddings.md](docs/user/search/embeddings.md) |
|
||||
| Concepts — what OmniGraph is, L1/L2 framing | [docs/user/concepts/index.md](docs/user/concepts/index.md) |
|
||||
| Quickstart — init → load → query → branch | [docs/user/quickstart.md](docs/user/quickstart.md) |
|
||||
| Branches, commit graph, system branches | [docs/user/branching/index.md](docs/user/branching/index.md) |
|
||||
|
|
|
|||
2
Cargo.lock
generated
2
Cargo.lock
generated
|
|
@ -4915,12 +4915,10 @@ dependencies = [
|
|||
"arrow-select",
|
||||
"pest",
|
||||
"pest_derive",
|
||||
"reqwest 0.12.28",
|
||||
"serde",
|
||||
"serde_json",
|
||||
"sha2 0.10.9",
|
||||
"thiserror",
|
||||
"tokio",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
|
|
|
|||
|
|
@ -20,10 +20,5 @@ pest_derive = { workspace = true }
|
|||
thiserror = { workspace = true }
|
||||
serde = { workspace = true }
|
||||
serde_json = { workspace = true }
|
||||
reqwest = { workspace = true }
|
||||
ahash = { workspace = true }
|
||||
tokio = { workspace = true }
|
||||
sha2 = { workspace = true }
|
||||
|
||||
[dev-dependencies]
|
||||
tokio = { workspace = true }
|
||||
|
|
|
|||
|
|
@ -1,379 +0,0 @@
|
|||
#![allow(dead_code)]
|
||||
|
||||
use std::time::Duration;
|
||||
|
||||
use reqwest::Client;
|
||||
use serde::Deserialize;
|
||||
use tokio::time::sleep;
|
||||
|
||||
use crate::error::{NanoError, Result};
|
||||
|
||||
const DEFAULT_EMBED_MODEL: &str = "text-embedding-3-small";
|
||||
const DEFAULT_OPENAI_BASE_URL: &str = "https://api.openai.com/v1";
|
||||
const DEFAULT_TIMEOUT_MS: u64 = 30_000;
|
||||
const DEFAULT_RETRY_ATTEMPTS: usize = 4;
|
||||
const DEFAULT_RETRY_BACKOFF_MS: u64 = 200;
|
||||
|
||||
#[derive(Clone)]
|
||||
enum EmbeddingTransport {
|
||||
Mock,
|
||||
OpenAi {
|
||||
api_key: String,
|
||||
base_url: String,
|
||||
http: Client,
|
||||
},
|
||||
}
|
||||
|
||||
#[derive(Clone)]
|
||||
pub(crate) struct EmbeddingClient {
|
||||
model: String,
|
||||
retry_attempts: usize,
|
||||
retry_backoff_ms: u64,
|
||||
transport: EmbeddingTransport,
|
||||
}
|
||||
|
||||
struct EmbedCallError {
|
||||
message: String,
|
||||
retryable: bool,
|
||||
}
|
||||
|
||||
#[derive(Debug, Deserialize)]
|
||||
struct OpenAiEmbeddingResponse {
|
||||
data: Vec<OpenAiEmbeddingDatum>,
|
||||
}
|
||||
|
||||
#[derive(Debug, Deserialize)]
|
||||
struct OpenAiEmbeddingDatum {
|
||||
index: usize,
|
||||
embedding: Vec<f32>,
|
||||
}
|
||||
|
||||
#[derive(Debug, Deserialize)]
|
||||
struct OpenAiErrorEnvelope {
|
||||
error: OpenAiErrorBody,
|
||||
}
|
||||
|
||||
#[derive(Debug, Deserialize)]
|
||||
struct OpenAiErrorBody {
|
||||
message: String,
|
||||
}
|
||||
|
||||
impl EmbeddingClient {
|
||||
pub(crate) fn from_env() -> Result<Self> {
|
||||
let model = std::env::var("NANOGRAPH_EMBED_MODEL")
|
||||
.ok()
|
||||
.map(|v| v.trim().to_string())
|
||||
.filter(|v| !v.is_empty())
|
||||
.unwrap_or_else(|| DEFAULT_EMBED_MODEL.to_string());
|
||||
let retry_attempts =
|
||||
parse_env_usize("NANOGRAPH_EMBED_RETRY_ATTEMPTS", DEFAULT_RETRY_ATTEMPTS);
|
||||
let retry_backoff_ms =
|
||||
parse_env_u64("NANOGRAPH_EMBED_RETRY_BACKOFF_MS", DEFAULT_RETRY_BACKOFF_MS);
|
||||
|
||||
if env_flag("NANOGRAPH_EMBEDDINGS_MOCK") {
|
||||
return Ok(Self {
|
||||
model,
|
||||
retry_attempts,
|
||||
retry_backoff_ms,
|
||||
transport: EmbeddingTransport::Mock,
|
||||
});
|
||||
}
|
||||
|
||||
let api_key = std::env::var("OPENAI_API_KEY")
|
||||
.ok()
|
||||
.map(|v| v.trim().to_string())
|
||||
.filter(|v| !v.is_empty())
|
||||
.ok_or_else(|| {
|
||||
NanoError::Execution(
|
||||
"OPENAI_API_KEY is required when an embedding call is needed".to_string(),
|
||||
)
|
||||
})?;
|
||||
let base_url = std::env::var("OPENAI_BASE_URL")
|
||||
.ok()
|
||||
.map(|v| v.trim_end_matches('/').to_string())
|
||||
.filter(|v| !v.is_empty())
|
||||
.unwrap_or_else(|| DEFAULT_OPENAI_BASE_URL.to_string());
|
||||
let timeout_ms = parse_env_u64("NANOGRAPH_EMBED_TIMEOUT_MS", DEFAULT_TIMEOUT_MS);
|
||||
let http = Client::builder()
|
||||
.timeout(Duration::from_millis(timeout_ms))
|
||||
.build()
|
||||
.map_err(|e| {
|
||||
NanoError::Execution(format!("failed to initialize HTTP client: {}", e))
|
||||
})?;
|
||||
|
||||
Ok(Self {
|
||||
model,
|
||||
retry_attempts,
|
||||
retry_backoff_ms,
|
||||
transport: EmbeddingTransport::OpenAi {
|
||||
api_key,
|
||||
base_url,
|
||||
http,
|
||||
},
|
||||
})
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
pub(crate) fn mock_for_tests() -> Self {
|
||||
Self {
|
||||
model: DEFAULT_EMBED_MODEL.to_string(),
|
||||
retry_attempts: DEFAULT_RETRY_ATTEMPTS,
|
||||
retry_backoff_ms: DEFAULT_RETRY_BACKOFF_MS,
|
||||
transport: EmbeddingTransport::Mock,
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) fn model(&self) -> &str {
|
||||
&self.model
|
||||
}
|
||||
|
||||
pub(crate) async fn embed_text(&self, input: &str, expected_dim: usize) -> Result<Vec<f32>> {
|
||||
let mut vectors = self.embed_texts(&[input.to_string()], expected_dim).await?;
|
||||
vectors.pop().ok_or_else(|| {
|
||||
NanoError::Execution("embedding provider returned no vector".to_string())
|
||||
})
|
||||
}
|
||||
|
||||
pub(crate) async fn embed_texts(
|
||||
&self,
|
||||
inputs: &[String],
|
||||
expected_dim: usize,
|
||||
) -> Result<Vec<Vec<f32>>> {
|
||||
if expected_dim == 0 {
|
||||
return Err(NanoError::Execution(
|
||||
"embedding dimension must be greater than zero".to_string(),
|
||||
));
|
||||
}
|
||||
if inputs.is_empty() {
|
||||
return Ok(Vec::new());
|
||||
}
|
||||
|
||||
match &self.transport {
|
||||
EmbeddingTransport::Mock => Ok(inputs
|
||||
.iter()
|
||||
.map(|input| mock_embedding(input, expected_dim))
|
||||
.collect()),
|
||||
EmbeddingTransport::OpenAi { .. } => {
|
||||
self.embed_texts_openai_with_retry(inputs, expected_dim)
|
||||
.await
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
async fn embed_texts_openai_with_retry(
|
||||
&self,
|
||||
inputs: &[String],
|
||||
expected_dim: usize,
|
||||
) -> Result<Vec<Vec<f32>>> {
|
||||
let max_attempt = self.retry_attempts.max(1);
|
||||
let mut attempt = 0usize;
|
||||
loop {
|
||||
attempt += 1;
|
||||
match self.embed_texts_openai_once(inputs, expected_dim).await {
|
||||
Ok(vectors) => return Ok(vectors),
|
||||
Err(err) => {
|
||||
if !err.retryable || attempt >= max_attempt {
|
||||
return Err(NanoError::Execution(err.message));
|
||||
}
|
||||
let shift = (attempt - 1).min(10) as u32;
|
||||
let delay = self.retry_backoff_ms.saturating_mul(1u64 << shift);
|
||||
sleep(Duration::from_millis(delay)).await;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
async fn embed_texts_openai_once(
|
||||
&self,
|
||||
inputs: &[String],
|
||||
expected_dim: usize,
|
||||
) -> std::result::Result<Vec<Vec<f32>>, EmbedCallError> {
|
||||
let (api_key, base_url, http) = match &self.transport {
|
||||
EmbeddingTransport::OpenAi {
|
||||
api_key,
|
||||
base_url,
|
||||
http,
|
||||
} => (api_key, base_url, http),
|
||||
EmbeddingTransport::Mock => unreachable!("mock transport should not call OpenAI"),
|
||||
};
|
||||
|
||||
let request = serde_json::json!({
|
||||
"model": self.model,
|
||||
"input": inputs,
|
||||
"dimensions": expected_dim,
|
||||
});
|
||||
let url = format!("{}/embeddings", base_url);
|
||||
let response = http
|
||||
.post(&url)
|
||||
.bearer_auth(api_key)
|
||||
.json(&request)
|
||||
.send()
|
||||
.await;
|
||||
|
||||
let response = match response {
|
||||
Ok(resp) => resp,
|
||||
Err(err) => {
|
||||
let retryable = err.is_timeout() || err.is_connect() || err.is_request();
|
||||
return Err(EmbedCallError {
|
||||
message: format!("embedding request failed: {}", err),
|
||||
retryable,
|
||||
});
|
||||
}
|
||||
};
|
||||
|
||||
let status = response.status();
|
||||
let body = match response.text().await {
|
||||
Ok(body) => body,
|
||||
Err(err) => {
|
||||
return Err(EmbedCallError {
|
||||
message: format!(
|
||||
"embedding response read failed (status {}): {}",
|
||||
status, err
|
||||
),
|
||||
retryable: status.is_server_error() || status.as_u16() == 429,
|
||||
});
|
||||
}
|
||||
};
|
||||
|
||||
if !status.is_success() {
|
||||
let message = parse_openai_error_message(&body).unwrap_or_else(|| body.clone());
|
||||
return Err(EmbedCallError {
|
||||
message: format!(
|
||||
"embedding request failed with status {}: {}",
|
||||
status, message
|
||||
),
|
||||
retryable: status.is_server_error() || status.as_u16() == 429,
|
||||
});
|
||||
}
|
||||
|
||||
let mut parsed: OpenAiEmbeddingResponse =
|
||||
serde_json::from_str(&body).map_err(|err| EmbedCallError {
|
||||
message: format!("embedding response decode failed: {}", err),
|
||||
retryable: false,
|
||||
})?;
|
||||
|
||||
if parsed.data.len() != inputs.len() {
|
||||
return Err(EmbedCallError {
|
||||
message: format!(
|
||||
"embedding response size mismatch: expected {}, got {}",
|
||||
inputs.len(),
|
||||
parsed.data.len()
|
||||
),
|
||||
retryable: false,
|
||||
});
|
||||
}
|
||||
|
||||
parsed.data.sort_by_key(|item| item.index);
|
||||
let mut vectors = Vec::with_capacity(parsed.data.len());
|
||||
for (idx, item) in parsed.data.into_iter().enumerate() {
|
||||
if item.index != idx {
|
||||
return Err(EmbedCallError {
|
||||
message: format!(
|
||||
"embedding response index mismatch at position {}: got {}",
|
||||
idx, item.index
|
||||
),
|
||||
retryable: false,
|
||||
});
|
||||
}
|
||||
if item.embedding.len() != expected_dim {
|
||||
return Err(EmbedCallError {
|
||||
message: format!(
|
||||
"embedding dimension mismatch: expected {}, got {}",
|
||||
expected_dim,
|
||||
item.embedding.len()
|
||||
),
|
||||
retryable: false,
|
||||
});
|
||||
}
|
||||
vectors.push(item.embedding);
|
||||
}
|
||||
Ok(vectors)
|
||||
}
|
||||
}
|
||||
|
||||
fn parse_openai_error_message(body: &str) -> Option<String> {
|
||||
serde_json::from_str::<OpenAiErrorEnvelope>(body)
|
||||
.ok()
|
||||
.map(|e| e.error.message)
|
||||
.filter(|msg| !msg.trim().is_empty())
|
||||
}
|
||||
|
||||
fn parse_env_usize(name: &str, default: usize) -> usize {
|
||||
std::env::var(name)
|
||||
.ok()
|
||||
.and_then(|v| v.parse::<usize>().ok())
|
||||
.filter(|v| *v > 0)
|
||||
.unwrap_or(default)
|
||||
}
|
||||
|
||||
fn parse_env_u64(name: &str, default: u64) -> u64 {
|
||||
std::env::var(name)
|
||||
.ok()
|
||||
.and_then(|v| v.parse::<u64>().ok())
|
||||
.filter(|v| *v > 0)
|
||||
.unwrap_or(default)
|
||||
}
|
||||
|
||||
fn env_flag(name: &str) -> bool {
|
||||
std::env::var(name)
|
||||
.ok()
|
||||
.map(|v| {
|
||||
let s = v.trim().to_ascii_lowercase();
|
||||
s == "1" || s == "true" || s == "yes" || s == "on"
|
||||
})
|
||||
.unwrap_or(false)
|
||||
}
|
||||
|
||||
fn mock_embedding(input: &str, dim: usize) -> Vec<f32> {
|
||||
let mut seed = fnv1a64(input.as_bytes());
|
||||
let mut out = Vec::with_capacity(dim);
|
||||
for _ in 0..dim {
|
||||
seed = xorshift64(seed);
|
||||
let ratio = (seed as f64 / u64::MAX as f64) as f32;
|
||||
out.push((ratio * 2.0) - 1.0);
|
||||
}
|
||||
|
||||
let norm = out
|
||||
.iter()
|
||||
.map(|v| (*v as f64) * (*v as f64))
|
||||
.sum::<f64>()
|
||||
.sqrt() as f32;
|
||||
if norm > f32::EPSILON {
|
||||
for value in &mut out {
|
||||
*value /= norm;
|
||||
}
|
||||
}
|
||||
out
|
||||
}
|
||||
|
||||
fn fnv1a64(bytes: &[u8]) -> u64 {
|
||||
let mut hash = 14695981039346656037u64;
|
||||
for byte in bytes {
|
||||
hash ^= *byte as u64;
|
||||
hash = hash.wrapping_mul(1099511628211u64);
|
||||
}
|
||||
hash
|
||||
}
|
||||
|
||||
fn xorshift64(mut x: u64) -> u64 {
|
||||
x ^= x << 13;
|
||||
x ^= x >> 7;
|
||||
x ^= x << 17;
|
||||
x
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
|
||||
#[tokio::test]
|
||||
async fn mock_embeddings_are_deterministic() {
|
||||
let client = EmbeddingClient::mock_for_tests();
|
||||
let a = client.embed_text("alpha", 8).await.unwrap();
|
||||
let b = client.embed_text("alpha", 8).await.unwrap();
|
||||
let c = client.embed_text("beta", 8).await.unwrap();
|
||||
assert_eq!(a, b);
|
||||
assert_ne!(a, c);
|
||||
assert_eq!(a.len(), 8);
|
||||
}
|
||||
}
|
||||
|
|
@ -1,5 +1,4 @@
|
|||
pub mod catalog;
|
||||
pub mod embedding;
|
||||
pub mod error;
|
||||
pub mod ir;
|
||||
pub mod json_output;
|
||||
|
|
|
|||
|
|
@ -1,28 +1,31 @@
|
|||
# Embeddings
|
||||
|
||||
OmniGraph has **two** embedding clients with different defaults and purposes.
|
||||
OmniGraph embeds text through a **single engine-side client** (Google Gemini). It is used in two places: the
|
||||
query-time auto-embed of a string passed to `nearest($v, "string")`, and the offline `omnigraph embed` file
|
||||
pipeline. Both paths use the same client, so query vectors and CLI-produced document vectors share one model
|
||||
and one vector space.
|
||||
|
||||
## Compiler-side client — query-time normalization
|
||||
|
||||
- Default model: `text-embedding-3-small` (OpenAI-style schema)
|
||||
- Env: `NANOGRAPH_EMBED_MODEL`, `OPENAI_API_KEY`, `OPENAI_BASE_URL` (default `https://api.openai.com/v1`), `NANOGRAPH_EMBEDDINGS_MOCK`, `NANOGRAPH_EMBED_TIMEOUT_MS=30000`, `NANOGRAPH_EMBED_RETRY_ATTEMPTS=4`, `NANOGRAPH_EMBED_RETRY_BACKOFF_MS=200`
|
||||
- Methods: `embed_text(input, expected_dim)`, `embed_texts(inputs, expected_dim)`
|
||||
- Mock mode: deterministic FNV-1a + xorshift64 → L2-normalized vectors
|
||||
|
||||
## Engine-side client — runtime ingest
|
||||
## Engine embedding client
|
||||
|
||||
- Model: `gemini-embedding-2-preview`
|
||||
- Env: `GEMINI_API_KEY`, `OMNIGRAPH_GEMINI_BASE_URL` (default Google generativelanguage v1beta), `OMNIGRAPH_EMBED_TIMEOUT_MS=30000`, `OMNIGRAPH_EMBED_RETRY_ATTEMPTS=4`, `OMNIGRAPH_EMBED_RETRY_BACKOFF_MS=200`, `OMNIGRAPH_EMBEDDINGS_MOCK`
|
||||
- Two task types: `embed_query_text` (RETRIEVAL_QUERY) and `embed_document_text` (RETRIEVAL_DOCUMENT)
|
||||
- Two task types: `embed_query_text` (RETRIEVAL_QUERY) for query strings and `embed_document_text` (RETRIEVAL_DOCUMENT) for stored documents
|
||||
- Exponential backoff with retryable detection (timeouts, 429, 5xx)
|
||||
- Vectors are stored as L2-normalized `FixedSizeList(Float32, dim)`; the requested dimension is driven by the target column width
|
||||
|
||||
## Schema integration
|
||||
## `@embed` schema annotation
|
||||
|
||||
Mark a Vector property with `@embed("source_text_property")`. At ingest, the engine pulls the source text and writes the embedding into the vector column. Stored as L2-normalized FixedSizeList(Float32, dim).
|
||||
Mark a Vector property with `@embed("source_text_property")`. Today this is a **catalog annotation** consumed
|
||||
by the query typechecker and linter: it records which String property is the embedding source and lets
|
||||
`nearest($v, "string")` auto-embed a query string for comparison against that vector column.
|
||||
|
||||
**It does not embed at ingest.** Stored vectors are supplied directly in your load data, or pre-filled by the
|
||||
offline `omnigraph embed` pipeline below. (Ingest-time execution of `@embed` is a planned enhancement; until
|
||||
it ships, populate the vector column yourself.)
|
||||
|
||||
## CLI `omnigraph embed` (offline file pipeline)
|
||||
|
||||
Operates on **JSONL files** (not on a graph). Three modes (mutually exclusive):
|
||||
Operates on **JSONL files** (not on a graph), using the same engine client. Three modes (mutually exclusive):
|
||||
|
||||
- (default) `fill_missing` — only embed rows whose target field is empty
|
||||
- `--reembed-all` — overwrite all
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue