feat(indexing): add pure chunk reconciler for content-addressed diffs

Greedy multiset match on chunk text decides which rows keep their embeddings,
which texts need embedding, and which rows are deleted. No DB, no embeddings;
fully unit-tested (reuse, head insert, middle edit, deletion, duplicates,
reorder, full rewrite).
This commit is contained in:
CREDO23 2026-06-12 18:52:46 +02:00
parent c6e71c851c
commit f82dedf712
2 changed files with 150 additions and 0 deletions

View file

@ -0,0 +1,56 @@
"""Diff a document's existing chunk rows against its freshly chunked texts.
Embeddings are a pure function of chunk text, so a row whose content reappears
in the new chunking keeps its embedding (and its HNSW/GIN index entries); only
genuinely new texts are embedded and only vanished rows are deleted. Matching
is a greedy multiset match on content in document order, so duplicate
boilerplate chunks pair up one-to-one and reordered chunks become cheap
position updates instead of delete+reinsert.
"""
from __future__ import annotations
from collections import defaultdict, deque
from dataclasses import dataclass
@dataclass(frozen=True, slots=True)
class ExistingChunk:
id: int
content: str
position: int
@dataclass(frozen=True, slots=True)
class ChunkPlan:
"""The minimal set of writes that turns the stored chunks into the new ones.
``reused`` holds only kept rows whose position actually changed; rows that
match in place need no write at all. Kept-row count (for metrics) is
``len(existing) - len(to_delete)``.
"""
reused: list[tuple[int, int]] # (existing_chunk_id, new_position)
to_embed: list[tuple[int, str]] # (new_position, text)
to_delete: list[int] # existing chunk ids
def reconcile(existing: list[ExistingChunk], new_texts: list[str]) -> ChunkPlan:
    """Pair stored chunk rows with new chunk texts by exact content.

    Stored rows are bucketed by content in ascending ``position`` order. Each
    new text, taken in order, claims the earliest unclaimed row with the same
    content; a text with no row left to claim must be embedded, and any row
    left unclaimed at the end is deleted.

    Args:
        existing: The rows currently stored for the document, in any order.
        new_texts: The freshly chunked texts; list index is the new position.

    Returns:
        A ``ChunkPlan`` whose ``reused`` lists claimed rows that changed
        position, ``to_embed`` lists unmatched new texts with their positions,
        and ``to_delete`` lists the ids of unclaimed rows.

    Note:
        Duplicates are paired strictly in ascending stored position, so a copy
        can be reported as moved even when another copy with the same content
        already sits at the target position.
    """
    # Per-content FIFO of unclaimed rows, earliest stored position first.
    pool: dict[str, deque[ExistingChunk]] = defaultdict(deque)
    for row in sorted(existing, key=lambda c: c.position):
        pool[row.content].append(row)

    moved: list[tuple[int, int]] = []
    fresh: list[tuple[int, str]] = []
    for slot, text in enumerate(new_texts):
        # ``get`` (not indexing) so unseen texts do not add empty buckets.
        candidates = pool.get(text)
        if not candidates:
            fresh.append((slot, text))
            continue
        row = candidates.popleft()
        # A row already at its new position needs no write.
        if row.position != slot:
            moved.append((row.id, slot))

    # Whatever is still queued was not claimed by any new text.
    leftover_ids = [row.id for rows in pool.values() for row in rows]
    return ChunkPlan(reused=moved, to_embed=fresh, to_delete=leftover_ids)

View file

@ -0,0 +1,94 @@
"""reconcile(): diff existing chunk rows against new chunk texts.
The reconciler decides which rows (and embeddings) survive an edit, which texts
must be embedded, and which rows go away -- purely from content, no DB.
"""
from __future__ import annotations
from app.indexing_pipeline.chunk_reconciler import ExistingChunk, reconcile
def _existing(*contents: str) -> list[ExistingChunk]:
    """Build stored rows from texts: positions count from 0, ids from 1."""
    rows = []
    for position, text in enumerate(contents):
        rows.append(
            ExistingChunk(id=position + 1, content=text, position=position)
        )
    return rows
def test_identical_content_keeps_every_row_untouched():
    """Unchanged texts yield a plan with nothing to write."""
    stored = _existing("alpha", "beta", "gamma")
    plan = reconcile(stored, ["alpha", "beta", "gamma"])

    assert plan.reused == []
    assert plan.to_embed == []
    assert plan.to_delete == []
def test_head_insert_embeds_only_the_new_chunk_and_shifts_the_rest():
    """A chunk prepended at the head is the only one embedded."""
    stored = _existing("alpha", "beta")
    plan = reconcile(stored, ["intro", "alpha", "beta"])

    # Row 1 (alpha) moves 0 -> 1 and row 2 (beta) moves 1 -> 2; both keep
    # their embeddings.
    assert plan.reused == [(1, 1), (2, 2)]
    assert plan.to_embed == [(0, "intro")]
    assert plan.to_delete == []
def test_middle_edit_swaps_exactly_one_chunk():
    """Editing one chunk deletes its old row and embeds its new text."""
    stored = _existing("alpha", "beta", "gamma")
    plan = reconcile(stored, ["alpha", "beta EDITED", "gamma"])

    # The neighbours stay where they were, so there are no position writes.
    assert plan.reused == []
    assert plan.to_delete == [2]
    assert plan.to_embed == [(1, "beta EDITED")]
def test_removed_chunk_is_deleted_and_followers_shift_up():
    """Dropping a chunk deletes its row and moves the following row up."""
    stored = _existing("alpha", "beta", "gamma")
    plan = reconcile(stored, ["alpha", "gamma"])

    assert plan.to_delete == [2]
    assert plan.reused == [(3, 1)]
    assert plan.to_embed == []
def test_duplicate_texts_pair_up_one_to_one():
    """Identical texts are matched one row per occurrence."""
    # Of two identical boilerplate rows only one has a counterpart after the
    # edit, so exactly one is kept and exactly one deleted -- never both kept
    # and never both dropped.
    stored = _existing("boiler", "boiler", "body")
    plan = reconcile(stored, ["boiler", "body"])

    assert plan.to_delete == [2]
    assert plan.reused == [(3, 1)]
    assert plan.to_embed == []
def test_duplicate_growth_embeds_only_the_extra_copy():
    """A second copy of an existing text is the only thing embedded."""
    stored = _existing("boiler", "body")
    plan = reconcile(stored, ["boiler", "boiler", "body"])

    assert plan.to_embed == [(1, "boiler")]
    assert plan.reused == [(2, 2)]
    assert plan.to_delete == []
def test_reorder_becomes_position_updates_with_no_embedding():
    """Swapping two chunks only rewrites their positions."""
    stored = _existing("alpha", "beta")
    plan = reconcile(stored, ["beta", "alpha"])

    # Order of the position updates is not part of the contract here.
    assert sorted(plan.reused) == [(1, 1), (2, 0)]
    assert plan.to_embed == []
    assert plan.to_delete == []
def test_full_rewrite_replaces_everything():
    """With no shared text, every old row goes and every new text is embedded."""
    stored = _existing("alpha", "beta")
    plan = reconcile(stored, ["new one", "new two"])

    assert plan.reused == []
    # Deletion order is not part of the contract here.
    assert sorted(plan.to_delete) == [1, 2]
    assert plan.to_embed == [(0, "new one"), (1, "new two")]
def test_no_existing_chunks_embeds_all():
    """With nothing stored, every new text is embedded at its own position."""
    new_texts = ["alpha", "beta"]
    plan = reconcile([], new_texts)

    assert plan.to_embed == [(0, "alpha"), (1, "beta")]
    assert plan.reused == []
    assert plan.to_delete == []