From 7d55aaf2c183b37b715be3c9f4d15cff5156e04a Mon Sep 17 00:00:00 2001 From: CREDO23 Date: Fri, 12 Jun 2026 18:53:08 +0200 Subject: [PATCH] feat(indexing): reconcile chunks incrementally on re-index index() now loads existing rows and applies a content diff instead of delete-all/reinsert-all: unchanged chunks keep their rows and embeddings (zero HNSW/GIN churn), moved chunks get a position-only UPDATE, and only new texts are embedded, batched with the summary embedding. First index keeps the cache-aware build_chunk_embeddings path. --- .../indexing_pipeline_service.py | 119 +++++++++++++++--- 1 file changed, 101 insertions(+), 18 deletions(-) diff --git a/surfsense_backend/app/indexing_pipeline/indexing_pipeline_service.py b/surfsense_backend/app/indexing_pipeline/indexing_pipeline_service.py index 271b3ee03..224eb0f5d 100644 --- a/surfsense_backend/app/indexing_pipeline/indexing_pipeline_service.py +++ b/surfsense_backend/app/indexing_pipeline/indexing_pipeline_service.py @@ -8,7 +8,7 @@ from collections.abc import Awaitable, Callable from dataclasses import dataclass, field from datetime import UTC, datetime -from sqlalchemy import delete, select +from sqlalchemy import delete, select, update from sqlalchemy.exc import IntegrityError from sqlalchemy.ext.asyncio import AsyncSession @@ -20,6 +20,8 @@ from app.db import ( DocumentType, ) from app.indexing_pipeline.cache import build_chunk_embeddings +from app.indexing_pipeline.cache.cached_indexing import chunk_markdown, embed_batch +from app.indexing_pipeline.chunk_reconciler import ExistingChunk, reconcile from app.indexing_pipeline.connector_document import ConnectorDocument from app.indexing_pipeline.document_hashing import ( compute_content_hash, @@ -379,39 +381,34 @@ class IndexingPipelineService: content = connector_doc.source_markdown - await self.session.execute( - delete(Chunk).where(Chunk.document_id == document.id) - ) - t_step = time.perf_counter() - summary_embedding, chunk_pairs = await build_chunk_embeddings( - content, - use_code_chunker=connector_doc.should_use_code_chunker, - ) - - chunks = [ - Chunk(content=text, embedding=emb) for text, emb in chunk_pairs - ] + existing = await self._load_existing_chunks(document.id) + if existing and self._reconcile_enabled(): + chunk_count = await self._reindex_incrementally( + document, content, connector_doc, existing + ) + else: + chunk_count = await self._reindex_from_scratch( + document, content, connector_doc + ) perf.info( "[indexing] chunk+embed doc=%d chunks=%d in %.3fs", document.id, - len(chunks), + chunk_count, time.perf_counter() - t_step, ) document.content = content - document.embedding = summary_embedding - attach_chunks_to_document(document, chunks) document.updated_at = datetime.now(UTC) document.status = DocumentStatus.ready() await self.session.commit() perf.info( "[indexing] index TOTAL doc=%d chunks=%d in %.3fs", document.id, - len(chunks), + chunk_count, time.perf_counter() - t_index, ) - log_index_success(ctx, chunk_count=len(chunks)) + log_index_success(ctx, chunk_count=chunk_count) outcome_status = "success" await self._enqueue_ai_sort_if_enabled(document) @@ -468,6 +465,92 @@ class IndexingPipelineService: persist_span_cm.__exit__(*sys.exc_info()) return document + @staticmethod + def _reconcile_enabled() -> bool: + from app.config import config + + return config.CHUNK_RECONCILE_ENABLED + + async def _load_existing_chunks(self, document_id: int) -> list[ExistingChunk]: + result = await self.session.execute( + select(Chunk.id, Chunk.content, Chunk.position).where( + Chunk.document_id == document_id + ) + ) + return [ + ExistingChunk(id=row.id, content=row.content, position=row.position) + for row in result + ] + + async def _reindex_from_scratch( + self, document: Document, content: str, connector_doc: ConnectorDocument + ) -> int: + """First index (or kill-switched re-index): cache-aware full chunk+embed.""" + await self.session.execute( + delete(Chunk).where(Chunk.document_id == document.id) + ) + + summary_embedding, chunk_pairs = await build_chunk_embeddings( + content, + use_code_chunker=connector_doc.should_use_code_chunker, + ) + + chunks = [ + Chunk(content=text, embedding=emb, position=i) + for i, (text, emb) in enumerate(chunk_pairs) + ] + document.embedding = summary_embedding + attach_chunks_to_document(document, chunks) + return len(chunks) + + async def _reindex_incrementally( + self, + document: Document, + content: str, + connector_doc: ConnectorDocument, + existing: list[ExistingChunk], + ) -> int: + """Edit path: keep rows whose text survived, embed only new texts. + + Unchanged rows keep their embedding and their HNSW/GIN index entries; + moved rows get a position-only UPDATE, which touches neither index. + """ + new_texts = await chunk_markdown( + content, use_code_chunker=connector_doc.should_use_code_chunker + ) + plan = reconcile(existing, new_texts) + + # One batch: the document-level summary vector plus the missing chunks. + embeddings = await embed_batch([content, *[t for _, t in plan.to_embed]]) + summary_embedding, *new_embeddings = embeddings + + if plan.reused: + await self.session.execute( + update(Chunk), + [{"id": cid, "position": pos} for cid, pos in plan.reused], + ) + if plan.to_delete: + await self.session.execute( + delete(Chunk).where(Chunk.id.in_(plan.to_delete)) + ) + self.session.add_all( + Chunk( + content=text, + embedding=emb, + position=pos, + document_id=document.id, + ) + for (pos, text), emb in zip(plan.to_embed, new_embeddings, strict=True) + ) + document.embedding = summary_embedding + + ot_metrics.record_chunk_reconcile( + reused=len(existing) - len(plan.to_delete), + embedded=len(plan.to_embed), + deleted=len(plan.to_delete), + ) + return len(new_texts) + async def _enqueue_ai_sort_if_enabled(self, document: Document) -> None: """Fire-and-forget: enqueue incremental AI sort if the search space has it enabled.""" try: