diff --git a/surfsense_backend/app/indexing_pipeline/indexing_pipeline_service.py b/surfsense_backend/app/indexing_pipeline/indexing_pipeline_service.py
index 05bb79218..d67eea360 100644
--- a/surfsense_backend/app/indexing_pipeline/indexing_pipeline_service.py
+++ b/surfsense_backend/app/indexing_pipeline/indexing_pipeline_service.py
@@ -6,6 +6,8 @@
 from sqlalchemy.ext.asyncio import AsyncSession
 from sqlalchemy.orm import object_session
 from sqlalchemy.orm.attributes import set_committed_value
+from sqlalchemy import delete
+
 from app.db import Chunk, Document, DocumentStatus
 from app.indexing_pipeline.connector_document import ConnectorDocument
 from app.indexing_pipeline.document_chunker import chunk_text
@@ -52,6 +54,7 @@ class IndexingPipelineService:
             if existing.content_hash == content_hash:
                 if existing.title != connector_doc.title:
                     existing.title = connector_doc.title
+                    existing.updated_at = datetime.now(UTC)
                 continue
 
             existing.title = connector_doc.title
@@ -99,7 +102,7 @@
         document.status = DocumentStatus.processing()
         await self.session.commit()
 
-        if connector_doc.should_summarize:
+        if connector_doc.should_summarize and llm is not None:
             content = await summarize_document(
                 connector_doc.source_markdown, llm, connector_doc.metadata
             )
@@ -108,6 +111,10 @@
 
         embedding = embed_text(content)
 
+        await self.session.execute(
+            delete(Chunk).where(Chunk.document_id == document.id)
+        )
+
         chunks = [
             Chunk(content=text, embedding=embed_text(text))
             for text in chunk_text(
diff --git a/surfsense_backend/tests/integration/indexing_pipeline/test_index_document.py b/surfsense_backend/tests/integration/indexing_pipeline/test_index_document.py
index 627b4428d..696def4b2 100644
--- a/surfsense_backend/tests/integration/indexing_pipeline/test_index_document.py
+++ b/surfsense_backend/tests/integration/indexing_pipeline/test_index_document.py
@@ -28,7 +28,7 @@ async def test_sets_status_ready(
 @pytest.mark.usefixtures("patched_summarize", "patched_embed_text", "patched_chunk_text")
 async def test_content_is_summary_when_should_summarize_true(
-    db_session, db_search_space, make_connector_document,
+    db_session, db_search_space, make_connector_document, mocker,
 ):
     connector_doc = make_connector_document(search_space_id=db_search_space.id)
     service = IndexingPipelineService(session=db_session)
 
@@ -37,7 +37,7 @@ async def test_content_is_summary_when_should_summarize_true(
     document = prepared[0]
     document_id = document.id
 
-    await service.index(document, connector_doc, llm=None)
+    await service.index(document, connector_doc, llm=mocker.Mock())
 
     result = await db_session.execute(select(Document).filter(Document.id == document_id))
     reloaded = result.scalars().first()
@@ -132,11 +132,15 @@ async def test_updated_at_advances_after_indexing(
     assert updated_at_ready > updated_at_pending
 
 
-@pytest.mark.usefixtures("patched_summarize_raises", "patched_chunk_text")
-async def test_llm_error_sets_status_failed(
+@pytest.mark.usefixtures("patched_embed_text", "patched_chunk_text")
+async def test_no_llm_falls_back_to_source_markdown(
     db_session, db_search_space, make_connector_document,
 ):
-    connector_doc = make_connector_document(search_space_id=db_search_space.id)
+    connector_doc = make_connector_document(
+        search_space_id=db_search_space.id,
+        should_summarize=True,
+        source_markdown="## Fallback content",
+    )
     service = IndexingPipelineService(session=db_session)
 
     prepared = await service.prepare_for_indexing([connector_doc])
@@ -148,12 +152,44 @@
     document = prepared[0]
     document_id = document.id
 
     await service.index(document, connector_doc, llm=None)
 
     result = await db_session.execute(select(Document).filter(Document.id == document_id))
     reloaded = result.scalars().first()
 
-    assert DocumentStatus.is_state(reloaded.status, DocumentStatus.FAILED)
+    assert DocumentStatus.is_state(reloaded.status, DocumentStatus.READY)
+    assert reloaded.content == "## Fallback content"
+
+
+@pytest.mark.usefixtures("patched_summarize", "patched_embed_text", "patched_chunk_text")
+async def test_reindex_replaces_old_chunks(
+    db_session, db_search_space, make_connector_document,
+):
+    connector_doc = make_connector_document(
+        search_space_id=db_search_space.id,
+        source_markdown="## v1",
+    )
+    service = IndexingPipelineService(session=db_session)
+
+    prepared = await service.prepare_for_indexing([connector_doc])
+    document = prepared[0]
+    document_id = document.id
+
+    await service.index(document, connector_doc, llm=None)
+
+    updated_doc = make_connector_document(
+        search_space_id=db_search_space.id,
+        source_markdown="## v2",
+    )
+    re_prepared = await service.prepare_for_indexing([updated_doc])
+    await service.index(re_prepared[0], updated_doc, llm=None)
+
+    result = await db_session.execute(
+        select(Chunk).filter(Chunk.document_id == document_id)
+    )
+    chunks = result.scalars().all()
+
+    assert len(chunks) == 1
 
 
 @pytest.mark.usefixtures("patched_summarize_raises", "patched_chunk_text")
-async def test_llm_error_leaves_no_partial_data(
-    db_session, db_search_space, make_connector_document,
+async def test_llm_error_sets_status_failed(
+    db_session, db_search_space, make_connector_document, mocker,
 ):
     connector_doc = make_connector_document(search_space_id=db_search_space.id)
     service = IndexingPipelineService(session=db_session)
 
@@ -162,7 +198,26 @@
     document = prepared[0]
     document_id = document.id
 
-    await service.index(document, connector_doc, llm=None)
+    await service.index(document, connector_doc, llm=mocker.Mock())
+
+    result = await db_session.execute(select(Document).filter(Document.id == document_id))
+    reloaded = result.scalars().first()
+
+    assert DocumentStatus.is_state(reloaded.status, DocumentStatus.FAILED)
+
+
+@pytest.mark.usefixtures("patched_summarize_raises", "patched_chunk_text")
+async def test_llm_error_leaves_no_partial_data(
+    db_session, db_search_space, make_connector_document, mocker,
+):
+    connector_doc = make_connector_document(search_space_id=db_search_space.id)
+    service = IndexingPipelineService(session=db_session)
+
+    prepared = await service.prepare_for_indexing([connector_doc])
+    document = prepared[0]
+    document_id = document.id
+
+    await service.index(document, connector_doc, llm=mocker.Mock())
 
     result = await db_session.execute(select(Document).filter(Document.id == document_id))
     reloaded = result.scalars().first()
diff --git a/surfsense_backend/tests/integration/indexing_pipeline/test_prepare_for_indexing.py b/surfsense_backend/tests/integration/indexing_pipeline/test_prepare_for_indexing.py
index b3cce7eaa..f84bcc5e6 100644
--- a/surfsense_backend/tests/integration/indexing_pipeline/test_prepare_for_indexing.py
+++ b/surfsense_backend/tests/integration/indexing_pipeline/test_prepare_for_indexing.py
@@ -159,6 +159,27 @@ async def test_metadata_is_updated_when_content_changes(
     assert reloaded.document_metadata == {"status": "done"}
 
 
+async def test_updated_at_advances_when_title_only_changes(
+    db_session, db_search_space, make_connector_document
+):
+    original = make_connector_document(search_space_id=db_search_space.id, title="Old Title")
+    service = IndexingPipelineService(session=db_session)
+
+    first = await service.prepare_for_indexing([original])
+    document_id = first[0].id
+
+    result = await db_session.execute(select(Document).filter(Document.id == document_id))
+    updated_at_v1 = result.scalars().first().updated_at
+
+    renamed = make_connector_document(search_space_id=db_search_space.id, title="New Title")
+    await service.prepare_for_indexing([renamed])
+
+    result = await db_session.execute(select(Document).filter(Document.id == document_id))
+    updated_at_v2 = result.scalars().first().updated_at
+
+    assert updated_at_v2 > updated_at_v1
+
+
 async def test_updated_at_advances_when_content_changes(
     db_session, db_search_space, make_connector_document
 ):