From c57ee978e67f97d69a7aac8294e415430131d2e8 Mon Sep 17 00:00:00 2001 From: CREDO23 Date: Thu, 18 Jun 2026 20:06:26 +0200 Subject: [PATCH] feat: persist and refresh chunk char spans on index --- .../indexing_pipeline_service.py | 89 +++++++++++++++---- 1 file changed, 70 insertions(+), 19 deletions(-) diff --git a/surfsense_backend/app/indexing_pipeline/indexing_pipeline_service.py b/surfsense_backend/app/indexing_pipeline/indexing_pipeline_service.py index 30ea9d5d6..0cb74089b 100644 --- a/surfsense_backend/app/indexing_pipeline/indexing_pipeline_service.py +++ b/surfsense_backend/app/indexing_pipeline/indexing_pipeline_service.py @@ -20,9 +20,10 @@ from app.db import ( DocumentType, ) from app.indexing_pipeline.cache import build_chunk_embeddings -from app.indexing_pipeline.cache.cached_indexing import chunk_markdown, embed_batch -from app.indexing_pipeline.chunk_reconciler import ExistingChunk, reconcile +from app.indexing_pipeline.cache.cached_indexing import chunk_slices, embed_batch +from app.indexing_pipeline.chunk_reconciler import ChunkPlan, ExistingChunk, reconcile from app.indexing_pipeline.connector_document import ConnectorDocument +from app.indexing_pipeline.document_chunker import ChunkSlice from app.indexing_pipeline.document_hashing import ( compute_content_hash, compute_identifier_hash, @@ -489,12 +490,22 @@ class IndexingPipelineService: async def _load_existing_chunks(self, document_id: int) -> list[ExistingChunk]: result = await self.session.execute( - select(Chunk.id, Chunk.content, Chunk.position).where( - Chunk.document_id == document_id - ) + select( + Chunk.id, + Chunk.content, + Chunk.position, + Chunk.start_char, + Chunk.end_char, + ).where(Chunk.document_id == document_id) ) return [ - ExistingChunk(id=row.id, content=row.content, position=row.position) + ExistingChunk( + id=row.id, + content=row.content, + position=row.position, + start_char=row.start_char, + end_char=row.end_char, + ) for row in result ] @@ -505,15 +516,21 @@ class IndexingPipelineService: delete(Chunk).where(Chunk.document_id == document.id) ) - summary_embedding, chunk_pairs = await build_chunk_embeddings( + summary_embedding, slice_pairs = await build_chunk_embeddings( content, use_code_chunker=connector_doc.should_use_code_chunker, ) document.embedding = summary_embedding return [ - Chunk(content=text, embedding=emb, position=i) - for i, (text, emb) in enumerate(chunk_pairs) + Chunk( + content=chunk_slice.text, + embedding=emb, + position=i, + start_char=chunk_slice.start_char, + end_char=chunk_slice.end_char, + ) + for i, (chunk_slice, emb) in enumerate(slice_pairs) ] async def _reindex_incrementally( @@ -525,35 +542,39 @@ class IndexingPipelineService: ) -> int: """Edit path: keep rows whose text survived, embed only new texts. - Unchanged rows keep their embedding and their HNSW/GIN index entries; - moved rows get a position-only UPDATE, which touches neither index. + Unchanged rows keep their embedding and their HNSW/GIN index entries. An + edit can shift a kept chunk's char span without changing its text, so + every kept row's position and span are refreshed whenever they drift. """ - new_texts = await chunk_markdown( + slices = await chunk_slices( content, use_code_chunker=connector_doc.should_use_code_chunker ) + new_texts = [s.text for s in slices] plan = reconcile(existing, new_texts) # One batch: the document-level summary vector plus the missing chunks. embeddings = await embed_batch([content, *[t for _, t in plan.to_embed]]) summary_embedding, *new_embeddings = embeddings - if plan.reused: - await self.session.execute( - update(Chunk), - [{"id": cid, "position": pos} for cid, pos in plan.reused], - ) if plan.to_delete: await self.session.execute( delete(Chunk).where(Chunk.id.in_(plan.to_delete)) ) + + span_updates = self._kept_row_span_updates(existing, slices, plan) + if span_updates: + await self.session.execute(update(Chunk), span_updates) + self.session.add_all( Chunk( - content=text, + content=slices[pos].text, embedding=emb, position=pos, + start_char=slices[pos].start_char, + end_char=slices[pos].end_char, document_id=document.id, ) - for (pos, text), emb in zip(plan.to_embed, new_embeddings, strict=True) + for (pos, _text), emb in zip(plan.to_embed, new_embeddings, strict=True) ) document.embedding = summary_embedding @@ -564,6 +585,36 @@ class IndexingPipelineService: ) return len(new_texts) + @staticmethod + def _kept_row_span_updates( + existing: list[ExistingChunk], + slices: list[ChunkSlice], + plan: ChunkPlan, + ) -> list[dict]: + """Position/span writes for kept rows, emitted only where a value drifts.""" + deleted = set(plan.to_delete) + moved = dict(plan.reused) + updates: list[dict] = [] + for chunk in existing: + if chunk.id in deleted: + continue + new_position = moved.get(chunk.id, chunk.position) + target = slices[new_position] + if ( + chunk.position != new_position + or chunk.start_char != target.start_char + or chunk.end_char != target.end_char + ): + updates.append( + { + "id": chunk.id, + "position": new_position, + "start_char": target.start_char, + "end_char": target.end_char, + } + ) + return updates + async def _enqueue_ai_sort_if_enabled(self, document: Document) -> None: """Fire-and-forget: enqueue incremental AI sort if the search space has it enabled.""" try: