feat: persist and refresh chunk char spans on index

This commit is contained in:
CREDO23 2026-06-18 20:06:26 +02:00
parent 1e33c28c24
commit c57ee978e6

View file

@ -20,9 +20,10 @@ from app.db import (
DocumentType, DocumentType,
) )
from app.indexing_pipeline.cache import build_chunk_embeddings from app.indexing_pipeline.cache import build_chunk_embeddings
from app.indexing_pipeline.cache.cached_indexing import chunk_markdown, embed_batch from app.indexing_pipeline.cache.cached_indexing import chunk_slices, embed_batch
from app.indexing_pipeline.chunk_reconciler import ExistingChunk, reconcile from app.indexing_pipeline.chunk_reconciler import ChunkPlan, ExistingChunk, reconcile
from app.indexing_pipeline.connector_document import ConnectorDocument from app.indexing_pipeline.connector_document import ConnectorDocument
from app.indexing_pipeline.document_chunker import ChunkSlice
from app.indexing_pipeline.document_hashing import ( from app.indexing_pipeline.document_hashing import (
compute_content_hash, compute_content_hash,
compute_identifier_hash, compute_identifier_hash,
@ -489,12 +490,22 @@ class IndexingPipelineService:
async def _load_existing_chunks(self, document_id: int) -> list[ExistingChunk]: async def _load_existing_chunks(self, document_id: int) -> list[ExistingChunk]:
result = await self.session.execute( result = await self.session.execute(
select(Chunk.id, Chunk.content, Chunk.position).where( select(
Chunk.document_id == document_id Chunk.id,
) Chunk.content,
Chunk.position,
Chunk.start_char,
Chunk.end_char,
).where(Chunk.document_id == document_id)
) )
return [ return [
ExistingChunk(id=row.id, content=row.content, position=row.position) ExistingChunk(
id=row.id,
content=row.content,
position=row.position,
start_char=row.start_char,
end_char=row.end_char,
)
for row in result for row in result
] ]
@ -505,15 +516,21 @@ class IndexingPipelineService:
delete(Chunk).where(Chunk.document_id == document.id) delete(Chunk).where(Chunk.document_id == document.id)
) )
summary_embedding, chunk_pairs = await build_chunk_embeddings( summary_embedding, slice_pairs = await build_chunk_embeddings(
content, content,
use_code_chunker=connector_doc.should_use_code_chunker, use_code_chunker=connector_doc.should_use_code_chunker,
) )
document.embedding = summary_embedding document.embedding = summary_embedding
return [ return [
Chunk(content=text, embedding=emb, position=i) Chunk(
for i, (text, emb) in enumerate(chunk_pairs) content=chunk_slice.text,
embedding=emb,
position=i,
start_char=chunk_slice.start_char,
end_char=chunk_slice.end_char,
)
for i, (chunk_slice, emb) in enumerate(slice_pairs)
] ]
async def _reindex_incrementally( async def _reindex_incrementally(
@ -525,35 +542,39 @@ class IndexingPipelineService:
) -> int: ) -> int:
"""Edit path: keep rows whose text survived, embed only new texts. """Edit path: keep rows whose text survived, embed only new texts.
Unchanged rows keep their embedding and their HNSW/GIN index entries; Unchanged rows keep their embedding and their HNSW/GIN index entries. An
moved rows get a position-only UPDATE, which touches neither index. edit can shift a kept chunk's char span without changing its text, so
every kept row's position and span are refreshed whenever they drift.
""" """
new_texts = await chunk_markdown( slices = await chunk_slices(
content, use_code_chunker=connector_doc.should_use_code_chunker content, use_code_chunker=connector_doc.should_use_code_chunker
) )
new_texts = [s.text for s in slices]
plan = reconcile(existing, new_texts) plan = reconcile(existing, new_texts)
# One batch: the document-level summary vector plus the missing chunks. # One batch: the document-level summary vector plus the missing chunks.
embeddings = await embed_batch([content, *[t for _, t in plan.to_embed]]) embeddings = await embed_batch([content, *[t for _, t in plan.to_embed]])
summary_embedding, *new_embeddings = embeddings summary_embedding, *new_embeddings = embeddings
if plan.reused:
await self.session.execute(
update(Chunk),
[{"id": cid, "position": pos} for cid, pos in plan.reused],
)
if plan.to_delete: if plan.to_delete:
await self.session.execute( await self.session.execute(
delete(Chunk).where(Chunk.id.in_(plan.to_delete)) delete(Chunk).where(Chunk.id.in_(plan.to_delete))
) )
span_updates = self._kept_row_span_updates(existing, slices, plan)
if span_updates:
await self.session.execute(update(Chunk), span_updates)
self.session.add_all( self.session.add_all(
Chunk( Chunk(
content=text, content=slices[pos].text,
embedding=emb, embedding=emb,
position=pos, position=pos,
start_char=slices[pos].start_char,
end_char=slices[pos].end_char,
document_id=document.id, document_id=document.id,
) )
for (pos, text), emb in zip(plan.to_embed, new_embeddings, strict=True) for (pos, _text), emb in zip(plan.to_embed, new_embeddings, strict=True)
) )
document.embedding = summary_embedding document.embedding = summary_embedding
@ -564,6 +585,36 @@ class IndexingPipelineService:
) )
return len(new_texts) return len(new_texts)
@staticmethod
def _kept_row_span_updates(
existing: list[ExistingChunk],
slices: list[ChunkSlice],
plan: ChunkPlan,
) -> list[dict]:
"""Position/span writes for kept rows, emitted only where a value drifts."""
deleted = set(plan.to_delete)
moved = dict(plan.reused)
updates: list[dict] = []
for chunk in existing:
if chunk.id in deleted:
continue
new_position = moved.get(chunk.id, chunk.position)
target = slices[new_position]
if (
chunk.position != new_position
or chunk.start_char != target.start_char
or chunk.end_char != target.end_char
):
updates.append(
{
"id": chunk.id,
"position": new_position,
"start_char": target.start_char,
"end_char": target.end_char,
}
)
return updates
async def _enqueue_ai_sort_if_enabled(self, document: Document) -> None: async def _enqueue_ai_sort_if_enabled(self, document: Document) -> None:
"""Fire-and-forget: enqueue incremental AI sort if the search space has it enabled.""" """Fire-and-forget: enqueue incremental AI sort if the search space has it enabled."""
try: try: