wire persist_scratch_index into scratch reindex

This commit is contained in:
CREDO23 2026-06-17 14:59:24 +02:00
parent 34de6c6f87
commit aca23b4731

View file

@ -29,7 +29,7 @@ from app.indexing_pipeline.document_hashing import (
compute_unique_identifier_hash, compute_unique_identifier_hash,
) )
from app.indexing_pipeline.document_persistence import ( from app.indexing_pipeline.document_persistence import (
attach_chunks_to_document, persist_scratch_index,
rollback_and_persist_failure, rollback_and_persist_failure,
) )
from app.indexing_pipeline.exceptions import ( from app.indexing_pipeline.exceptions import (
@ -387,21 +387,37 @@ class IndexingPipelineService:
chunk_count = await self._reindex_incrementally( chunk_count = await self._reindex_incrementally(
document, content, connector_doc, existing document, content, connector_doc, existing
) )
perf.info(
"[indexing] chunk+embed doc=%d chunks=%d in %.3fs",
document.id,
chunk_count,
time.perf_counter() - t_step,
)
document.content = content
document.updated_at = datetime.now(UTC)
document.status = DocumentStatus.ready()
await self.session.commit()
else: else:
chunk_count = await self._reindex_from_scratch( from app.config import config
chunks = await self._reindex_from_scratch(
document, content, connector_doc document, content, connector_doc
) )
perf.info( chunk_count = len(chunks)
"[indexing] chunk+embed doc=%d chunks=%d in %.3fs", perf.info(
document.id, "[indexing] chunk+embed doc=%d chunks=%d in %.3fs",
chunk_count, document.id,
time.perf_counter() - t_step, chunk_count,
) time.perf_counter() - t_step,
)
document.content = content await persist_scratch_index(
document.updated_at = datetime.now(UTC) self.session,
document.status = DocumentStatus.ready() document,
await self.session.commit() content,
chunks,
batch_size=config.INDEXING_CHUNK_INSERT_BATCH_SIZE,
perf=perf,
)
perf.info( perf.info(
"[indexing] index TOTAL doc=%d chunks=%d in %.3fs", "[indexing] index TOTAL doc=%d chunks=%d in %.3fs",
document.id, document.id,
@ -484,8 +500,7 @@ class IndexingPipelineService:
async def _reindex_from_scratch( async def _reindex_from_scratch(
self, document: Document, content: str, connector_doc: ConnectorDocument self, document: Document, content: str, connector_doc: ConnectorDocument
) -> int: ) -> list[Chunk]:
"""First index (or kill-switched re-index): cache-aware full chunk+embed."""
await self.session.execute( await self.session.execute(
delete(Chunk).where(Chunk.document_id == document.id) delete(Chunk).where(Chunk.document_id == document.id)
) )
@ -495,13 +510,11 @@ class IndexingPipelineService:
use_code_chunker=connector_doc.should_use_code_chunker, use_code_chunker=connector_doc.should_use_code_chunker,
) )
chunks = [ document.embedding = summary_embedding
return [
Chunk(content=text, embedding=emb, position=i) Chunk(content=text, embedding=emb, position=i)
for i, (text, emb) in enumerate(chunk_pairs) for i, (text, emb) in enumerate(chunk_pairs)
] ]
document.embedding = summary_embedding
attach_chunks_to_document(document, chunks)
return len(chunks)
async def _reindex_incrementally( async def _reindex_incrementally(
self, self,