diff --git a/surfsense_backend/app/indexing_pipeline/document_chunker.py b/surfsense_backend/app/indexing_pipeline/document_chunker.py
index 70fe23ef3..719c9f4bb 100644
--- a/surfsense_backend/app/indexing_pipeline/document_chunker.py
+++ b/surfsense_backend/app/indexing_pipeline/document_chunker.py
@@ -1,6 +1,7 @@
 from app.config import config
 
 
-def chunk_text(text: str) -> list[str]:
+def chunk_text(text: str, use_code_chunker: bool = False) -> list[str]:
     """Chunk a text string using the configured chunker and return the chunk texts."""
-    return [c.text for c in config.chunker_instance.chunk(text)]
+    chunker = config.code_chunker_instance if use_code_chunker else config.chunker_instance
+    return [c.text for c in chunker.chunk(text)]
diff --git a/surfsense_backend/app/indexing_pipeline/indexing_pipeline_service.py b/surfsense_backend/app/indexing_pipeline/indexing_pipeline_service.py
index 90a9a361b..ed117eb4c 100644
--- a/surfsense_backend/app/indexing_pipeline/indexing_pipeline_service.py
+++ b/surfsense_backend/app/indexing_pipeline/indexing_pipeline_service.py
@@ -1,3 +1,5 @@
+from datetime import UTC, datetime
+
 from sqlalchemy import select
 from sqlalchemy.ext.asyncio import AsyncSession
 
@@ -55,10 +57,18 @@ class IndexingPipelineService:
                 existing.title = connector_doc.title
                 existing.content_hash = content_hash
                 existing.source_markdown = connector_doc.source_markdown
+                existing.document_metadata = connector_doc.metadata
+                existing.updated_at = datetime.now(UTC)
                 existing.status = DocumentStatus.pending()
                 documents.append(existing)
                 continue
 
+            duplicate = await self.session.execute(
+                select(Document).filter(Document.content_hash == content_hash)
+            )
+            if duplicate.scalars().first() is not None:
+                continue
+
             document = Document(
                 title=connector_doc.title,
                 document_type=connector_doc.document_type,
@@ -69,6 +79,8 @@ class IndexingPipelineService:
                 document_metadata=connector_doc.metadata,
                 search_space_id=connector_doc.search_space_id,
                 connector_id=connector_doc.connector_id,
+                created_by_id=connector_doc.created_by_id,
+                updated_at=datetime.now(UTC),
                 status=DocumentStatus.pending(),
             )
             self.session.add(document)
@@ -98,18 +110,23 @@ class IndexingPipelineService:
 
             chunks = [
                 Chunk(content=text, embedding=embed_text(text))
-                for text in chunk_text(connector_doc.source_markdown)
+                for text in chunk_text(
+                    connector_doc.source_markdown,
+                    use_code_chunker=connector_doc.should_use_code_chunker,
+                )
             ]
 
             document.source_markdown = connector_doc.source_markdown
             document.content = content
             document.embedding = embedding
             _safe_set_chunks(document, chunks)
+            document.updated_at = datetime.now(UTC)
             document.status = DocumentStatus.ready()
             await self.session.commit()
         except Exception as e:
             await self.session.rollback()
             await self.session.refresh(document)
+            document.updated_at = datetime.now(UTC)
             document.status = DocumentStatus.failed(str(e))
             await self.session.commit()
 
diff --git a/surfsense_backend/tests/integration/indexing_pipeline/test_index_document.py b/surfsense_backend/tests/integration/indexing_pipeline/test_index_document.py
index 815932121..627b4428d 100644
--- a/surfsense_backend/tests/integration/indexing_pipeline/test_index_document.py
+++ b/surfsense_backend/tests/integration/indexing_pipeline/test_index_document.py
@@ -110,6 +110,28 @@ async def test_embedding_written_to_db(
     assert len(reloaded.embedding) == 1024
 
 
+@pytest.mark.usefixtures("patched_summarize", "patched_embed_text", "patched_chunk_text")
+async def test_updated_at_advances_after_indexing(
+    db_session, db_search_space, make_connector_document,
+):
+    connector_doc = make_connector_document(search_space_id=db_search_space.id)
+    service = IndexingPipelineService(session=db_session)
+
+    prepared = await service.prepare_for_indexing([connector_doc])
+    document = prepared[0]
+    document_id = document.id
+
+    result = await db_session.execute(select(Document).filter(Document.id == document_id))
+    updated_at_pending = result.scalars().first().updated_at
+
+    await service.index(document, connector_doc, llm=None)
+
+    result = await db_session.execute(select(Document).filter(Document.id == document_id))
+    updated_at_ready = result.scalars().first().updated_at
+
+    assert updated_at_ready > updated_at_pending
+
+
 @pytest.mark.usefixtures("patched_summarize_raises", "patched_chunk_text")
 async def test_llm_error_sets_status_failed(
     db_session, db_search_space, make_connector_document,
diff --git a/surfsense_backend/tests/integration/indexing_pipeline/test_prepare_for_indexing.py b/surfsense_backend/tests/integration/indexing_pipeline/test_prepare_for_indexing.py
index 9ab46943e..2f6cbf47a 100644
--- a/surfsense_backend/tests/integration/indexing_pipeline/test_prepare_for_indexing.py
+++ b/surfsense_backend/tests/integration/indexing_pipeline/test_prepare_for_indexing.py
@@ -115,6 +115,112 @@ async def test_duplicate_in_batch_is_persisted_once(
     assert len(rows) == 1
 
 
+async def test_created_by_id_is_persisted(
+    db_session, db_user, db_search_space, make_connector_document
+):
+    doc = make_connector_document(
+        search_space_id=db_search_space.id,
+        created_by_id=str(db_user.id),
+    )
+    service = IndexingPipelineService(session=db_session)
+
+    results = await service.prepare_for_indexing([doc])
+    document_id = results[0].id
+
+    result = await db_session.execute(select(Document).filter(Document.id == document_id))
+    reloaded = result.scalars().first()
+
+    assert str(reloaded.created_by_id) == str(db_user.id)
+
+
+async def test_metadata_is_updated_when_content_changes(
+    db_session, db_search_space, make_connector_document
+):
+    original = make_connector_document(
+        search_space_id=db_search_space.id,
+        source_markdown="## v1",
+        metadata={"status": "in_progress"},
+    )
+    service = IndexingPipelineService(session=db_session)
+
+    first = await service.prepare_for_indexing([original])
+    document_id = first[0].id
+
+    updated = make_connector_document(
+        search_space_id=db_search_space.id,
+        source_markdown="## v2",
+        metadata={"status": "done"},
+    )
+    await service.prepare_for_indexing([updated])
+
+    result = await db_session.execute(select(Document).filter(Document.id == document_id))
+    reloaded = result.scalars().first()
+
+    assert reloaded.document_metadata == {"status": "done"}
+
+
+async def test_updated_at_advances_when_content_changes(
+    db_session, db_search_space, make_connector_document
+):
+    original = make_connector_document(search_space_id=db_search_space.id, source_markdown="## v1")
+    service = IndexingPipelineService(session=db_session)
+
+    first = await service.prepare_for_indexing([original])
+    document_id = first[0].id
+
+    result = await db_session.execute(select(Document).filter(Document.id == document_id))
+    updated_at_v1 = result.scalars().first().updated_at
+
+    updated = make_connector_document(search_space_id=db_search_space.id, source_markdown="## v2")
+    await service.prepare_for_indexing([updated])
+
+    result = await db_session.execute(select(Document).filter(Document.id == document_id))
+    updated_at_v2 = result.scalars().first().updated_at
+
+    assert updated_at_v2 > updated_at_v1
+
+
+async def test_updated_at_is_set_on_creation(
+    db_session, db_search_space, make_connector_document
+):
+    doc = make_connector_document(search_space_id=db_search_space.id)
+    service = IndexingPipelineService(session=db_session)
+
+    results = await service.prepare_for_indexing([doc])
+    document_id = results[0].id
+
+    result = await db_session.execute(select(Document).filter(Document.id == document_id))
+    reloaded = result.scalars().first()
+
+    assert reloaded.updated_at is not None
+
+
+async def test_same_content_from_different_source_is_skipped(
+    db_session, db_search_space, make_connector_document
+):
+    first = make_connector_document(
+        search_space_id=db_search_space.id,
+        unique_id="source-a",
+        source_markdown="## Shared content",
+    )
+    second = make_connector_document(
+        search_space_id=db_search_space.id,
+        unique_id="source-b",
+        source_markdown="## Shared content",
+    )
+    service = IndexingPipelineService(session=db_session)
+
+    await service.prepare_for_indexing([first])
+    results = await service.prepare_for_indexing([second])
+
+    assert results == []
+
+    result = await db_session.execute(
+        select(Document).filter(Document.search_space_id == db_search_space.id)
+    )
+    assert len(result.scalars().all()) == 1
+
+
 async def test_title_and_content_change_updates_both_and_returns_document(
     db_session, db_search_space, make_connector_document
 ):
diff --git a/surfsense_backend/tests/unit/conftest.py b/surfsense_backend/tests/unit/conftest.py
deleted file mode 100644
index 4e5c81bcb..000000000
--- a/surfsense_backend/tests/unit/conftest.py
+++ /dev/null
@@ -1,3 +0,0 @@
-# No fixtures needed for unit tests yet.
-# Unit tests cover pure functions and value objects with no dependencies.
-# External-boundary mocks (llm, embedding_model) live in tests/integration/conftest.py.
diff --git a/surfsense_backend/tests/unit/indexing_pipeline/conftest.py b/surfsense_backend/tests/unit/indexing_pipeline/conftest.py
new file mode 100644
index 000000000..886318bc9
--- /dev/null
+++ b/surfsense_backend/tests/unit/indexing_pipeline/conftest.py
@@ -0,0 +1,15 @@
+import pytest
+
+
+@pytest.fixture
+def patched_chunker_instance(mocker):
+    mock = mocker.patch("app.indexing_pipeline.document_chunker.config.chunker_instance")
+    mock.chunk.return_value = [mocker.Mock(text="prose chunk")]
+    return mock
+
+
+@pytest.fixture
+def patched_code_chunker_instance(mocker):
+    mock = mocker.patch("app.indexing_pipeline.document_chunker.config.code_chunker_instance")
+    mock.chunk.return_value = [mocker.Mock(text="code chunk")]
+    return mock
diff --git a/surfsense_backend/tests/unit/indexing_pipeline/test_document_chunker.py b/surfsense_backend/tests/unit/indexing_pipeline/test_document_chunker.py
new file mode 100644
index 000000000..258227cbe
--- /dev/null
+++ b/surfsense_backend/tests/unit/indexing_pipeline/test_document_chunker.py
@@ -0,0 +1,19 @@
+import pytest
+
+from app.indexing_pipeline.document_chunker import chunk_text
+
+pytestmark = pytest.mark.unit
+
+
+def test_uses_code_chunker_when_flag_is_true(patched_code_chunker_instance):
+    result = chunk_text("def foo(): pass", use_code_chunker=True)
+
+    patched_code_chunker_instance.chunk.assert_called_once_with("def foo(): pass")
+    assert result == ["code chunk"]
+
+
+def test_uses_default_chunker_when_flag_is_false(patched_chunker_instance):
+    result = chunk_text("Some prose text.", use_code_chunker=False)
+
+    patched_chunker_instance.chunk.assert_called_once_with("Some prose text.")
+    assert result == ["prose chunk"]
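For reviewers, a minimal usage sketch of the new use_code_chunker flag; it assumes
config.chunker_instance and config.code_chunker_instance are both set up in
app.config, as the first hunk implies. In the pipeline the flag is derived from
connector_doc.should_use_code_chunker; the snippet below just exercises
chunk_text directly:

    from app.indexing_pipeline.document_chunker import chunk_text

    # Default path: prose is split by config.chunker_instance.
    prose_chunks = chunk_text("# Release notes\n\nThe indexer got faster.")

    # Opt-in path: code is split by config.code_chunker_instance, the same
    # branch the pipeline takes when should_use_code_chunker is set.
    code_chunks = chunk_text("def foo():\n    pass", use_code_chunker=True)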