diff --git a/surfsense_backend/app/indexing_pipeline/indexing_pipeline_service.py b/surfsense_backend/app/indexing_pipeline/indexing_pipeline_service.py index c542008cf..af2b2a2ff 100644 --- a/surfsense_backend/app/indexing_pipeline/indexing_pipeline_service.py +++ b/surfsense_backend/app/indexing_pipeline/indexing_pipeline_service.py @@ -37,11 +37,11 @@ class IndexingPipelineService: """ Persist new documents and detect changes, returning only those that need indexing. """ - try: - documents = [] - seen_hashes: set[str] = set() + documents = [] + seen_hashes: set[str] = set() - for connector_doc in connector_docs: + for connector_doc in connector_docs: + try: unique_identifier_hash = compute_unique_identifier_hash(connector_doc) content_hash = compute_content_hash(connector_doc) @@ -97,13 +97,16 @@ class IndexingPipelineService: self.session.add(document) documents.append(document) + except Exception: + continue + + try: await self.session.commit() return documents - except IntegrityError: - # Most likely a concurrent worker committed a document with the same - # content_hash or unique_identifier_hash. Roll back and let the next - # sync run handle it. + # A concurrent worker committed a document with the same content_hash + # or unique_identifier_hash between our check and our INSERT. + # The document already exists — roll back and let the next sync run handle it. await self.session.rollback() return [] diff --git a/surfsense_backend/tests/integration/indexing_pipeline/test_prepare_for_indexing.py b/surfsense_backend/tests/integration/indexing_pipeline/test_prepare_for_indexing.py index c919af5ec..8b66b8323 100644 --- a/surfsense_backend/tests/integration/indexing_pipeline/test_prepare_for_indexing.py +++ b/surfsense_backend/tests/integration/indexing_pipeline/test_prepare_for_indexing.py @@ -2,6 +2,7 @@ import pytest from sqlalchemy import select from app.db import Document, DocumentStatus +from app.indexing_pipeline.document_hashing import compute_content_hash as real_compute_content_hash from app.indexing_pipeline.indexing_pipeline_service import IndexingPipelineService pytestmark = pytest.mark.integration @@ -314,4 +315,49 @@ async def test_title_and_content_change_updates_both_and_returns_document( assert reloaded.title == "Updated Title" assert reloaded.source_markdown == "## v2" -# explain how this No no_autoflush guard for duplicate check is a regression in new pipeline , explain this Notion chunks wrong string Behavioral diff Chunks page content Would chunk full wrapper , let us discuss about this : GitHub can't split embedding vs chunk content Behavioral diff Two strings One source_markdown \ No newline at end of file + + +async def test_one_bad_document_in_batch_does_not_prevent_others_from_being_persisted( + db_session, db_search_space, make_connector_document, monkeypatch, +): + """ + A per-document error during prepare_for_indexing must be isolated. + The two valid documents around the failing one must still be persisted. + """ + docs = [ + make_connector_document( + search_space_id=db_search_space.id, + unique_id="good-1", + source_markdown="## Good doc 1", + ), + make_connector_document( + search_space_id=db_search_space.id, + unique_id="will-fail", + source_markdown="## Bad doc", + ), + make_connector_document( + search_space_id=db_search_space.id, + unique_id="good-2", + source_markdown="## Good doc 2", + ), + ] + + def compute_content_hash_with_error(doc): + if doc.unique_id == "will-fail": + raise RuntimeError("Simulated per-document failure") + return real_compute_content_hash(doc) + + monkeypatch.setattr( + "app.indexing_pipeline.indexing_pipeline_service.compute_content_hash", + compute_content_hash_with_error, + ) + + service = IndexingPipelineService(session=db_session) + results = await service.prepare_for_indexing(docs) + + assert len(results) == 2 + + result = await db_session.execute( + select(Document).filter(Document.search_space_id == db_search_space.id) + ) + assert len(result.scalars().all()) == 2 \ No newline at end of file