diff --git a/surfsense_backend/app/indexing_pipeline/indexing_pipeline_service.py b/surfsense_backend/app/indexing_pipeline/indexing_pipeline_service.py
index e544b67cc..c542008cf 100644
--- a/surfsense_backend/app/indexing_pipeline/indexing_pipeline_service.py
+++ b/surfsense_backend/app/indexing_pipeline/indexing_pipeline_service.py
@@ -1,13 +1,11 @@
 from datetime import UTC, datetime
 
-from sqlalchemy import select
+from sqlalchemy import delete, select
+from sqlalchemy.exc import IntegrityError
 from sqlalchemy.ext.asyncio import AsyncSession
-
 from sqlalchemy.orm import object_session
 from sqlalchemy.orm.attributes import set_committed_value
-from sqlalchemy import delete
-
 
 from app.db import Chunk, Document, DocumentStatus
 from app.indexing_pipeline.connector_document import ConnectorDocument
 from app.indexing_pipeline.document_chunker import chunk_text
@@ -39,67 +37,75 @@ class IndexingPipelineService:
         """
         Persist new documents and detect changes, returning only those that need indexing.
         """
-        documents = []
-        seen_hashes: set[str] = set()
+        try:
+            documents = []
+            seen_hashes: set[str] = set()
 
-        for connector_doc in connector_docs:
-            unique_identifier_hash = compute_unique_identifier_hash(connector_doc)
-            content_hash = compute_content_hash(connector_doc)
+            for connector_doc in connector_docs:
+                unique_identifier_hash = compute_unique_identifier_hash(connector_doc)
+                content_hash = compute_content_hash(connector_doc)
 
-            if unique_identifier_hash in seen_hashes:
-                continue
-            seen_hashes.add(unique_identifier_hash)
+                if unique_identifier_hash in seen_hashes:
+                    continue
+                seen_hashes.add(unique_identifier_hash)
 
-            result = await self.session.execute(
-                select(Document).filter(Document.unique_identifier_hash == unique_identifier_hash)
-            )
-            existing = result.scalars().first()
+                result = await self.session.execute(
+                    select(Document).filter(Document.unique_identifier_hash == unique_identifier_hash)
+                )
+                existing = result.scalars().first()
 
-            if existing is not None:
-                if existing.content_hash == content_hash:
-                    if existing.title != connector_doc.title:
-                        existing.title = connector_doc.title
-                        existing.updated_at = datetime.now(UTC)
-                    if not DocumentStatus.is_state(existing.status, DocumentStatus.READY):
-                        existing.status = DocumentStatus.pending()
-                        existing.updated_at = datetime.now(UTC)
-                        documents.append(existing)
+                if existing is not None:
+                    if existing.content_hash == content_hash:
+                        if existing.title != connector_doc.title:
+                            existing.title = connector_doc.title
+                            existing.updated_at = datetime.now(UTC)
+                        if not DocumentStatus.is_state(existing.status, DocumentStatus.READY):
+                            existing.status = DocumentStatus.pending()
+                            existing.updated_at = datetime.now(UTC)
+                            documents.append(existing)
+                        continue
+
+                    existing.title = connector_doc.title
+                    existing.content_hash = content_hash
+                    existing.source_markdown = connector_doc.source_markdown
+                    existing.document_metadata = connector_doc.metadata
+                    existing.updated_at = datetime.now(UTC)
+                    existing.status = DocumentStatus.pending()
+                    documents.append(existing)
                     continue
 
-                existing.title = connector_doc.title
-                existing.content_hash = content_hash
-                existing.source_markdown = connector_doc.source_markdown
-                existing.document_metadata = connector_doc.metadata
-                existing.updated_at = datetime.now(UTC)
-                existing.status = DocumentStatus.pending()
-                documents.append(existing)
-                continue
+                duplicate = await self.session.execute(
+                    select(Document).filter(Document.content_hash == content_hash)
+                )
+                if duplicate.scalars().first() is not None:
+                    continue
 
-            duplicate = await self.session.execute(
-                select(Document).filter(Document.content_hash == content_hash)
-            )
-            if duplicate.scalars().first() is not None:
-                continue
+                document = Document(
+                    title=connector_doc.title,
+                    document_type=connector_doc.document_type,
+                    content="Pending...",
+                    content_hash=content_hash,
+                    unique_identifier_hash=unique_identifier_hash,
+                    source_markdown=connector_doc.source_markdown,
+                    document_metadata=connector_doc.metadata,
+                    search_space_id=connector_doc.search_space_id,
+                    connector_id=connector_doc.connector_id,
+                    created_by_id=connector_doc.created_by_id,
+                    updated_at=datetime.now(UTC),
+                    status=DocumentStatus.pending(),
+                )
+                self.session.add(document)
+                documents.append(document)
 
-            document = Document(
-                title=connector_doc.title,
-                document_type=connector_doc.document_type,
-                content="Pending...",
-                content_hash=content_hash,
-                unique_identifier_hash=unique_identifier_hash,
-                source_markdown=connector_doc.source_markdown,
-                document_metadata=connector_doc.metadata,
-                search_space_id=connector_doc.search_space_id,
-                connector_id=connector_doc.connector_id,
-                created_by_id=connector_doc.created_by_id,
-                updated_at=datetime.now(UTC),
-                status=DocumentStatus.pending(),
-            )
-            self.session.add(document)
-            documents.append(document)
+            await self.session.commit()
+            return documents
 
-        await self.session.commit()
-        return documents
+        except IntegrityError:
+            # Most likely a concurrent worker committed a document with the same
+            # content_hash or unique_identifier_hash. Roll back and let the next
+            # sync run handle it.
+            await self.session.rollback()
+            return []
 
     async def index(
         self, document: Document, connector_doc: ConnectorDocument, llm
diff --git a/surfsense_backend/tests/integration/indexing_pipeline/test_prepare_for_indexing.py b/surfsense_backend/tests/integration/indexing_pipeline/test_prepare_for_indexing.py
index 4c70cdedc..c919af5ec 100644
--- a/surfsense_backend/tests/integration/indexing_pipeline/test_prepare_for_indexing.py
+++ b/surfsense_backend/tests/integration/indexing_pipeline/test_prepare_for_indexing.py
@@ -209,6 +209,31 @@ async def test_updated_at_advances_when_content_changes(
     assert updated_at_v2 > updated_at_v1
 
 
+async def test_same_content_from_different_source_skipped_in_single_batch(
+    db_session, db_search_space, make_connector_document
+):
+    first = make_connector_document(
+        search_space_id=db_search_space.id,
+        unique_id="source-a",
+        source_markdown="## Shared content",
+    )
+    second = make_connector_document(
+        search_space_id=db_search_space.id,
+        unique_id="source-b",
+        source_markdown="## Shared content",
+    )
+    service = IndexingPipelineService(session=db_session)
+
+    results = await service.prepare_for_indexing([first, second])
+
+    assert len(results) == 1
+
+    result = await db_session.execute(
+        select(Document).filter(Document.search_space_id == db_search_space.id)
+    )
+    assert len(result.scalars().all()) == 1
+
+
 async def test_same_content_from_different_source_is_skipped(
     db_session, db_search_space, make_connector_document
 ):

Open review questions on this patch:

- Explain how the missing no_autoflush guard around the duplicate-content check is a regression in the new pipeline.
- Notion would chunk the wrong string (behavioral diff): today the connector chunks the page content, while the new pipeline would chunk the full wrapper.
- To discuss: GitHub cannot split embedding content from chunk content (behavioral diff): today it passes two separate strings, while the new pipeline carries a single source_markdown.
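
For the first question, a minimal sketch of what a no_autoflush guard around the duplicate-content lookup could look like. It assumes SQLAlchemy's AsyncSession (where no_autoflush is proxied from the underlying sync Session) and reuses the Document model and content_hash filter from the diff above; the helper name find_duplicate_by_content is purely illustrative and not part of the codebase:

from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession

from app.db import Document  # same model the service imports in the diff above


async def find_duplicate_by_content(
    session: AsyncSession, content_hash: str
) -> Document | None:
    # Illustrative guard: disable autoflush for this read-only lookup so that
    # Document objects added to the session earlier in the batch are not
    # flushed mid-loop (and their unique constraints evaluated) just to
    # answer the duplicate check.
    with session.no_autoflush:
        result = await session.execute(
            select(Document).filter(Document.content_hash == content_hash)
        )
    return result.scalars().first()

Whether such a guard is the right fix here, or whether the duplicate check should consult the in-memory batch instead, is exactly what the note above asks to clarify.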