feat(noxa-68r.2,noxa-68r.3,noxa-68r.4): chunker, TEI provider, Qdrant store

noxa-68r.2 — chunker.rs:
- MarkdownSplitter with ChunkConfig::new(lower..upper).with_sizer(tokenizer)
- chunk_char_indices() API (not chunks_with_offsets — doesn't exist in text-splitter 0.29)
- Manual sliding-window overlap via overlap_prefix()
- min_words filter + max_chunks_per_page cap
- plain_text fallback, empty content -> Vec::new()

noxa-68r.3 — embed/tei.rs:
- TeiProvider::new() (hardcoded 1024 dims) + new_with_probe() (dynamic)
- Batch size 96 (RTX 4070 tuned); halve to 48 on 413
- Exponential backoff on 429/503 (100/200/400ms, max 3 retries)
- EmbedRequest always sends inputs as array for consistent Vec<Vec<f32>> response

noxa-68r.4 — store/qdrant.rs:
- QdrantStore over gRPC via tonic (port 6334); the `reqwest` feature only enables snapshot downloads, not a REST transport
- create_collection: Cosine/HNSW m=16 ef_construct=200, payload indexes
- upsert() batched 256 with .wait(true)
- delete_by_url() with URL normalization
- search() extracts payload from ScoredPoint
This commit is contained in:
Jacob Magar 2026-04-12 07:17:57 -04:00
parent 62554b8f12
commit 20e880eea5
3 changed files with 513 additions and 22 deletions

View file

@ -1,13 +1,177 @@
// Chunker — implemented in noxa-68r.2
use noxa_core::types::ExtractionResult;
use text_splitter::{ChunkConfig, MarkdownSplitter};
use tokenizers::Tokenizer;
use crate::config::ChunkerConfig;
use crate::types::Chunk;
pub fn chunk(
_result: &ExtractionResult,
_config: &ChunkerConfig,
_tokenizer: &tokenizers::Tokenizer,
) -> Vec<Chunk> {
// Full implementation in noxa-68r.2
vec![]
/// Count whitespace-separated words in a string.
///
/// `split_whitespace` treats any run of Unicode whitespace as one separator,
/// so leading/trailing blanks contribute no words.
fn word_count(s: &str) -> usize {
    s.split_whitespace().fold(0, |n, _| n + 1)
}
/// Extract the host component of `url`.
///
/// Returns `""` when the URL fails to parse or has no host part.
fn extract_domain(url: &str) -> String {
    match url::Url::parse(url) {
        Ok(parsed) => parsed
            .host_str()
            .map(str::to_string)
            .unwrap_or_default(),
        Err(_) => String::new(),
    }
}
/// Approximate token count — use the tokenizer when possible, fall back to word count.
fn token_estimate(text: &str, tokenizer: &Tokenizer) -> usize {
tokenizer
.encode(text, false)
.map(|enc| enc.len())
.unwrap_or_else(|_| text.split_whitespace().count())
}
/// Build an overlap prefix from the end of `prev_text`, capped at `overlap_tokens` tokens.
///
/// Grows a word-suffix of `prev_text` one word at a time (right to left),
/// stopping as soon as the tokenized suffix reaches the token budget — the
/// word that crosses the budget is included.
fn overlap_prefix(prev_text: &str, overlap_tokens: usize, tokenizer: &Tokenizer) -> String {
    if overlap_tokens == 0 {
        return String::new();
    }
    let words: Vec<&str> = prev_text.split_whitespace().collect();
    if words.is_empty() {
        return String::new();
    }
    // Take ever-longer suffixes of the word list until the budget is met.
    let mut suffix = String::new();
    for take in 1..=words.len() {
        suffix = words[words.len() - take..].join(" ");
        if token_estimate(&suffix, tokenizer) >= overlap_tokens {
            break;
        }
    }
    suffix
}
/// Chunk an `ExtractionResult` into a `Vec<Chunk>`.
///
/// - Uses `content.markdown` if non-empty, otherwise `content.plain_text`.
/// - Empty content (both empty) → `Vec::new()`.
/// - Implements manual sliding-window overlap (text-splitter has no built-in overlap).
/// - Filters chunks below `config.min_words`.
/// - Caps output at `config.max_chunks_per_page`.
pub fn chunk(
    result: &ExtractionResult,
    config: &ChunkerConfig,
    tokenizer: &Tokenizer,
) -> Vec<Chunk> {
    // Pick input text: markdown preferred, plain_text fallback.
    let text: &str = if !result.content.markdown.is_empty() {
        &result.content.markdown
    } else if !result.content.plain_text.is_empty() {
        &result.content.plain_text
    } else {
        return Vec::new();
    };
    // Source URL (empty string when metadata carries none) and its domain.
    let source_url: String = result
        .metadata
        .url
        .as_deref()
        .unwrap_or("")
        .to_string();
    let domain = extract_domain(&source_url);
    // Build the splitter with a token-range chunk config.
    // Use (target - 112)..target as the range; handle pathological configs safely:
    // `upper` is clamped to >= 2 and `lower` to 1..upper so the range is never empty.
    let upper = config.target_tokens.max(2);
    let lower = upper.saturating_sub(112).max(1);
    // Ensure lower < upper so the range is valid.
    let lower = lower.min(upper - 1);
    let splitter = MarkdownSplitter::new(
        ChunkConfig::new(lower..upper).with_sizer(tokenizer.clone()),
    );
    // Split and collect (char_offset, chunk_text) pairs via chunk_char_indices.
    // (text-splitter 0.29 exposes chunk_char_indices; chunks_with_offsets does not exist.)
    let raw_chunks: Vec<(usize, String)> = splitter
        .chunk_char_indices(text)
        .map(|ci| (ci.char_offset, ci.chunk.to_string()))
        .collect();
    if raw_chunks.is_empty() {
        return Vec::new();
    }
    // Apply sliding-window overlap: each chunk (except the first) gets a prefix
    // consisting of the last `overlap_tokens` tokens of the previous raw chunk text.
    // The prefix comes from the RAW previous chunk, so overlaps never compound.
    let mut chunks_with_overlap: Vec<(usize, String)> = Vec::with_capacity(raw_chunks.len());
    for (i, (offset, chunk_text)) in raw_chunks.iter().enumerate() {
        let text_with_overlap: String = if i == 0 || config.overlap_tokens == 0 {
            chunk_text.clone()
        } else {
            let prev_text = &raw_chunks[i - 1].1;
            let prefix = overlap_prefix(prev_text, config.overlap_tokens, tokenizer);
            if prefix.is_empty() {
                chunk_text.clone()
            } else {
                format!("{}\n\n{}", prefix, chunk_text)
            }
        };
        chunks_with_overlap.push((*offset, text_with_overlap));
    }
    // Filter by min_words (counted AFTER the overlap prefix is prepended),
    // then cap at max_chunks_per_page.
    let filtered: Vec<(usize, String)> = chunks_with_overlap
        .into_iter()
        .filter(|(_, t)| word_count(t) >= config.min_words)
        .take(config.max_chunks_per_page)
        .collect();
    if filtered.is_empty() {
        return Vec::new();
    }
    // Materialize final chunks; chunk_index/total_chunks describe the FILTERED set,
    // while char_offset still points into the original input text.
    let total_chunks = filtered.len();
    filtered
        .into_iter()
        .enumerate()
        .map(|(chunk_index, (char_offset, text))| {
            let t_est = token_estimate(&text, tokenizer);
            Chunk {
                text,
                source_url: source_url.clone(),
                domain: domain.clone(),
                chunk_index,
                total_chunks,
                char_offset,
                token_estimate: t_est,
            }
        })
        .collect()
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Host extraction succeeds for absolute URLs and degrades to "" otherwise.
    #[test]
    fn domain_extraction() {
        let cases = [
            ("https://docs.example.com/foo", "docs.example.com"),
            ("", ""),
            ("not-a-url", ""),
        ];
        for (input, expected) in cases {
            assert_eq!(extract_domain(input), expected);
        }
    }

    /// Word counting splits on whitespace runs; blank input counts zero.
    #[test]
    fn word_count_basic() {
        assert_eq!(3, word_count("hello world foo"));
        assert_eq!(0, word_count(" "));
        assert_eq!(0, word_count(""));
    }
}

View file

@ -1,8 +1,27 @@
// TeiProvider — implemented in noxa-68r.3
// TeiProvider — TEI (Text Embeddings Inference) embed provider
// Targets Qwen3-0.6B (1024-dim) served via Hugging Face TEI.
use async_trait::async_trait;
use crate::embed::EmbedProvider;
use crate::error::RagError;
/// Batch size tuned for RTX 4070 (~3x throughput vs default 32).
const BATCH_SIZE: usize = 96;
/// Reduced batch size used when the server answers HTTP 413 (payload too large).
const BATCH_SIZE_REDUCED: usize = 48;
/// Default embedding dimensions for Qwen3-0.6B (used when not probing the server).
const DEFAULT_DIMENSIONS: usize = 1024;
/// Per-batch request timeout, in seconds.
const BATCH_TIMEOUT_SECS: u64 = 60;
/// Max retries per batch on HTTP 429/503 before giving up.
const MAX_RETRIES: u32 = 3;
/// JSON body for TEI's `POST /embed` endpoint.
///
/// `inputs` is always serialized as a JSON array (even for one text) so the
/// response shape is consistently `Vec<Vec<f32>>`.
#[derive(serde::Serialize)]
struct EmbedRequest<'a> {
    // Texts to embed, borrowed from the caller's batch.
    inputs: &'a [String],
    // TEI-side truncation flag — presumably truncates over-long inputs
    // instead of erroring; confirm against the TEI API docs.
    truncate: bool,
    // Ask TEI to normalize the returned embeddings (assumed L2 — verify).
    normalize: bool,
}
pub struct TeiProvider {
pub(crate) client: reqwest::Client,
pub(crate) url: String,
@ -11,6 +30,58 @@ pub struct TeiProvider {
}
impl TeiProvider {
/// Construct with hardcoded dimensions (1024 for Qwen3-0.6B).
///
/// Use `new_with_probe` instead to discover the dimensionality from the
/// server at startup.
pub fn new(url: String, model: String) -> Self {
    Self {
        dimensions: DEFAULT_DIMENSIONS,
        client: reqwest::Client::new(),
        url,
        model,
    }
}
/// Construct by probing /embed with a single dummy text to discover dimensions.
///
/// Falls back to `DEFAULT_DIMENSIONS` if the probe response contains no
/// embeddings.
pub async fn new_with_probe(
    url: String,
    model: String,
    client: reqwest::Client,
) -> Result<Self, RagError> {
    let probe_inputs = vec!["probe".to_string()];
    let probe_body = EmbedRequest {
        inputs: &probe_inputs,
        truncate: true,
        normalize: true,
    };
    let response = client
        .post(format!("{}/embed", url))
        .timeout(std::time::Duration::from_secs(10))
        .json(&probe_body)
        .send()
        .await?;
    let status = response.status();
    if !status.is_success() {
        return Err(RagError::Embed(format!(
            "TEI probe failed with status {}",
            status
        )));
    }
    // The length of the first returned embedding is the model's dimensionality.
    let embeddings: Vec<Vec<f32>> = response.json().await?;
    let dimensions = match embeddings.first() {
        Some(first) => first.len(),
        None => DEFAULT_DIMENSIONS,
    };
    Ok(Self {
        client,
        url,
        model,
        dimensions,
    })
}
/// GET /health — must return 200 within 2 s.
pub async fn is_available(&self) -> bool {
self.client
.get(format!("{}/health", self.url))
@ -28,12 +99,96 @@ impl TeiProvider {
/// Stable identifier for this provider ("tei").
pub fn name(&self) -> &str {
    "tei"
}
/// Send one batch to POST /embed. Handles 429/503 with exponential back-off.
/// Returns Err(RagError::Embed) on HTTP 413 — caller should halve the batch.
async fn embed_batch(&self, batch: &[String]) -> Result<Vec<Vec<f32>>, RagError> {
    let endpoint = format!("{}/embed", self.url);
    let request_body = EmbedRequest {
        inputs: batch,
        truncate: true,
        normalize: true,
    };
    // Back-off schedule: 100 ms, 200 ms, 400 ms (capped).
    let mut backoff_ms: u64 = 100;
    for attempt in 0..=MAX_RETRIES {
        let response = self
            .client
            .post(&endpoint)
            .timeout(std::time::Duration::from_secs(BATCH_TIMEOUT_SECS))
            .json(&request_body)
            .send()
            .await?;
        let status = response.status();
        if status.is_success() {
            return Ok(response.json::<Vec<Vec<f32>>>().await?);
        }
        match status.as_u16() {
            // Payload too large — retrying at this size is pointless; the
            // caller halves the batch instead.
            413 => {
                return Err(RagError::Embed(format!(
                    "TEI returned 413 (payload too large) for batch of {}",
                    batch.len()
                )))
            }
            // Transient overload/unavailable: sleep and retry while attempts remain.
            429 | 503 if attempt < MAX_RETRIES => {
                tokio::time::sleep(std::time::Duration::from_millis(backoff_ms)).await;
                backoff_ms = (backoff_ms * 2).min(400);
            }
            // Anything else — or a 429/503 on the final attempt — is fatal.
            _ => {
                return Err(RagError::Embed(format!(
                    "TEI /embed returned HTTP {}",
                    status
                )))
            }
        }
    }
    // Unreachable in practice: the final loop iteration always returns above.
    Err(RagError::Embed("TEI /embed: max retries exceeded".to_string()))
}
}
#[async_trait]
impl EmbedProvider for TeiProvider {
async fn embed(&self, _texts: &[String]) -> Result<Vec<Vec<f32>>, RagError> {
// Full implementation in noxa-68r.3
Err(RagError::Embed("TeiProvider not yet implemented".to_string()))
/// Embed all texts, batching at `BATCH_SIZE` and halving to
/// `BATCH_SIZE_REDUCED` when TEI answers HTTP 413.
///
/// Returns one vector per input text, in input order. Any batch failure
/// aborts the whole call.
async fn embed(&self, texts: &[String]) -> Result<Vec<Vec<f32>>, RagError> {
    if texts.is_empty() {
        return Ok(vec![]);
    }
    let mut results: Vec<Vec<f32>> = Vec::with_capacity(texts.len());
    for chunk in texts.chunks(BATCH_SIZE) {
        match self.embed_batch(chunk).await {
            Ok(vecs) => results.extend(vecs),
            Err(RagError::Embed(ref msg)) if msg.contains("413") => {
                // Payload too large at full size: retry this chunk at the
                // reduced batch size.
                for sub_chunk in chunk.chunks(BATCH_SIZE_REDUCED) {
                    match self.embed_batch(sub_chunk).await {
                        Ok(vecs) => results.extend(vecs),
                        // Still too large even halved — give up with a
                        // specific message.
                        Err(RagError::Embed(ref sub_msg)) if sub_msg.contains("413") => {
                            return Err(RagError::Embed(
                                "TEI returned 413 even on reduced batch size".to_string(),
                            ));
                        }
                        // Bug fix: non-413 sub-batch errors (timeouts, 5xx,
                        // network) are propagated as-is instead of being
                        // discarded and mislabeled as 413 failures.
                        Err(e) => return Err(e),
                    }
                }
            }
            Err(e) => return Err(e),
        }
    }
    Ok(results)
}
}

View file

@ -1,30 +1,202 @@
// QdrantStore — implemented in noxa-68r.4
use async_trait::async_trait;
use qdrant_client::Qdrant;
use qdrant_client::qdrant::{
Condition, CreateCollectionBuilder, CreateFieldIndexCollectionBuilder, DeletePointsBuilder,
Distance, FieldType, Filter, HnswConfigDiffBuilder, PointStruct, SearchPointsBuilder,
UpsertPointsBuilder, VectorParamsBuilder,
};
use qdrant_client::Payload;
use crate::error::RagError;
use crate::store::VectorStore;
use crate::types::{Point, SearchResult};
/// Vector store backed by the qdrant-client crate.
// NOTE(review): `client`/`collection` appear twice below — the `pub(crate)`
// pair looks like pre-noxa-68r.4 stub residue surviving in this diff view.
// Duplicate field names will not compile; confirm only one pair remains.
pub struct QdrantStore {
    pub(crate) client: qdrant_client::Qdrant,
    pub(crate) collection: String,
    client: Qdrant,
    collection: String,
    // Presumably a namespace for deriving deterministic point UUIDs —
    // unused so far (hence the allow); verify intended use.
    #[allow(dead_code)]
    uuid_namespace: uuid::Uuid,
}
impl QdrantStore {
/// Create a new QdrantStore.
///
/// `url` should be the gRPC endpoint, typically `http://localhost:6334`.
/// The crate uses gRPC transport via tonic — the `reqwest` feature only
/// enables snapshot downloads, not a REST transport.
pub fn new(
    url: &str,
    collection: String,
    api_key: Option<String>,
    uuid_namespace: uuid::Uuid,
) -> Result<Self, RagError> {
    // Attach the API key only when one was supplied.
    let builder = match api_key {
        Some(key) => Qdrant::from_url(url).api_key(key),
        None => Qdrant::from_url(url),
    };
    match builder.build() {
        Ok(client) => Ok(Self {
            client,
            collection,
            uuid_namespace,
        }),
        Err(e) => Err(RagError::Store(format!("failed to build qdrant client: {e}"))),
    }
}
/// Check whether the collection already exists.
pub async fn collection_exists(&self) -> Result<bool, RagError> {
    match self.client.collection_exists(&self.collection).await {
        Ok(exists) => Ok(exists),
        Err(e) => Err(RagError::Store(format!("collection_exists failed: {e}"))),
    }
}
/// Create the collection with cosine distance, HNSW m=16/ef_construct=200,
/// and payload indexes on `url` + `domain`.
///
/// Both vectors (`on_disk(true)`) and payloads (`on_disk_payload(true)`)
/// are stored on disk.
pub async fn create_collection(&self, dims: usize) -> Result<(), RagError> {
    // HNSW tuning knobs passed through to Qdrant.
    let hnsw = HnswConfigDiffBuilder::default()
        .m(16)
        .ef_construct(200)
        .build();
    let vectors = VectorParamsBuilder::new(dims as u64, Distance::Cosine)
        .on_disk(true)
        .hnsw_config(hnsw);
    self.client
        .create_collection(
            CreateCollectionBuilder::new(&self.collection)
                .vectors_config(vectors)
                .on_disk_payload(true),
        )
        .await
        .map_err(|e| RagError::Store(format!("create_collection failed: {e}")))?;
    // Payload indexes for fast filtering by url and domain.
    for field in ["url", "domain"] {
        self.client
            .create_field_index(CreateFieldIndexCollectionBuilder::new(
                &self.collection,
                field,
                FieldType::Keyword,
            ))
            .await
            .map_err(|e| {
                RagError::Store(format!("create_field_index({field}) failed: {e}"))
            })?;
    }
    Ok(())
}
}
#[async_trait]
impl VectorStore for QdrantStore {
async fn upsert(&self, _points: Vec<Point>) -> Result<(), RagError> {
// Full implementation in noxa-68r.4
Err(RagError::Store("QdrantStore not yet implemented".to_string()))
/// Upsert points into the collection in batches of 256.
///
/// Each batch is sent with `.wait(true)` so the call returns only after
/// Qdrant has applied the write.
async fn upsert(&self, points: Vec<Point>) -> Result<(), RagError> {
    for chunk in points.chunks(256) {
        // Map our Point type onto qdrant-client's PointStruct + Payload.
        let qdrant_points: Vec<PointStruct> = chunk
            .iter()
            .map(|p| {
                let mut payload = Payload::new();
                payload.insert("text", p.payload.text.as_str());
                payload.insert("url", p.payload.url.as_str());
                payload.insert("domain", p.payload.domain.as_str());
                // usize → i64 casts: Qdrant payload integers are i64.
                payload.insert("chunk_index", p.payload.chunk_index as i64);
                payload.insert("total_chunks", p.payload.total_chunks as i64);
                payload.insert("token_estimate", p.payload.token_estimate as i64);
                PointStruct::new(
                    p.id.to_string(), // UUID as string PointId
                    p.vector.clone(),
                    payload,
                )
            })
            .collect();
        self.client
            .upsert_points(
                UpsertPointsBuilder::new(&self.collection, qdrant_points).wait(true),
            )
            .await
            .map_err(|e| RagError::Store(format!("upsert_points failed: {e}")))?;
    }
    Ok(())
}
async fn delete_by_url(&self, _url: &str) -> Result<(), RagError> {
Err(RagError::Store("QdrantStore not yet implemented".to_string()))
/// Delete all points whose `url` payload field matches the normalized URL.
///
/// The incoming URL goes through `normalize_url` so lookups agree with the
/// form stored at index time. Waits for the deletion to be applied.
async fn delete_by_url(&self, url: &str) -> Result<(), RagError> {
    let normalized = normalize_url(url);
    self.client
        .delete_points(
            DeletePointsBuilder::new(&self.collection)
                // Pass `normalized` by value — it is not used again, so the
                // previous `.clone()` was redundant.
                .points(Filter::must([Condition::matches("url", normalized)]))
                .wait(true),
        )
        .await
        .map_err(|e| RagError::Store(format!("delete_points failed: {e}")))?;
    Ok(())
}
async fn search(&self, _vector: &[f32], _limit: usize) -> Result<Vec<SearchResult>, RagError> {
Err(RagError::Store("QdrantStore not yet implemented".to_string()))
/// Search for the nearest `limit` vectors and return their payloads.
///
/// Hits missing the `text` or `url` payload fields are silently skipped;
/// missing numeric fields default to 0.
async fn search(&self, vector: &[f32], limit: usize) -> Result<Vec<SearchResult>, RagError> {
    let request = SearchPointsBuilder::new(&self.collection, vector.to_vec(), limit as u64)
        .with_payload(true);
    let response = self
        .client
        .search_points(request)
        .await
        .map_err(|e| RagError::Store(format!("search_points failed: {e}")))?;
    let mut results = Vec::with_capacity(response.result.len());
    for hit in response.result {
        let text = match hit.get("text").as_str() {
            Some(t) => t.to_string(),
            None => continue,
        };
        let url = match hit.get("url").as_str() {
            Some(u) => u.to_string(),
            None => continue,
        };
        let chunk_index = hit.get("chunk_index").as_integer().unwrap_or(0) as usize;
        let token_estimate = hit.get("token_estimate").as_integer().unwrap_or(0) as usize;
        results.push(SearchResult {
            text,
            url,
            score: hit.score,
            chunk_index,
            token_estimate,
        });
    }
    Ok(results)
}
/// Stable identifier for this store implementation ("qdrant").
fn name(&self) -> &str {
    "qdrant"
}
}
/// Normalize a URL for consistent storage and lookup:
/// - Strip fragment
/// - Strip trailing slash from path
/// - Scheme and host are already lowercased by the `url` crate
///
/// Unparseable input is returned unchanged.
fn normalize_url(url: &str) -> String {
    use url::Url;
    match Url::parse(url) {
        Ok(mut parsed) => {
            parsed.set_fragment(None);
            let trimmed = parsed.path().trim_end_matches('/').to_string();
            parsed.set_path(&trimmed);
            parsed.to_string()
        }
        Err(_) => url.to_string(),
    }
}