diff --git a/surfsense_backend/app/indexing_pipeline/indexing_pipeline_service.py b/surfsense_backend/app/indexing_pipeline/indexing_pipeline_service.py index 224eb0f5d..30ea9d5d6 100644 --- a/surfsense_backend/app/indexing_pipeline/indexing_pipeline_service.py +++ b/surfsense_backend/app/indexing_pipeline/indexing_pipeline_service.py @@ -29,7 +29,7 @@ from app.indexing_pipeline.document_hashing import ( compute_unique_identifier_hash, ) from app.indexing_pipeline.document_persistence import ( - attach_chunks_to_document, + persist_scratch_index, rollback_and_persist_failure, ) from app.indexing_pipeline.exceptions import ( @@ -387,21 +387,37 @@ class IndexingPipelineService: chunk_count = await self._reindex_incrementally( document, content, connector_doc, existing ) + perf.info( + "[indexing] chunk+embed doc=%d chunks=%d in %.3fs", + document.id, + chunk_count, + time.perf_counter() - t_step, + ) + document.content = content + document.updated_at = datetime.now(UTC) + document.status = DocumentStatus.ready() + await self.session.commit() else: - chunk_count = await self._reindex_from_scratch( + from app.config import config + + chunks = await self._reindex_from_scratch( document, content, connector_doc ) - perf.info( - "[indexing] chunk+embed doc=%d chunks=%d in %.3fs", - document.id, - chunk_count, - time.perf_counter() - t_step, - ) - - document.content = content - document.updated_at = datetime.now(UTC) - document.status = DocumentStatus.ready() - await self.session.commit() + chunk_count = len(chunks) + perf.info( + "[indexing] chunk+embed doc=%d chunks=%d in %.3fs", + document.id, + chunk_count, + time.perf_counter() - t_step, + ) + await persist_scratch_index( + self.session, + document, + content, + chunks, + batch_size=config.INDEXING_CHUNK_INSERT_BATCH_SIZE, + perf=perf, + ) perf.info( "[indexing] index TOTAL doc=%d chunks=%d in %.3fs", document.id, @@ -484,8 +500,7 @@ class IndexingPipelineService: async def _reindex_from_scratch( self, document: Document, content: str, connector_doc: ConnectorDocument - ) -> int: - """First index (or kill-switched re-index): cache-aware full chunk+embed.""" + ) -> list[Chunk]: await self.session.execute( delete(Chunk).where(Chunk.document_id == document.id) ) @@ -495,13 +510,11 @@ class IndexingPipelineService: use_code_chunker=connector_doc.should_use_code_chunker, ) - chunks = [ + document.embedding = summary_embedding + return [ Chunk(content=text, embedding=emb, position=i) for i, (text, emb) in enumerate(chunk_pairs) ] - document.embedding = summary_embedding - attach_chunks_to_document(document, chunks) - return len(chunks) async def _reindex_incrementally( self,