feat(indexing): reconcile chunks incrementally on re-index

index() now loads existing rows and applies a content diff instead of
delete-all/reinsert-all: unchanged chunks keep their rows and embeddings
(zero HNSW/GIN churn), moved chunks get a position-only UPDATE, and only
new texts are embedded, batched with the summary embedding. First index
keeps the cache-aware build_chunk_embeddings path.
This commit is contained in:
CREDO23 2026-06-12 18:53:08 +02:00
parent fd495e1b2f
commit 7d55aaf2c1

View file

@ -8,7 +8,7 @@ from collections.abc import Awaitable, Callable
from dataclasses import dataclass, field
from datetime import UTC, datetime
from sqlalchemy import delete, select
from sqlalchemy import delete, select, update
from sqlalchemy.exc import IntegrityError
from sqlalchemy.ext.asyncio import AsyncSession
@ -20,6 +20,8 @@ from app.db import (
DocumentType,
)
from app.indexing_pipeline.cache import build_chunk_embeddings
from app.indexing_pipeline.cache.cached_indexing import chunk_markdown, embed_batch
from app.indexing_pipeline.chunk_reconciler import ExistingChunk, reconcile
from app.indexing_pipeline.connector_document import ConnectorDocument
from app.indexing_pipeline.document_hashing import (
compute_content_hash,
@ -379,39 +381,34 @@ class IndexingPipelineService:
content = connector_doc.source_markdown
await self.session.execute(
delete(Chunk).where(Chunk.document_id == document.id)
)
t_step = time.perf_counter()
summary_embedding, chunk_pairs = await build_chunk_embeddings(
content,
use_code_chunker=connector_doc.should_use_code_chunker,
)
chunks = [
Chunk(content=text, embedding=emb) for text, emb in chunk_pairs
]
existing = await self._load_existing_chunks(document.id)
if existing and self._reconcile_enabled():
chunk_count = await self._reindex_incrementally(
document, content, connector_doc, existing
)
else:
chunk_count = await self._reindex_from_scratch(
document, content, connector_doc
)
perf.info(
"[indexing] chunk+embed doc=%d chunks=%d in %.3fs",
document.id,
len(chunks),
chunk_count,
time.perf_counter() - t_step,
)
document.content = content
document.embedding = summary_embedding
attach_chunks_to_document(document, chunks)
document.updated_at = datetime.now(UTC)
document.status = DocumentStatus.ready()
await self.session.commit()
perf.info(
"[indexing] index TOTAL doc=%d chunks=%d in %.3fs",
document.id,
len(chunks),
chunk_count,
time.perf_counter() - t_index,
)
log_index_success(ctx, chunk_count=len(chunks))
log_index_success(ctx, chunk_count=chunk_count)
outcome_status = "success"
await self._enqueue_ai_sort_if_enabled(document)
@ -468,6 +465,92 @@ class IndexingPipelineService:
persist_span_cm.__exit__(*sys.exc_info())
return document
@staticmethod
def _reconcile_enabled() -> bool:
from app.config import config
return config.CHUNK_RECONCILE_ENABLED
async def _load_existing_chunks(self, document_id: int) -> list[ExistingChunk]:
result = await self.session.execute(
select(Chunk.id, Chunk.content, Chunk.position).where(
Chunk.document_id == document_id
)
)
return [
ExistingChunk(id=row.id, content=row.content, position=row.position)
for row in result
]
async def _reindex_from_scratch(
self, document: Document, content: str, connector_doc: ConnectorDocument
) -> int:
"""First index (or kill-switched re-index): cache-aware full chunk+embed."""
await self.session.execute(
delete(Chunk).where(Chunk.document_id == document.id)
)
summary_embedding, chunk_pairs = await build_chunk_embeddings(
content,
use_code_chunker=connector_doc.should_use_code_chunker,
)
chunks = [
Chunk(content=text, embedding=emb, position=i)
for i, (text, emb) in enumerate(chunk_pairs)
]
document.embedding = summary_embedding
attach_chunks_to_document(document, chunks)
return len(chunks)
async def _reindex_incrementally(
self,
document: Document,
content: str,
connector_doc: ConnectorDocument,
existing: list[ExistingChunk],
) -> int:
"""Edit path: keep rows whose text survived, embed only new texts.
Unchanged rows keep their embedding and their HNSW/GIN index entries;
moved rows get a position-only UPDATE, which touches neither index.
"""
new_texts = await chunk_markdown(
content, use_code_chunker=connector_doc.should_use_code_chunker
)
plan = reconcile(existing, new_texts)
# One batch: the document-level summary vector plus the missing chunks.
embeddings = await embed_batch([content, *[t for _, t in plan.to_embed]])
summary_embedding, *new_embeddings = embeddings
if plan.reused:
await self.session.execute(
update(Chunk),
[{"id": cid, "position": pos} for cid, pos in plan.reused],
)
if plan.to_delete:
await self.session.execute(
delete(Chunk).where(Chunk.id.in_(plan.to_delete))
)
self.session.add_all(
Chunk(
content=text,
embedding=emb,
position=pos,
document_id=document.id,
)
for (pos, text), emb in zip(plan.to_embed, new_embeddings, strict=True)
)
document.embedding = summary_embedding
ot_metrics.record_chunk_reconcile(
reused=len(existing) - len(plan.to_delete),
embedded=len(plan.to_embed),
deleted=len(plan.to_delete),
)
return len(new_texts)
async def _enqueue_ai_sort_if_enabled(self, document: Document) -> None:
"""Fire-and-forget: enqueue incremental AI sort if the search space has it enabled."""
try: