diff --git a/surfsense_backend/app/indexing_pipeline/indexing_pipeline_service.py b/surfsense_backend/app/indexing_pipeline/indexing_pipeline_service.py index d67eea360..e544b67cc 100644 --- a/surfsense_backend/app/indexing_pipeline/indexing_pipeline_service.py +++ b/surfsense_backend/app/indexing_pipeline/indexing_pipeline_service.py @@ -40,11 +40,16 @@ class IndexingPipelineService: Persist new documents and detect changes, returning only those that need indexing. """ documents = [] + seen_hashes: set[str] = set() for connector_doc in connector_docs: unique_identifier_hash = compute_unique_identifier_hash(connector_doc) content_hash = compute_content_hash(connector_doc) + if unique_identifier_hash in seen_hashes: + continue + seen_hashes.add(unique_identifier_hash) + result = await self.session.execute( select(Document).filter(Document.unique_identifier_hash == unique_identifier_hash) ) @@ -55,6 +60,10 @@ class IndexingPipelineService: if existing.title != connector_doc.title: existing.title = connector_doc.title existing.updated_at = datetime.now(UTC) + if not DocumentStatus.is_state(existing.status, DocumentStatus.READY): + existing.status = DocumentStatus.pending() + existing.updated_at = datetime.now(UTC) + documents.append(existing) continue existing.title = connector_doc.title diff --git a/surfsense_backend/tests/integration/indexing_pipeline/test_prepare_for_indexing.py b/surfsense_backend/tests/integration/indexing_pipeline/test_prepare_for_indexing.py index f84bcc5e6..4c70cdedc 100644 --- a/surfsense_backend/tests/integration/indexing_pipeline/test_prepare_for_indexing.py +++ b/surfsense_backend/tests/integration/indexing_pipeline/test_prepare_for_indexing.py @@ -23,28 +23,36 @@ async def test_new_document_is_persisted_with_pending_status( assert reloaded is not None assert DocumentStatus.is_state(reloaded.status, DocumentStatus.PENDING) + assert reloaded.source_markdown == doc.source_markdown -async def test_unchanged_document_is_skipped( - db_session, db_search_space, make_connector_document +@pytest.mark.usefixtures("patched_summarize", "patched_embed_text", "patched_chunk_text") +async def test_unchanged_ready_document_is_skipped( + db_session, db_search_space, make_connector_document, mocker, ): doc = make_connector_document(search_space_id=db_search_space.id) service = IndexingPipelineService(session=db_session) - await service.prepare_for_indexing([doc]) + # Index fully so the document reaches ready state + prepared = await service.prepare_for_indexing([doc]) + await service.index(prepared[0], doc, llm=mocker.Mock()) + + # Same content on the next run — a ready document must be skipped results = await service.prepare_for_indexing([doc]) assert results == [] +@pytest.mark.usefixtures("patched_summarize", "patched_embed_text", "patched_chunk_text") async def test_title_only_change_updates_title_in_db( - db_session, db_search_space, make_connector_document + db_session, db_search_space, make_connector_document, mocker, ): original = make_connector_document(search_space_id=db_search_space.id, title="Original Title") service = IndexingPipelineService(session=db_session) - first = await service.prepare_for_indexing([original]) - document_id = first[0].id + prepared = await service.prepare_for_indexing([original]) + document_id = prepared[0].id + await service.index(prepared[0], original, llm=mocker.Mock()) renamed = make_connector_document(search_space_id=db_search_space.id, title="Updated Title") results = await service.prepare_for_indexing([renamed]) @@ -227,6 +235,31 @@ async def test_same_content_from_different_source_is_skipped( assert len(result.scalars().all()) == 1 +@pytest.mark.usefixtures("patched_summarize_raises", "patched_embed_text", "patched_chunk_text") +async def test_failed_document_with_unchanged_content_is_requeued( + db_session, db_search_space, make_connector_document, mocker, +): + doc = make_connector_document(search_space_id=db_search_space.id) + service = IndexingPipelineService(session=db_session) + + # First run: document is created and indexing crashes → status = failed + prepared = await service.prepare_for_indexing([doc]) + document_id = prepared[0].id + await service.index(prepared[0], doc, llm=mocker.Mock()) + + result = await db_session.execute(select(Document).filter(Document.id == document_id)) + assert DocumentStatus.is_state(result.scalars().first().status, DocumentStatus.FAILED) + + # Next run: same content, pipeline must re-queue the failed document + results = await service.prepare_for_indexing([doc]) + + assert len(results) == 1 + assert results[0].id == document_id + + result = await db_session.execute(select(Document).filter(Document.id == document_id)) + assert DocumentStatus.is_state(result.scalars().first().status, DocumentStatus.PENDING) + + async def test_title_and_content_change_updates_both_and_returns_document( db_session, db_search_space, make_connector_document ):