mirror of
https://github.com/MODSetter/SurfSense.git
synced 2026-06-12 20:45:20 +02:00
feat(index-cache): serve chunk embeddings from cache during indexing
This commit is contained in:
parent
e8938c119b
commit
019aa7bf76
3 changed files with 138 additions and 21 deletions
11
surfsense_backend/app/indexing_pipeline/cache/__init__.py
vendored
Normal file
11
surfsense_backend/app/indexing_pipeline/cache/__init__.py
vendored
Normal file
|
|
@ -0,0 +1,11 @@
|
|||
"""Content-addressed reuse of chunk+embedding output across workspaces."""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
from app.indexing_pipeline.cache.cached_indexing import build_chunk_embeddings
|
||||
from app.indexing_pipeline.cache.service import IndexCacheService
|
||||
|
||||
__all__ = [
|
||||
"IndexCacheService",
|
||||
"build_chunk_embeddings",
|
||||
]
|
||||
121
surfsense_backend/app/indexing_pipeline/cache/cached_indexing.py
vendored
Normal file
121
surfsense_backend/app/indexing_pipeline/cache/cached_indexing.py
vendored
Normal file
|
|
@ -0,0 +1,121 @@
|
|||
"""Entry point: serve chunk embeddings from cache, embedding only on a miss.
|
||||
|
||||
Embeddings are a pure function of the markdown, the embedding model, and the
|
||||
chunker -- so identical markdown is chunked and embedded once and reused across
|
||||
workspaces, even when it came from different sources.
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import asyncio
|
||||
import hashlib
|
||||
import logging
|
||||
|
||||
import numpy as np
|
||||
|
||||
from app.config import config
|
||||
from app.indexing_pipeline.cache.eligibility import is_index_cacheable
|
||||
from app.indexing_pipeline.cache.schemas import CachedChunk, EmbeddingKey, EmbeddingSet
|
||||
from app.indexing_pipeline.cache.service import IndexCacheService
|
||||
from app.indexing_pipeline.cache.settings import load_index_cache_settings
|
||||
from app.indexing_pipeline.document_chunker import chunk_text, chunk_text_hybrid
|
||||
from app.indexing_pipeline.document_embedder import embed_texts
|
||||
from app.observability import metrics
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
ChunkPair = tuple[str, np.ndarray]
|
||||
|
||||
|
||||
async def build_chunk_embeddings(
    markdown: str, *, use_code_chunker: bool
) -> tuple[np.ndarray, list[ChunkPair]]:
    """Produce the document vector and the ordered ``(chunk_text, vector)`` pairs.

    Stands in for the inline chunk+embed step. When caching is eligible, the
    result is looked up under a key built from the markdown's SHA-256, the
    embedding model and dimension, and the chunker kind and version; only a
    miss triggers a fresh chunk+embed, whose output is then stored.

    Args:
        markdown: Text to chunk and embed.
        use_code_chunker: Selects the ``"code"`` chunker when true, otherwise
            the ``"hybrid"`` one.

    Returns:
        ``(summary_embedding, chunk_pairs)``, either recalled from the cache
        or freshly computed.
    """
    cache_settings = load_index_cache_settings()
    kind = "code" if use_code_chunker else "hybrid"
    dim = getattr(config.embedding_model_instance, "dimension", None)

    eligible = is_index_cacheable(
        cache_enabled=cache_settings.enabled,
        embedding_model=config.EMBEDDING_MODEL,
        embedding_dim=dim,
    )
    if not eligible:
        # Cache bypassed entirely: no lookup, no metric, no write-back.
        return await _compute(markdown, use_code_chunker=use_code_chunker)

    key = EmbeddingKey(
        markdown_sha256=_hash_text(markdown),
        embedding_model=config.EMBEDDING_MODEL,
        embedding_dim=int(dim),
        chunker_kind=kind,
        chunker_version=cache_settings.chunker_version,
    )

    hit = await _recall(key)
    metrics.record_index_cache_lookup(
        embedding_model=key.embedding_model,
        chunker_kind=kind,
        outcome="miss" if hit is None else "hit",
    )

    if hit is None:
        # Miss: embed from scratch, then store the result for the next caller.
        summary_embedding, chunk_pairs = await _compute(
            markdown, use_code_chunker=use_code_chunker
        )
        await _remember(key, summary_embedding, chunk_pairs)
        return summary_embedding, chunk_pairs

    logger.debug("Index cache hit for %s", key.markdown_sha256)
    return hit.summary_embedding, [
        (chunk.text, chunk.embedding) for chunk in hit.chunks
    ]
|
||||
|
||||
|
||||
async def _compute(
    markdown: str, *, use_code_chunker: bool
) -> tuple[np.ndarray, list[ChunkPair]]:
    """Chunk ``markdown`` and embed it together with its chunks, bypassing the cache.

    Chunking and embedding each run in a worker thread via
    ``asyncio.to_thread``. The full markdown is embedded as the first item of
    the batch, so the first returned vector is the document-level one and the
    rest are paired with the chunks in order.
    """
    if not use_code_chunker:
        # Table-aware hybrid chunker keeps Markdown tables intact (issue #1334).
        pieces = await asyncio.to_thread(chunk_text_hybrid, markdown)
    else:
        pieces = await asyncio.to_thread(
            chunk_text, markdown, use_code_chunker=True
        )

    batch = [markdown, *pieces]
    vectors = await asyncio.to_thread(embed_texts, batch)
    summary_embedding, *chunk_vectors = vectors

    # strict=False: a length mismatch silently truncates to the shorter side.
    pairs = [
        (piece, vector)
        for piece, vector in zip(pieces, chunk_vectors, strict=False)
    ]
    return summary_embedding, pairs
|
||||
|
||||
|
||||
async def _recall(key: EmbeddingKey) -> EmbeddingSet | None:
    """Look ``key`` up in the index cache, returning ``None`` on any failure.

    Caching is best-effort: every exception (including a failed import of the
    session maker) is logged as a warning and reported as ``None`` so the
    caller falls through to a normal embed.
    """
    try:
        # Imported lazily, inside the guarded region, so an import failure is
        # handled like any other cache failure.
        from app.tasks.celery_tasks import get_celery_session_maker

        session_factory = get_celery_session_maker()
        async with session_factory() as session:
            service = IndexCacheService(session)
            found = await service.recall(key)
    except Exception:
        logger.warning("Index cache recall failed; embedding fresh", exc_info=True)
        return None
    return found
|
||||
|
||||
|
||||
async def _remember(
    key: EmbeddingKey, summary_embedding: np.ndarray, chunk_pairs: list[ChunkPair]
) -> None:
    """Store a freshly computed embedding set under ``key``; never raises.

    Any exception while importing the session maker, building the payload, or
    writing it is logged as a warning and swallowed, so a cache-write failure
    does not affect the caller's result.
    """
    try:
        from app.tasks.celery_tasks import get_celery_session_maker

        cached_chunks = [
            CachedChunk(text=chunk, embedding=vector)
            for chunk, vector in chunk_pairs
        ]
        payload = EmbeddingSet(
            summary_embedding=summary_embedding,
            chunks=cached_chunks,
        )
        session_factory = get_celery_session_maker()
        async with session_factory() as session:
            service = IndexCacheService(session)
            await service.remember(key, payload)
    except Exception:
        logger.warning("Index cache write failed; result not cached", exc_info=True)
|
||||
|
||||
|
||||
def _hash_text(text: str) -> str:
    """Return the hex SHA-256 digest of ``text`` encoded as UTF-8.

    The text is encoded with ``errors="surrogatepass"`` so that lone surrogate
    code points (which can survive lenient decoding or JSON ``\\uXXXX``
    escapes) are hashed instead of raising ``UnicodeEncodeError``. For any
    string that strict UTF-8 can encode, the bytes -- and therefore the
    digest -- are identical to a plain ``text.encode("utf-8")``.
    """
    data = text.encode("utf-8", errors="surrogatepass")
    return hashlib.sha256(data).hexdigest()
|
||||
|
|
@ -19,9 +19,8 @@ from app.db import (
|
|||
DocumentStatus,
|
||||
DocumentType,
|
||||
)
|
||||
from app.indexing_pipeline.cache import build_chunk_embeddings
|
||||
from app.indexing_pipeline.connector_document import ConnectorDocument
|
||||
from app.indexing_pipeline.document_chunker import chunk_text, chunk_text_hybrid
|
||||
from app.indexing_pipeline.document_embedder import embed_texts
|
||||
from app.indexing_pipeline.document_hashing import (
|
||||
compute_content_hash,
|
||||
compute_identifier_hash,
|
||||
|
|
@ -385,27 +384,13 @@ class IndexingPipelineService:
|
|||
)
|
||||
|
||||
t_step = time.perf_counter()
|
||||
if connector_doc.should_use_code_chunker:
|
||||
chunk_texts = await asyncio.to_thread(
|
||||
chunk_text,
|
||||
connector_doc.source_markdown,
|
||||
use_code_chunker=True,
|
||||
)
|
||||
else:
|
||||
# Use the table-aware hybrid chunker so Markdown tables are not
|
||||
# split mid-row (see issue #1334).
|
||||
chunk_texts = await asyncio.to_thread(
|
||||
chunk_text_hybrid,
|
||||
connector_doc.source_markdown,
|
||||
)
|
||||
|
||||
texts_to_embed = [content, *chunk_texts]
|
||||
embeddings = await asyncio.to_thread(embed_texts, texts_to_embed)
|
||||
summary_embedding, *chunk_embeddings = embeddings
|
||||
summary_embedding, chunk_pairs = await build_chunk_embeddings(
|
||||
content,
|
||||
use_code_chunker=connector_doc.should_use_code_chunker,
|
||||
)
|
||||
|
||||
chunks = [
|
||||
Chunk(content=text, embedding=emb)
|
||||
for text, emb in zip(chunk_texts, chunk_embeddings, strict=False)
|
||||
Chunk(content=text, embedding=emb) for text, emb in chunk_pairs
|
||||
]
|
||||
perf.info(
|
||||
"[indexing] chunk+embed doc=%d chunks=%d in %.3fs",
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue