diff --git a/surfsense_backend/app/indexing_pipeline/connector_document.py b/surfsense_backend/app/indexing_pipeline/connector_document.py index cf756ba42..71e3ab700 100644 --- a/surfsense_backend/app/indexing_pipeline/connector_document.py +++ b/surfsense_backend/app/indexing_pipeline/connector_document.py @@ -4,6 +4,7 @@ from app.db import DocumentType class ConnectorDocument(BaseModel): + """Canonical data transfer object produced by connector adapters and consumed by the indexing pipeline.""" title: str source_markdown: str unique_id: str diff --git a/surfsense_backend/app/indexing_pipeline/document_chunker.py b/surfsense_backend/app/indexing_pipeline/document_chunker.py new file mode 100644 index 000000000..70fe23ef3 --- /dev/null +++ b/surfsense_backend/app/indexing_pipeline/document_chunker.py @@ -0,0 +1,6 @@ +from app.config import config + + +def chunk_text(text: str) -> list[str]: + """Chunk a text string using the configured chunker and return the chunk texts.""" + return [c.text for c in config.chunker_instance.chunk(text)] diff --git a/surfsense_backend/app/indexing_pipeline/document_embedder.py b/surfsense_backend/app/indexing_pipeline/document_embedder.py new file mode 100644 index 000000000..ea24a5a56 --- /dev/null +++ b/surfsense_backend/app/indexing_pipeline/document_embedder.py @@ -0,0 +1,6 @@ +from app.config import config + + +def embed_text(text: str) -> list[float]: + """Embed a single text string using the configured embedding model.""" + return config.embedding_model_instance.embed(text) diff --git a/surfsense_backend/app/indexing_pipeline/document_hashing.py b/surfsense_backend/app/indexing_pipeline/document_hashing.py index 6b352a953..5dd7767a4 100644 --- a/surfsense_backend/app/indexing_pipeline/document_hashing.py +++ b/surfsense_backend/app/indexing_pipeline/document_hashing.py @@ -4,10 +4,12 @@ from app.indexing_pipeline.connector_document import ConnectorDocument def compute_unique_identifier_hash(doc: ConnectorDocument) -> str: + """Return a stable SHA-256 hash identifying a document by its source identity.""" combined = f"{doc.document_type.value}:{doc.unique_id}:{doc.search_space_id}" return hashlib.sha256(combined.encode("utf-8")).hexdigest() def compute_content_hash(doc: ConnectorDocument) -> str: + """Return a SHA-256 hash of the document's content scoped to its search space.""" combined = f"{doc.search_space_id}:{doc.source_markdown}" return hashlib.sha256(combined.encode("utf-8")).hexdigest() diff --git a/surfsense_backend/app/indexing_pipeline/document_summarizer.py b/surfsense_backend/app/indexing_pipeline/document_summarizer.py new file mode 100644 index 000000000..1e708075e --- /dev/null +++ b/surfsense_backend/app/indexing_pipeline/document_summarizer.py @@ -0,0 +1,28 @@ +from app.prompts import SUMMARY_PROMPT_TEMPLATE +from app.utils.document_converters import optimize_content_for_context_window + + +async def summarize_document(source_markdown: str, llm, metadata: dict | None = None) -> str: + """Generate a text summary of a document using an LLM, prefixed with metadata when provided.""" + model_name = getattr(llm, "model", "gpt-3.5-turbo") + optimized_content = optimize_content_for_context_window( + source_markdown, metadata, model_name + ) + + summary_chain = SUMMARY_PROMPT_TEMPLATE | llm + content_with_metadata = ( + f"\n\n{metadata}\n\n" + f"\n\n\n\n{optimized_content}\n\n" + ) + summary_result = await summary_chain.ainvoke({"document": content_with_metadata}) + summary_content = summary_result.content + + if metadata: + metadata_parts = ["# DOCUMENT METADATA"] + for key, value in metadata.items(): + if value: + metadata_parts.append(f"**{key.replace('_', ' ').title()}:** {value}") + metadata_section = "\n".join(metadata_parts) + return f"{metadata_section}\n\n# DOCUMENT SUMMARY\n\n{summary_content}" + + return summary_content diff --git a/surfsense_backend/app/indexing_pipeline/indexing_pipeline_service.py b/surfsense_backend/app/indexing_pipeline/indexing_pipeline_service.py index d5f50891c..90a9a361b 100644 --- a/surfsense_backend/app/indexing_pipeline/indexing_pipeline_service.py +++ b/surfsense_backend/app/indexing_pipeline/indexing_pipeline_service.py @@ -4,14 +4,16 @@ from sqlalchemy.ext.asyncio import AsyncSession from sqlalchemy.orm import object_session from sqlalchemy.orm.attributes import set_committed_value -from app.config import config -from app.db import Document, DocumentStatus +from app.db import Chunk, Document, DocumentStatus from app.indexing_pipeline.connector_document import ConnectorDocument +from app.indexing_pipeline.document_chunker import chunk_text +from app.indexing_pipeline.document_embedder import embed_text from app.indexing_pipeline.document_hashing import compute_content_hash, compute_unique_identifier_hash -from app.utils.document_converters import create_document_chunks, generate_document_summary +from app.indexing_pipeline.document_summarizer import summarize_document def _safe_set_chunks(document: Document, chunks: list) -> None: + """Assign chunks to a document without triggering SQLAlchemy async lazy loading.""" set_committed_value(document, "chunks", chunks) session = object_session(document) if session is not None: @@ -22,12 +24,17 @@ def _safe_set_chunks(document: Document, chunks: list) -> None: class IndexingPipelineService: + """Single pipeline for indexing connector documents. All connectors use this service.""" + def __init__(self, session: AsyncSession) -> None: self.session = session async def prepare_for_indexing( self, connector_docs: list[ConnectorDocument] ) -> list[Document]: + """ + Persist new documents and detect changes, returning only those that need indexing. + """ documents = [] for connector_doc in connector_docs: @@ -73,19 +80,26 @@ class IndexingPipelineService: async def index( self, document: Document, connector_doc: ConnectorDocument, llm ) -> None: + """ + Run summarization, embedding, and chunking for a document and persist the results. + """ try: document.status = DocumentStatus.processing() await self.session.commit() if connector_doc.should_summarize: - content, embedding = await generate_document_summary( + content = await summarize_document( connector_doc.source_markdown, llm, connector_doc.metadata ) else: content = connector_doc.source_markdown - embedding = config.embedding_model_instance.embed(content) - chunks = await create_document_chunks(connector_doc.source_markdown) + embedding = embed_text(content) + + chunks = [ + Chunk(content=text, embedding=embed_text(text)) + for text in chunk_text(connector_doc.source_markdown) + ] document.source_markdown = connector_doc.source_markdown document.content = content diff --git a/surfsense_backend/tests/integration/conftest.py b/surfsense_backend/tests/integration/conftest.py index 0ff860937..155f8e74d 100644 --- a/surfsense_backend/tests/integration/conftest.py +++ b/surfsense_backend/tests/integration/conftest.py @@ -89,40 +89,42 @@ async def db_search_space(db_session: AsyncSession, db_user: User) -> SearchSpac @pytest.fixture -def mock_llm() -> AsyncMock: - llm = AsyncMock() - llm.ainvoke = AsyncMock(return_value=MagicMock(content="Mocked summary.")) - return llm - - -@pytest.fixture -def patched_generate_summary(monkeypatch) -> AsyncMock: - mock = AsyncMock(return_value=("Mocked summary.", [0.1] * _EMBEDDING_DIM)) +def patched_summarize(monkeypatch) -> AsyncMock: + mock = AsyncMock(return_value="Mocked summary.") monkeypatch.setattr( - "app.indexing_pipeline.indexing_pipeline_service.generate_document_summary", + "app.indexing_pipeline.indexing_pipeline_service.summarize_document", mock, ) return mock @pytest.fixture -def patched_create_chunks(monkeypatch) -> MagicMock: - from app.db import Chunk - - chunk = Chunk(content="Test chunk content.", embedding=[0.1] * _EMBEDDING_DIM) - mock = AsyncMock(return_value=[chunk]) +def patched_summarize_raises(monkeypatch) -> AsyncMock: + mock = AsyncMock(side_effect=RuntimeError("LLM unavailable")) monkeypatch.setattr( - "app.indexing_pipeline.indexing_pipeline_service.create_document_chunks", + "app.indexing_pipeline.indexing_pipeline_service.summarize_document", mock, ) return mock @pytest.fixture -def patched_embedding_model(monkeypatch) -> MagicMock: - from app.config import config +def patched_embed_text(monkeypatch) -> MagicMock: + mock = MagicMock(return_value=[0.1] * _EMBEDDING_DIM) + monkeypatch.setattr( + "app.indexing_pipeline.indexing_pipeline_service.embed_text", + mock, + ) + return mock + + +@pytest.fixture +def patched_chunk_text(monkeypatch) -> MagicMock: + mock = MagicMock(return_value=["Test chunk content."]) + monkeypatch.setattr( + "app.indexing_pipeline.indexing_pipeline_service.chunk_text", + mock, + ) + return mock + - model = MagicMock() - model.embed = MagicMock(return_value=[0.1] * _EMBEDDING_DIM) - monkeypatch.setattr(config, "embedding_model_instance", model) - return model diff --git a/surfsense_backend/tests/integration/indexing_pipeline/test_index.py b/surfsense_backend/tests/integration/indexing_pipeline/test_index.py index 5edcfb22d..815932121 100644 --- a/surfsense_backend/tests/integration/indexing_pipeline/test_index.py +++ b/surfsense_backend/tests/integration/indexing_pipeline/test_index.py @@ -1,15 +1,15 @@ import pytest from sqlalchemy import select -from app.db import Document, DocumentStatus +from app.db import Chunk, Document, DocumentStatus from app.indexing_pipeline.indexing_pipeline_service import IndexingPipelineService pytestmark = pytest.mark.integration +@pytest.mark.usefixtures("patched_summarize", "patched_embed_text", "patched_chunk_text") async def test_sets_status_ready( db_session, db_search_space, make_connector_document, - mock_llm, patched_generate_summary, patched_create_chunks, ): connector_doc = make_connector_document(search_space_id=db_search_space.id) service = IndexingPipelineService(session=db_session) @@ -18,9 +18,137 @@ async def test_sets_status_ready( document = prepared[0] document_id = document.id - await service.index(document, connector_doc, mock_llm) + await service.index(document, connector_doc, llm=None) result = await db_session.execute(select(Document).filter(Document.id == document_id)) reloaded = result.scalars().first() assert DocumentStatus.is_state(reloaded.status, DocumentStatus.READY) + + +@pytest.mark.usefixtures("patched_summarize", "patched_embed_text", "patched_chunk_text") +async def test_content_is_summary_when_should_summarize_true( + db_session, db_search_space, make_connector_document, +): + connector_doc = make_connector_document(search_space_id=db_search_space.id) + service = IndexingPipelineService(session=db_session) + + prepared = await service.prepare_for_indexing([connector_doc]) + document = prepared[0] + document_id = document.id + + await service.index(document, connector_doc, llm=None) + + result = await db_session.execute(select(Document).filter(Document.id == document_id)) + reloaded = result.scalars().first() + + assert reloaded.content == "Mocked summary." + + +@pytest.mark.usefixtures("patched_embed_text", "patched_chunk_text") +async def test_content_is_source_markdown_when_should_summarize_false( + db_session, db_search_space, make_connector_document, +): + connector_doc = make_connector_document( + search_space_id=db_search_space.id, + should_summarize=False, + source_markdown="## Raw content", + ) + service = IndexingPipelineService(session=db_session) + + prepared = await service.prepare_for_indexing([connector_doc]) + document = prepared[0] + document_id = document.id + + await service.index(document, connector_doc, llm=None) + + result = await db_session.execute(select(Document).filter(Document.id == document_id)) + reloaded = result.scalars().first() + + assert reloaded.content == "## Raw content" + + +@pytest.mark.usefixtures("patched_summarize", "patched_embed_text", "patched_chunk_text") +async def test_chunks_written_to_db( + db_session, db_search_space, make_connector_document, +): + connector_doc = make_connector_document(search_space_id=db_search_space.id) + service = IndexingPipelineService(session=db_session) + + prepared = await service.prepare_for_indexing([connector_doc]) + document = prepared[0] + document_id = document.id + + await service.index(document, connector_doc, llm=None) + + result = await db_session.execute( + select(Chunk).filter(Chunk.document_id == document_id) + ) + chunks = result.scalars().all() + + assert len(chunks) == 1 + assert chunks[0].content == "Test chunk content." + + +@pytest.mark.usefixtures("patched_summarize", "patched_embed_text", "patched_chunk_text") +async def test_embedding_written_to_db( + db_session, db_search_space, make_connector_document, +): + connector_doc = make_connector_document(search_space_id=db_search_space.id) + service = IndexingPipelineService(session=db_session) + + prepared = await service.prepare_for_indexing([connector_doc]) + document = prepared[0] + document_id = document.id + + await service.index(document, connector_doc, llm=None) + + result = await db_session.execute(select(Document).filter(Document.id == document_id)) + reloaded = result.scalars().first() + + assert reloaded.embedding is not None + assert len(reloaded.embedding) == 1024 + + +@pytest.mark.usefixtures("patched_summarize_raises", "patched_chunk_text") +async def test_llm_error_sets_status_failed( + db_session, db_search_space, make_connector_document, +): + connector_doc = make_connector_document(search_space_id=db_search_space.id) + service = IndexingPipelineService(session=db_session) + + prepared = await service.prepare_for_indexing([connector_doc]) + document = prepared[0] + document_id = document.id + + await service.index(document, connector_doc, llm=None) + + result = await db_session.execute(select(Document).filter(Document.id == document_id)) + reloaded = result.scalars().first() + + assert DocumentStatus.is_state(reloaded.status, DocumentStatus.FAILED) + + +@pytest.mark.usefixtures("patched_summarize_raises", "patched_chunk_text") +async def test_llm_error_leaves_no_partial_data( + db_session, db_search_space, make_connector_document, +): + connector_doc = make_connector_document(search_space_id=db_search_space.id) + service = IndexingPipelineService(session=db_session) + + prepared = await service.prepare_for_indexing([connector_doc]) + document = prepared[0] + document_id = document.id + + await service.index(document, connector_doc, llm=None) + + result = await db_session.execute(select(Document).filter(Document.id == document_id)) + reloaded = result.scalars().first() + + assert reloaded.embedding is None + assert reloaded.content == "Pending..." + + chunks_result = await db_session.execute( + select(Chunk).filter(Chunk.document_id == document_id) + ) + assert chunks_result.scalars().all() == []