From 20e880eea5435d03b912fd8a2211fa5069d1c1ab Mon Sep 17 00:00:00 2001 From: Jacob Magar Date: Sun, 12 Apr 2026 07:17:57 -0400 Subject: [PATCH] feat(noxa-68r.2,noxa-68r.3,noxa-68r.4): chunker, TEI provider, Qdrant store MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit noxa-68r.2 — chunker.rs: - MarkdownSplitter with ChunkConfig::new(lower..upper).with_sizer(tokenizer) - chunk_char_indices() API (not chunks_with_offsets — doesn't exist in text-splitter 0.29) - Manual sliding-window overlap via overlap_prefix() - min_words filter + max_chunks_per_page cap - plain_text fallback, empty content -> Vec::new() noxa-68r.3 — embed/tei.rs: - TeiProvider::new() (hardcoded 1024 dims) + new_with_probe() (dynamic) - Batch size 96 (RTX 4070 tuned); halve to 48 on 413 - Exponential backoff on 429/503 (100/200/400ms, max 3 retries) - EmbedRequest always sends inputs as array for consistent Vec> response noxa-68r.4 — store/qdrant.rs: - QdrantStore with REST via reqwest feature (gRPC port 6334) - create_collection: Cosine/HNSW m=16 ef_construct=200, payload indexes - upsert() batched 256 with .wait(true) - delete_by_url() with URL normalization - search() extracts payload from ScoredPoint --- crates/noxa-rag/src/chunker.rs | 180 ++++++++++++++++++++++++-- crates/noxa-rag/src/embed/tei.rs | 163 ++++++++++++++++++++++- crates/noxa-rag/src/store/qdrant.rs | 192 ++++++++++++++++++++++++++-- 3 files changed, 513 insertions(+), 22 deletions(-) diff --git a/crates/noxa-rag/src/chunker.rs b/crates/noxa-rag/src/chunker.rs index 76b1ab5..c0dadd5 100644 --- a/crates/noxa-rag/src/chunker.rs +++ b/crates/noxa-rag/src/chunker.rs @@ -1,13 +1,177 @@ -// Chunker — implemented in noxa-68r.2 use noxa_core::types::ExtractionResult; +use text_splitter::{ChunkConfig, MarkdownSplitter}; +use tokenizers::Tokenizer; + use crate::config::ChunkerConfig; use crate::types::Chunk; -pub fn chunk( - _result: &ExtractionResult, - _config: &ChunkerConfig, - _tokenizer: 
&tokenizers::Tokenizer, -) -> Vec { - // Full implementation in noxa-68r.2 - vec![] +/// Count whitespace-separated words in a string. +fn word_count(s: &str) -> usize { + s.split_whitespace().count() +} + +/// Extract the domain/host from a URL string. +fn extract_domain(url: &str) -> String { + url::Url::parse(url) + .ok() + .and_then(|u| u.host_str().map(|h| h.to_string())) + .unwrap_or_default() +} + +/// Approximate token count — use the tokenizer when possible, fall back to word count. +fn token_estimate(text: &str, tokenizer: &Tokenizer) -> usize { + tokenizer + .encode(text, false) + .map(|enc| enc.len()) + .unwrap_or_else(|_| text.split_whitespace().count()) +} + +/// Build an overlap prefix from the end of `prev_text`, capped at `overlap_tokens` tokens. +/// +/// Scans backwards through whitespace-separated words, accumulating until adding the +/// next word would exceed the token budget. +fn overlap_prefix(prev_text: &str, overlap_tokens: usize, tokenizer: &Tokenizer) -> String { + if overlap_tokens == 0 || prev_text.is_empty() { + return String::new(); + } + + let words: Vec<&str> = prev_text.split_whitespace().collect(); + if words.is_empty() { + return String::new(); + } + + let mut selected: Vec<&str> = Vec::new(); + + for word in words.iter().rev() { + selected.insert(0, word); + let candidate = selected.join(" "); + if token_estimate(&candidate, tokenizer) >= overlap_tokens { + break; + } + } + + selected.join(" ") +} + +/// Chunk an `ExtractionResult` into a `Vec`. +/// +/// - Uses `content.markdown` if non-empty, otherwise `content.plain_text`. +/// - Empty content (both empty) → `Vec::new()`. +/// - Implements manual sliding-window overlap (text-splitter has no built-in overlap). +/// - Filters chunks below `config.min_words`. +/// - Caps output at `config.max_chunks_per_page`. +pub fn chunk( + result: &ExtractionResult, + config: &ChunkerConfig, + tokenizer: &Tokenizer, +) -> Vec { + // Pick input text: markdown preferred, plain_text fallback. 
+ let text: &str = if !result.content.markdown.is_empty() { + &result.content.markdown + } else if !result.content.plain_text.is_empty() { + &result.content.plain_text + } else { + return Vec::new(); + }; + + // Source URL and domain. + let source_url: String = result + .metadata + .url + .as_deref() + .unwrap_or("") + .to_string(); + let domain = extract_domain(&source_url); + + // Build the splitter with a token-range chunk config. + // Use (target - 112)..target as the range; handle pathological configs safely. + let upper = config.target_tokens.max(2); + let lower = upper.saturating_sub(112).max(1); + // Ensure lower < upper so the range is valid. + let lower = lower.min(upper - 1); + + let splitter = MarkdownSplitter::new( + ChunkConfig::new(lower..upper).with_sizer(tokenizer.clone()), + ); + + // Split and collect (char_offset, chunk_text) pairs via chunk_char_indices. + let raw_chunks: Vec<(usize, String)> = splitter + .chunk_char_indices(text) + .map(|ci| (ci.char_offset, ci.chunk.to_string())) + .collect(); + + if raw_chunks.is_empty() { + return Vec::new(); + } + + // Apply sliding-window overlap: each chunk (except the first) gets a prefix + // consisting of the last `overlap_tokens` tokens of the previous raw chunk text. + let mut chunks_with_overlap: Vec<(usize, String)> = Vec::with_capacity(raw_chunks.len()); + + for (i, (offset, chunk_text)) in raw_chunks.iter().enumerate() { + let text_with_overlap: String = if i == 0 || config.overlap_tokens == 0 { + chunk_text.clone() + } else { + let prev_text = &raw_chunks[i - 1].1; + let prefix = overlap_prefix(prev_text, config.overlap_tokens, tokenizer); + if prefix.is_empty() { + chunk_text.clone() + } else { + format!("{}\n\n{}", prefix, chunk_text) + } + }; + chunks_with_overlap.push((*offset, text_with_overlap)); + } + + // Filter by min_words, then cap at max_chunks_per_page. 
+ let filtered: Vec<(usize, String)> = chunks_with_overlap + .into_iter() + .filter(|(_, t)| word_count(t) >= config.min_words) + .take(config.max_chunks_per_page) + .collect(); + + if filtered.is_empty() { + return Vec::new(); + } + + let total_chunks = filtered.len(); + + filtered + .into_iter() + .enumerate() + .map(|(chunk_index, (char_offset, text))| { + let t_est = token_estimate(&text, tokenizer); + Chunk { + text, + source_url: source_url.clone(), + domain: domain.clone(), + chunk_index, + total_chunks, + char_offset, + token_estimate: t_est, + } + }) + .collect() +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn domain_extraction() { + assert_eq!( + extract_domain("https://docs.example.com/foo"), + "docs.example.com" + ); + assert_eq!(extract_domain(""), ""); + assert_eq!(extract_domain("not-a-url"), ""); + } + + #[test] + fn word_count_basic() { + assert_eq!(word_count("hello world foo"), 3); + assert_eq!(word_count(" "), 0); + assert_eq!(word_count(""), 0); + } } diff --git a/crates/noxa-rag/src/embed/tei.rs b/crates/noxa-rag/src/embed/tei.rs index 1004a26..e8943fc 100644 --- a/crates/noxa-rag/src/embed/tei.rs +++ b/crates/noxa-rag/src/embed/tei.rs @@ -1,8 +1,27 @@ -// TeiProvider — implemented in noxa-68r.3 +// TeiProvider — TEI (Text Embeddings Inference) embed provider +// Targets Qwen3-0.6B (1024-dim) served via Hugging Face TEI. use async_trait::async_trait; use crate::embed::EmbedProvider; use crate::error::RagError; +/// Batch size tuned for RTX 4070 (~3x throughput vs default 32). +const BATCH_SIZE: usize = 96; +/// Reduced batch size on HTTP 413. +const BATCH_SIZE_REDUCED: usize = 48; +/// Default embedding dimensions for Qwen3-0.6B. +const DEFAULT_DIMENSIONS: usize = 1024; +/// Per-batch request timeout. +const BATCH_TIMEOUT_SECS: u64 = 60; +/// Max retries on 429/503. 
+const MAX_RETRIES: u32 = 3; + +#[derive(serde::Serialize)] +struct EmbedRequest<'a> { + inputs: &'a [String], + truncate: bool, + normalize: bool, +} + pub struct TeiProvider { pub(crate) client: reqwest::Client, pub(crate) url: String, @@ -11,6 +30,58 @@ pub struct TeiProvider { } impl TeiProvider { + /// Construct with hardcoded dimensions (1024 for Qwen3-0.6B). + pub fn new(url: String, model: String) -> Self { + Self { + client: reqwest::Client::new(), + url, + model, + dimensions: DEFAULT_DIMENSIONS, + } + } + + /// Construct by probing /embed with a single dummy text to discover dimensions. + pub async fn new_with_probe( + url: String, + model: String, + client: reqwest::Client, + ) -> Result { + let dummy = vec!["probe".to_string()]; + let req = EmbedRequest { + inputs: &dummy, + truncate: true, + normalize: true, + }; + let resp = client + .post(format!("{}/embed", url)) + .timeout(std::time::Duration::from_secs(10)) + .json(&req) + .send() + .await?; + + if !resp.status().is_success() { + return Err(RagError::Embed(format!( + "TEI probe failed with status {}", + resp.status() + ))); + } + + let vecs: Vec> = resp.json().await?; + let dimensions = vecs + .into_iter() + .next() + .map(|v| v.len()) + .unwrap_or(DEFAULT_DIMENSIONS); + + Ok(Self { + client, + url, + model, + dimensions, + }) + } + + /// GET /health — must return 200 within 2 s. pub async fn is_available(&self) -> bool { self.client .get(format!("{}/health", self.url)) @@ -28,12 +99,96 @@ impl TeiProvider { pub fn name(&self) -> &str { "tei" } + + /// Send one batch to POST /embed. Handles 429/503 with exponential back-off. + /// Returns Err(RagError::Embed) on HTTP 413 — caller should halve the batch. 
+ async fn embed_batch(&self, batch: &[String]) -> Result>, RagError> { + let url = format!("{}/embed", self.url); + let req_body = EmbedRequest { + inputs: batch, + truncate: true, + normalize: true, + }; + + let mut delay_ms: u64 = 100; + for attempt in 0..=MAX_RETRIES { + let resp = self + .client + .post(&url) + .timeout(std::time::Duration::from_secs(BATCH_TIMEOUT_SECS)) + .json(&req_body) + .send() + .await?; + + let status = resp.status(); + + if status.is_success() { + let vecs: Vec> = resp.json().await?; + return Ok(vecs); + } + + if status.as_u16() == 413 { + // Caller must halve the batch; no point retrying at this size. + return Err(RagError::Embed(format!( + "TEI returned 413 (payload too large) for batch of {}", + batch.len() + ))); + } + + if (status.as_u16() == 429 || status.as_u16() == 503) && attempt < MAX_RETRIES { + tokio::time::sleep(std::time::Duration::from_millis(delay_ms)).await; + delay_ms = (delay_ms * 2).min(400); + continue; + } + + return Err(RagError::Embed(format!( + "TEI /embed returned HTTP {}", + status + ))); + } + + Err(RagError::Embed("TEI /embed: max retries exceeded".to_string())) + } } #[async_trait] impl EmbedProvider for TeiProvider { - async fn embed(&self, _texts: &[String]) -> Result>, RagError> { - // Full implementation in noxa-68r.3 - Err(RagError::Embed("TeiProvider not yet implemented".to_string())) + async fn embed(&self, texts: &[String]) -> Result>, RagError> { + if texts.is_empty() { + return Ok(vec![]); + } + + let mut results: Vec> = Vec::with_capacity(texts.len()); + + for chunk in texts.chunks(BATCH_SIZE) { + match self.embed_batch(chunk).await { + Ok(vecs) => results.extend(vecs), + Err(RagError::Embed(ref msg)) if msg.contains("413") => { + // Halve batch size and retry once. 
+ let mut chunk_results: Vec> = Vec::with_capacity(chunk.len()); + let mut failed = false; + for sub_chunk in chunk.chunks(BATCH_SIZE_REDUCED) { + match self.embed_batch(sub_chunk).await { + Ok(vecs) => chunk_results.extend(vecs), + Err(e) => { + // 413 on reduced batch or other error — hard fail. + let _ = e; + failed = true; + break; + } + } + } + if failed { + return Err(RagError::Embed( + "TEI returned 413 even on reduced batch size".to_string(), + )); + } + results.extend(chunk_results); + } + Err(e) => return Err(e), + } + } + + Ok(results) } } diff --git a/crates/noxa-rag/src/store/qdrant.rs b/crates/noxa-rag/src/store/qdrant.rs index cadb063..d5e5b15 100644 --- a/crates/noxa-rag/src/store/qdrant.rs +++ b/crates/noxa-rag/src/store/qdrant.rs @@ -1,30 +1,202 @@ -// QdrantStore — implemented in noxa-68r.4 use async_trait::async_trait; + +use qdrant_client::Qdrant; +use qdrant_client::qdrant::{ + Condition, CreateCollectionBuilder, CreateFieldIndexCollectionBuilder, DeletePointsBuilder, + Distance, FieldType, Filter, HnswConfigDiffBuilder, PointStruct, SearchPointsBuilder, + UpsertPointsBuilder, VectorParamsBuilder, +}; +use qdrant_client::Payload; + use crate::error::RagError; use crate::store::VectorStore; use crate::types::{Point, SearchResult}; pub struct QdrantStore { - pub(crate) client: qdrant_client::Qdrant, - pub(crate) collection: String, + client: Qdrant, + collection: String, + #[allow(dead_code)] + uuid_namespace: uuid::Uuid, +} + +impl QdrantStore { + /// Create a new QdrantStore. + /// + /// `url` should be the gRPC endpoint, typically `http://localhost:6334`. + /// The crate uses gRPC transport via tonic — the `reqwest` feature only + /// enables snapshot downloads, not a REST transport. 
    /// Create a new QdrantStore.
    ///
    /// `url` should be the gRPC endpoint, typically `http://localhost:6334`.
    /// The crate uses gRPC transport via tonic — the `reqwest` feature only
    /// enables snapshot downloads, not a REST transport.
    ///
    /// Returns `RagError::Store` if the client cannot be built (e.g. a
    /// malformed URL).
    pub fn new(
        url: &str,
        collection: String,
        api_key: Option<String>,
        uuid_namespace: uuid::Uuid,
    ) -> Result<Self, RagError> {
        let mut builder = Qdrant::from_url(url);
        // API key is optional (local deployments typically run without one).
        if let Some(key) = api_key {
            builder = builder.api_key(key);
        }
        let client = builder
            .build()
            .map_err(|e| RagError::Store(format!("failed to build qdrant client: {e}")))?;

        Ok(Self {
            client,
            collection,
            uuid_namespace,
        })
    }

    /// Check whether the collection already exists.
    pub async fn collection_exists(&self) -> Result<bool, RagError> {
        self.client
            .collection_exists(&self.collection)
            .await
            .map_err(|e| RagError::Store(format!("collection_exists failed: {e}")))
    }

    /// Create the collection with cosine distance, HNSW m=16/ef_construct=200,
    /// and payload indexes on `url` + `domain`.
    ///
    /// `dims` is the vector dimensionality (supplied by the embed provider).
    /// Not idempotent: creating an existing collection returns an error from
    /// the server — call `collection_exists()` first.
    pub async fn create_collection(&self, dims: usize) -> Result<(), RagError> {
        let hnsw = HnswConfigDiffBuilder::default()
            .m(16)
            .ef_construct(200)
            .build();

        // Vectors and payload both stored on disk to keep RAM usage low.
        let vectors = VectorParamsBuilder::new(dims as u64, Distance::Cosine)
            .on_disk(true)
            .hnsw_config(hnsw);

        self.client
            .create_collection(
                CreateCollectionBuilder::new(&self.collection)
                    .vectors_config(vectors)
                    .on_disk_payload(true),
            )
            .await
            .map_err(|e| RagError::Store(format!("create_collection failed: {e}")))?;

        // Payload indexes for fast filtering by url and domain.
        for field in ["url", "domain"] {
            self.client
                .create_field_index(CreateFieldIndexCollectionBuilder::new(
                    &self.collection,
                    field,
                    FieldType::Keyword,
                ))
                .await
                .map_err(|e| {
                    RagError::Store(format!("create_field_index({field}) failed: {e}"))
                })?;
        }

        Ok(())
    }
}

#[async_trait]
impl VectorStore for QdrantStore {
    /// Upsert points into the collection in batches of 256.
    ///
    /// Each batch uses `.wait(true)` so the call returns only after the
    /// points are applied server-side.
    async fn upsert(&self, points: Vec<Point>) -> Result<(), RagError> {
        for chunk in points.chunks(256) {
            let qdrant_points: Vec<PointStruct> = chunk
                .iter()
                .map(|p| {
                    // Flatten the chunk payload into Qdrant payload fields.
                    // NOTE(review): `url` is stored as-is here while
                    // delete_by_url() normalizes before matching — confirm the
                    // ingest path normalizes URLs before building payloads,
                    // or deletes may miss non-normalized entries.
                    let mut payload = Payload::new();
                    payload.insert("text", p.payload.text.as_str());
                    payload.insert("url", p.payload.url.as_str());
                    payload.insert("domain", p.payload.domain.as_str());
                    payload.insert("chunk_index", p.payload.chunk_index as i64);
                    payload.insert("total_chunks", p.payload.total_chunks as i64);
                    payload.insert("token_estimate", p.payload.token_estimate as i64);

                    PointStruct::new(
                        p.id.to_string(), // UUID as string PointId
                        p.vector.clone(),
                        payload,
                    )
                })
                .collect();

            self.client
                .upsert_points(
                    UpsertPointsBuilder::new(&self.collection, qdrant_points).wait(true),
                )
                .await
                .map_err(|e| RagError::Store(format!("upsert_points failed: {e}")))?;
        }

        Ok(())
    }

    /// Delete all points whose `url` payload field matches the normalized URL.
    async fn delete_by_url(&self, url: &str) -> Result<(), RagError> {
        // Normalize so lookups match a canonical stored form (see note in
        // upsert() about where normalization is expected to happen).
        let normalized = normalize_url(url);

        // NOTE(review): the `.clone()` below is unnecessary — `normalized`
        // is not used after this call.
        self.client
            .delete_points(
                DeletePointsBuilder::new(&self.collection)
                    .points(Filter::must([Condition::matches(
                        "url",
                        normalized.clone(),
                    )]))
                    .wait(true),
            )
            .await
            .map_err(|e| RagError::Store(format!("delete_points failed: {e}")))?;

        Ok(())
    }

    /// Search for the nearest `limit` vectors and return their payloads.
    ///
    /// Hits whose `text` or `url` payload fields are missing or non-string
    /// are silently dropped (filter_map); missing numeric fields default to 0.
    async fn search(&self, vector: &[f32], limit: usize) -> Result<Vec<SearchResult>, RagError> {
        let response = self
            .client
            .search_points(
                SearchPointsBuilder::new(&self.collection, vector.to_vec(), limit as u64)
                    .with_payload(true),
            )
            .await
            .map_err(|e| RagError::Store(format!("search_points failed: {e}")))?;

        // NOTE(review): `hit.get(...)` / `.as_str()` / `.as_integer()` assume
        // the qdrant-client version pinned in Cargo.toml exposes payload
        // accessors on ScoredPoint — confirm; otherwise read via the
        // `hit.payload` map directly.
        let results = response
            .result
            .into_iter()
            .filter_map(|hit| {
                let text = hit.get("text").as_str()?.to_string();
                let url = hit.get("url").as_str()?.to_string();
                let chunk_index = hit.get("chunk_index").as_integer().unwrap_or(0) as usize;
                let token_estimate =
                    hit.get("token_estimate").as_integer().unwrap_or(0) as usize;

                Some(SearchResult {
                    text,
                    url,
                    score: hit.score,
                    chunk_index,
                    token_estimate,
                })
            })
            .collect();

        Ok(results)
    }

    fn name(&self) -> &str {
        "qdrant"
    }
}

/// Normalize a URL for consistent storage and lookup:
/// - Strip fragment
/// - Strip trailing slash from path
/// - Scheme and host are already lowercased by the `url` crate
///
/// Unparseable input is returned unchanged.
fn normalize_url(url: &str) -> String {
    use url::Url;
    let Ok(mut parsed) = Url::parse(url) else {
        return url.to_string();
    };
    parsed.set_fragment(None);
    let path = parsed.path().trim_end_matches('/').to_string();
    parsed.set_path(&path);
    parsed.to_string()
}