from sqlalchemy.ext.asyncio import AsyncSession

from app.db import Document, DocumentStatus, DocumentType
from app.indexing_pipeline.connector_document import ConnectorDocument
from app.indexing_pipeline.document_hashing import compute_content_hash
from app.indexing_pipeline.indexing_pipeline_service import IndexingPipelineService


class UploadDocumentAdapter:
    """Index and re-index uploaded-file documents via ``IndexingPipelineService``.

    One adapter wraps one ``AsyncSession``. The same session is handed to the
    pipeline service and is committed by the adapter after a successful run.
    """

    # Length of the markdown prefix passed to the pipeline as ``fallback_summary``.
    _FALLBACK_SUMMARY_CHARS = 4000

    def __init__(self, session: AsyncSession) -> None:
        self._session = session
        self._service = IndexingPipelineService(session)

    async def index(
        self,
        markdown_content: str,
        filename: str,
        etl_service: str,
        search_space_id: int,
        user_id: str,
        llm,
        should_summarize: bool = False,
    ) -> None:
        """Index a freshly uploaded file and commit the session on success.

        Args:
            markdown_content: Markdown extracted from the uploaded file.
            filename: Used as the document title, as its ``unique_id`` and as
                the ``FILE_NAME`` metadata entry.
            etl_service: Stored as the ``ETL_SERVICE`` metadata entry.
            search_space_id: Search space the document belongs to.
            user_id: Stored as ``created_by_id``.
            llm: Passed through unchanged to ``IndexingPipelineService.index``.
            should_summarize: Forwarded to the ``ConnectorDocument``.

        Raises:
            RuntimeError: If ``prepare_for_indexing`` returns no documents, or
                if the indexed document's status is not READY.
        """
        connector_doc = ConnectorDocument(
            title=filename,
            source_markdown=markdown_content,
            unique_id=filename,
            document_type=DocumentType.FILE,
            search_space_id=search_space_id,
            created_by_id=user_id,
            connector_id=None,
            should_summarize=should_summarize,
            should_use_code_chunker=False,
            fallback_summary=markdown_content[: self._FALLBACK_SUMMARY_CHARS],
            metadata={
                "FILE_NAME": filename,
                "ETL_SERVICE": etl_service,
            },
        )

        documents = await self._service.prepare_for_indexing([connector_doc])
        if not documents:
            raise RuntimeError("prepare_for_indexing returned no documents")

        indexed = await self._service.index(documents[0], connector_doc, llm)
        await self._commit_if_ready(indexed, "Indexing failed")

    async def reindex(
        self,
        document: Document,
        llm,
        *,
        should_summarize: bool = True,
    ) -> None:
        """Re-index an existing document after its source_markdown has been updated.

        Args:
            document: Existing document whose ``source_markdown`` holds the
                new content. Its ``content_hash`` is overwritten here.
            llm: Passed through unchanged to ``IndexingPipelineService.index``.
            should_summarize: Forwarded to the ``ConnectorDocument``. Defaults
                to ``True``, the value this method previously hard-coded.

        Raises:
            RuntimeError: If the document has no ``source_markdown``, or if the
                indexed document's status is not READY.
        """
        if not document.source_markdown:
            raise RuntimeError("Document has no source_markdown to reindex")

        # Fall back to an empty dict when the document carries no metadata.
        metadata = document.document_metadata or {}

        connector_doc = ConnectorDocument(
            title=document.title,
            source_markdown=document.source_markdown,
            # index() uses the filename for both title and unique_id.
            # NOTE(review): this assumes the title still equals the original
            # upload filename — confirm titles cannot be edited.
            unique_id=document.title,
            document_type=document.document_type,
            search_space_id=document.search_space_id,
            created_by_id=str(document.created_by_id),
            connector_id=document.connector_id,
            should_summarize=should_summarize,
            should_use_code_chunker=False,
            fallback_summary=document.source_markdown[: self._FALLBACK_SUMMARY_CHARS],
            metadata=metadata,
        )

        # NOTE(review): the hash is assigned before indexing has succeeded;
        # confirm a failed run cannot leave it persisted by a later commit on
        # this same session.
        document.content_hash = compute_content_hash(connector_doc)

        indexed = await self._service.index(document, connector_doc, llm)
        await self._commit_if_ready(indexed, "Reindexing failed")

    async def _commit_if_ready(self, indexed, default_reason: str) -> None:
        """Commit the session if *indexed* is READY, otherwise raise.

        The error message is the status's ``reason`` entry; a missing, ``None``
        or empty reason falls back to *default_reason*.
        """
        if not DocumentStatus.is_state(indexed.status, DocumentStatus.READY):
            raise RuntimeError(indexed.status.get("reason") or default_reason)

        indexed.content_needs_reindexing = False
        await self._session.commit()
source="editor", @@ -83,10 +77,7 @@ async def _reindex_document(document_id: int, user_id: str): ) try: - # Read markdown directly from source_markdown - markdown_content = document.source_markdown - - if not markdown_content: + if not document.source_markdown: await task_logger.log_task_failure( log_entry, f"Document {document_id} has no source_markdown to reindex", @@ -97,51 +88,17 @@ async def _reindex_document(document_id: int, user_id: str): logger.info(f"Reindexing document {document_id} ({document.title})") - # 1. Delete old chunks explicitly - from app.db import Chunk - - await session.execute(delete(Chunk).where(Chunk.document_id == document_id)) - await session.flush() # Ensure old chunks are deleted - - # 2. Create new chunks from source_markdown - new_chunks = await create_document_chunks(markdown_content) - - # 3. Add new chunks to session - for chunk in new_chunks: - chunk.document_id = document_id - session.add(chunk) - - logger.info(f"Created {len(new_chunks)} chunks for document {document_id}") - - # 4. Regenerate summary user_llm = await get_user_long_context_llm( session, user_id, document.search_space_id ) - document_metadata = { - "title": document.title, - "document_type": document.document_type.value, - } + adapter = UploadDocumentAdapter(session) + await adapter.reindex(document=document, llm=user_llm) - summary_content, summary_embedding = await generate_document_summary( - markdown_content, user_llm, document_metadata - ) - - # 5. 
Update document - document.content = summary_content - document.embedding = summary_embedding - document.content_needs_reindexing = False - - await session.commit() - - # Log success await task_logger.log_task_success( log_entry, f"Successfully reindexed document: {document.title}", - { - "chunks_created": len(new_chunks), - "document_id": document_id, - }, + {"document_id": document_id}, ) logger.info(f"Successfully reindexed document {document_id}") diff --git a/surfsense_backend/app/tasks/document_processors/file_processors.py b/surfsense_backend/app/tasks/document_processors/file_processors.py index b77777e06..5e97951bd 100644 --- a/surfsense_backend/app/tasks/document_processors/file_processors.py +++ b/surfsense_backend/app/tasks/document_processors/file_processors.py @@ -18,7 +18,7 @@ from sqlalchemy.ext.asyncio import AsyncSession from app.config import config as app_config from app.db import Document, DocumentStatus, DocumentType, Log, Notification -from app.indexing_pipeline.adapters.file_upload_adapter import index_uploaded_file +from app.indexing_pipeline.adapters.file_upload_adapter import UploadDocumentAdapter from app.services.llm_service import get_user_long_context_llm from app.services.notification_service import NotificationService from app.services.task_logging_service import TaskLoggingService @@ -1871,13 +1871,13 @@ async def process_file_in_background_with_document( user_llm = await get_user_long_context_llm(session, user_id, search_space_id) - await index_uploaded_file( + adapter = UploadDocumentAdapter(session) + await adapter.index( markdown_content=markdown_content, filename=filename, etl_service=etl_service, search_space_id=search_space_id, user_id=user_id, - session=session, llm=user_llm, should_summarize=should_summarize, ) diff --git a/surfsense_backend/tests/integration/indexing_pipeline/adapters/test_file_upload_adapter.py b/surfsense_backend/tests/integration/indexing_pipeline/adapters/test_file_upload_adapter.py index 
193e4bd80..068adb3b4 100644 --- a/surfsense_backend/tests/integration/indexing_pipeline/adapters/test_file_upload_adapter.py +++ b/surfsense_backend/tests/integration/indexing_pipeline/adapters/test_file_upload_adapter.py @@ -2,7 +2,7 @@ import pytest from sqlalchemy import select from app.db import Chunk, Document, DocumentStatus -from app.indexing_pipeline.adapters.file_upload_adapter import index_uploaded_file +from app.indexing_pipeline.adapters.file_upload_adapter import UploadDocumentAdapter pytestmark = pytest.mark.integration @@ -12,13 +12,13 @@ pytestmark = pytest.mark.integration ) async def test_sets_status_ready(db_session, db_search_space, db_user, mocker): """Document status is READY after successful indexing.""" - await index_uploaded_file( + adapter = UploadDocumentAdapter(db_session) + await adapter.index( markdown_content="## Hello\n\nSome content.", filename="test.pdf", etl_service="UNSTRUCTURED", search_space_id=db_search_space.id, user_id=str(db_user.id), - session=db_session, llm=mocker.Mock(), ) @@ -35,13 +35,13 @@ async def test_sets_status_ready(db_session, db_search_space, db_user, mocker): ) async def test_content_is_summary(db_session, db_search_space, db_user, mocker): """Document content is set to the LLM-generated summary.""" - await index_uploaded_file( + adapter = UploadDocumentAdapter(db_session) + await adapter.index( markdown_content="## Hello\n\nSome content.", filename="test.pdf", etl_service="UNSTRUCTURED", search_space_id=db_search_space.id, user_id=str(db_user.id), - session=db_session, llm=mocker.Mock(), ) @@ -58,13 +58,13 @@ async def test_content_is_summary(db_session, db_search_space, db_user, mocker): ) async def test_chunks_written_to_db(db_session, db_search_space, db_user, mocker): """Chunks derived from the source markdown are persisted in the DB.""" - await index_uploaded_file( + adapter = UploadDocumentAdapter(db_session) + await adapter.index( markdown_content="## Hello\n\nSome content.", filename="test.pdf", 
# ---------------------------------------------------------------------------
# reindex() tests
# ---------------------------------------------------------------------------

_ORIGINAL_MARKDOWN = "## Original\n\nOriginal content."
_EDITED_MARKDOWN = "## Edited\n\nNew content after user edit."
_SUMMARIZE_TARGET = "app.indexing_pipeline.indexing_pipeline_service.summarize_document"


async def _index_then_edit(
    db_session, db_search_space, db_user, mocker, *, flag_for_reindex=False
):
    """Index a fresh upload, then overwrite its markdown and flush.

    Returns the adapter together with the edited document, which has been
    flushed but not yet reindexed. With ``flag_for_reindex`` the document's
    ``content_needs_reindexing`` is also set to True before the flush.
    """
    adapter = UploadDocumentAdapter(db_session)
    await adapter.index(
        markdown_content=_ORIGINAL_MARKDOWN,
        filename="test.pdf",
        etl_service="UNSTRUCTURED",
        search_space_id=db_search_space.id,
        user_id=str(db_user.id),
        llm=mocker.Mock(),
    )

    rows = await db_session.execute(
        select(Document).filter(Document.search_space_id == db_search_space.id)
    )
    document = rows.scalars().first()

    document.source_markdown = _EDITED_MARKDOWN
    if flag_for_reindex:
        document.content_needs_reindexing = True
    await db_session.flush()

    return adapter, document


@pytest.mark.usefixtures(
    "patched_summarize", "patched_embed_text", "patched_chunk_text"
)
async def test_reindex_sets_status_ready(db_session, db_search_space, db_user, mocker):
    """Reindexing an edited document leaves its status at READY."""
    adapter, document = await _index_then_edit(
        db_session, db_search_space, db_user, mocker
    )

    await adapter.reindex(document=document, llm=mocker.Mock())

    await db_session.refresh(document)
    assert DocumentStatus.is_state(document.status, DocumentStatus.READY)


@pytest.mark.usefixtures(
    "patched_summarize", "patched_embed_text", "patched_chunk_text"
)
async def test_reindex_replaces_chunks(db_session, db_search_space, db_user, mocker):
    """After a reindex exactly one chunk row exists for the document."""
    adapter, document = await _index_then_edit(
        db_session, db_search_space, db_user, mocker
    )
    # Read the id before reindex() runs; the query below uses the plain value.
    document_id = document.id

    await adapter.reindex(document=document, llm=mocker.Mock())

    stored = await db_session.execute(
        select(Chunk).filter(Chunk.document_id == document_id)
    )
    assert len(stored.scalars().all()) == 1


@pytest.mark.usefixtures(
    "patched_summarize", "patched_embed_text", "patched_chunk_text"
)
async def test_reindex_clears_reindexing_flag(
    db_session, db_search_space, db_user, mocker
):
    """A successful reindex resets content_needs_reindexing to False."""
    adapter, document = await _index_then_edit(
        db_session, db_search_space, db_user, mocker, flag_for_reindex=True
    )

    await adapter.reindex(document=document, llm=mocker.Mock())

    await db_session.refresh(document)
    assert document.content_needs_reindexing is False


@pytest.mark.usefixtures("patched_embed_text", "patched_chunk_text")
async def test_reindex_raises_on_failure(
    db_session, db_search_space, db_user, mocker
):
    """reindex() raises RuntimeError when summarize_document raises."""
    # The initial index runs with a summarizer patch that returns normally.
    mocker.patch(_SUMMARIZE_TARGET, return_value="Mocked summary.")

    adapter, document = await _index_then_edit(
        db_session, db_search_space, db_user, mocker
    )

    # Re-patch so the summarizer raises during the reindex call.
    mocker.patch(_SUMMARIZE_TARGET, side_effect=RuntimeError("LLM unavailable"))

    with pytest.raises(RuntimeError):
        await adapter.reindex(document=document, llm=mocker.Mock())