fix: handle IntegrityError in prepare_for_indexing and add within-batch content dedup test

This commit is contained in:
CREDO23 2026-02-25 12:03:00 +02:00
parent 1b4ed35de3
commit e6b7ce7345
2 changed files with 89 additions and 56 deletions

View file

@ -1,13 +1,11 @@
from datetime import UTC, datetime
from sqlalchemy import select
from sqlalchemy import delete, select
from sqlalchemy.exc import IntegrityError
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy.orm import object_session
from sqlalchemy.orm.attributes import set_committed_value
from sqlalchemy import delete
from app.db import Chunk, Document, DocumentStatus
from app.indexing_pipeline.connector_document import ConnectorDocument
from app.indexing_pipeline.document_chunker import chunk_text
@ -39,67 +37,75 @@ class IndexingPipelineService:
"""
Persist new documents and detect changes, returning only those that need indexing.
"""
documents = []
seen_hashes: set[str] = set()
try:
documents = []
seen_hashes: set[str] = set()
for connector_doc in connector_docs:
unique_identifier_hash = compute_unique_identifier_hash(connector_doc)
content_hash = compute_content_hash(connector_doc)
for connector_doc in connector_docs:
unique_identifier_hash = compute_unique_identifier_hash(connector_doc)
content_hash = compute_content_hash(connector_doc)
if unique_identifier_hash in seen_hashes:
continue
seen_hashes.add(unique_identifier_hash)
if unique_identifier_hash in seen_hashes:
continue
seen_hashes.add(unique_identifier_hash)
result = await self.session.execute(
select(Document).filter(Document.unique_identifier_hash == unique_identifier_hash)
)
existing = result.scalars().first()
result = await self.session.execute(
select(Document).filter(Document.unique_identifier_hash == unique_identifier_hash)
)
existing = result.scalars().first()
if existing is not None:
if existing.content_hash == content_hash:
if existing.title != connector_doc.title:
existing.title = connector_doc.title
existing.updated_at = datetime.now(UTC)
if not DocumentStatus.is_state(existing.status, DocumentStatus.READY):
existing.status = DocumentStatus.pending()
existing.updated_at = datetime.now(UTC)
documents.append(existing)
if existing is not None:
if existing.content_hash == content_hash:
if existing.title != connector_doc.title:
existing.title = connector_doc.title
existing.updated_at = datetime.now(UTC)
if not DocumentStatus.is_state(existing.status, DocumentStatus.READY):
existing.status = DocumentStatus.pending()
existing.updated_at = datetime.now(UTC)
documents.append(existing)
continue
existing.title = connector_doc.title
existing.content_hash = content_hash
existing.source_markdown = connector_doc.source_markdown
existing.document_metadata = connector_doc.metadata
existing.updated_at = datetime.now(UTC)
existing.status = DocumentStatus.pending()
documents.append(existing)
continue
existing.title = connector_doc.title
existing.content_hash = content_hash
existing.source_markdown = connector_doc.source_markdown
existing.document_metadata = connector_doc.metadata
existing.updated_at = datetime.now(UTC)
existing.status = DocumentStatus.pending()
documents.append(existing)
continue
duplicate = await self.session.execute(
select(Document).filter(Document.content_hash == content_hash)
)
if duplicate.scalars().first() is not None:
continue
duplicate = await self.session.execute(
select(Document).filter(Document.content_hash == content_hash)
)
if duplicate.scalars().first() is not None:
continue
document = Document(
title=connector_doc.title,
document_type=connector_doc.document_type,
content="Pending...",
content_hash=content_hash,
unique_identifier_hash=unique_identifier_hash,
source_markdown=connector_doc.source_markdown,
document_metadata=connector_doc.metadata,
search_space_id=connector_doc.search_space_id,
connector_id=connector_doc.connector_id,
created_by_id=connector_doc.created_by_id,
updated_at=datetime.now(UTC),
status=DocumentStatus.pending(),
)
self.session.add(document)
documents.append(document)
document = Document(
title=connector_doc.title,
document_type=connector_doc.document_type,
content="Pending...",
content_hash=content_hash,
unique_identifier_hash=unique_identifier_hash,
source_markdown=connector_doc.source_markdown,
document_metadata=connector_doc.metadata,
search_space_id=connector_doc.search_space_id,
connector_id=connector_doc.connector_id,
created_by_id=connector_doc.created_by_id,
updated_at=datetime.now(UTC),
status=DocumentStatus.pending(),
)
self.session.add(document)
documents.append(document)
await self.session.commit()
return documents
await self.session.commit()
return documents
except IntegrityError:
# Most likely a concurrent worker committed a document with the same
# content_hash or unique_identifier_hash. Roll back and let the next
# sync run handle it.
await self.session.rollback()
return []
async def index(
self, document: Document, connector_doc: ConnectorDocument, llm

View file

@ -209,6 +209,31 @@ async def test_updated_at_advances_when_content_changes(
assert updated_at_v2 > updated_at_v1
async def test_same_content_from_different_source_skipped_in_single_batch(
db_session, db_search_space, make_connector_document
):
first = make_connector_document(
search_space_id=db_search_space.id,
unique_id="source-a",
source_markdown="## Shared content",
)
second = make_connector_document(
search_space_id=db_search_space.id,
unique_id="source-b",
source_markdown="## Shared content",
)
service = IndexingPipelineService(session=db_session)
results = await service.prepare_for_indexing([first, second])
assert len(results) == 1
result = await db_session.execute(
select(Document).filter(Document.search_space_id == db_search_space.id)
)
assert len(result.scalars().all()) == 1
async def test_same_content_from_different_source_is_skipped(
db_session, db_search_space, make_connector_document
):
@ -288,3 +313,5 @@ async def test_title_and_content_change_updates_both_and_returns_document(
assert reloaded.title == "Updated Title"
assert reloaded.source_markdown == "## v2"
# explain how this No no_autoflush guard for duplicate check is a regression in new pipeline , explain this Notion chunks wrong string Behavioral diff Chunks page content Would chunk full wrapper , let us discuss about this : GitHub can't split embedding vs chunk content Behavioral diff Two strings One source_markdown