diff --git a/surfsense_backend/app/tasks/document_processors/file_processors.py b/surfsense_backend/app/tasks/document_processors/file_processors.py index 041053a04..e6b5ae84d 100644 --- a/surfsense_backend/app/tasks/document_processors/file_processors.py +++ b/surfsense_backend/app/tasks/document_processors/file_processors.py @@ -18,6 +18,7 @@ from sqlalchemy.ext.asyncio import AsyncSession from app.config import config as app_config from app.db import Document, DocumentStatus, DocumentType, Log, Notification +from app.indexing_pipeline.adapters.file_upload_adapter import index_uploaded_file from app.services.llm_service import get_user_long_context_llm from app.services.notification_service import NotificationService from app.services.task_logging_service import TaskLoggingService @@ -33,7 +34,6 @@ from .base import ( check_document_by_unique_identifier, check_duplicate_document, get_current_timestamp, - safe_set_chunks, ) from .markdown_processor import add_received_markdown_file_document @@ -1863,7 +1863,7 @@ async def process_file_in_background_with_document( ) return None - # ===== STEP 3: Generate embeddings and chunks ===== + # ===== STEP 3+4: Index via pipeline ===== if notification: await NotificationService.document_processing.notify_processing_progress( session, notification, stage="chunking" @@ -1871,57 +1871,22 @@ async def process_file_in_background_with_document( user_llm = await get_user_long_context_llm(session, user_id, search_space_id) - if user_llm: - document_metadata = { - "file_name": filename, - "etl_service": etl_service, - "document_type": "File Document", - } - summary_content, summary_embedding = await generate_document_summary( - markdown_content, user_llm, document_metadata - ) - else: - # Fallback: use truncated content as summary - summary_content = markdown_content[:4000] - from app.config import config - - summary_embedding = config.embedding_model_instance.embed(summary_content) - - chunks = await 
create_document_chunks(markdown_content) - - # ===== STEP 4: Update document to READY ===== - from sqlalchemy.orm.attributes import flag_modified - - document.title = filename - document.content = summary_content - document.content_hash = content_hash - document.embedding = summary_embedding - document.document_metadata = { - "FILE_NAME": filename, - "ETL_SERVICE": etl_service or "UNKNOWN", - **(document.document_metadata or {}), - } - flag_modified(document, "document_metadata") - - # Use safe_set_chunks to avoid async issues - safe_set_chunks(document, chunks) - - document.source_markdown = markdown_content - document.content_needs_reindexing = False - document.updated_at = get_current_timestamp() - document.status = DocumentStatus.ready() # Shows checkmark in UI - - await session.commit() - await session.refresh(document) + await index_uploaded_file( + markdown_content=markdown_content, + filename=filename, + etl_service=etl_service, + search_space_id=search_space_id, + user_id=user_id, + session=session, + llm=user_llm, + ) await task_logger.log_task_success( log_entry, f"Successfully processed file: {filename}", { "document_id": document.id, - "content_hash": content_hash, "file_type": etl_service, - "chunks_count": len(chunks), }, ) diff --git a/surfsense_backend/tests/integration/indexing_pipeline/adapters/__init__.py b/surfsense_backend/tests/integration/indexing_pipeline/adapters/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/surfsense_backend/tests/integration/indexing_pipeline/adapters/test_file_upload_adapter.py b/surfsense_backend/tests/integration/indexing_pipeline/adapters/test_file_upload_adapter.py new file mode 100644 index 000000000..c471110fc --- /dev/null +++ b/surfsense_backend/tests/integration/indexing_pipeline/adapters/test_file_upload_adapter.py @@ -0,0 +1,87 @@ +import pytest +from sqlalchemy import select + +from app.db import Chunk, Document, DocumentStatus +from 
app.indexing_pipeline.adapters.file_upload_adapter import index_uploaded_file + +pytestmark = pytest.mark.integration + + +@pytest.mark.usefixtures("patched_summarize", "patched_embed_text", "patched_chunk_text") +async def test_sets_status_ready(db_session, db_search_space, db_user, mocker): + await index_uploaded_file( + markdown_content="## Hello\n\nSome content.", + filename="test.pdf", + etl_service="UNSTRUCTURED", + search_space_id=db_search_space.id, + user_id=str(db_user.id), + session=db_session, + llm=mocker.Mock(), + ) + + result = await db_session.execute( + select(Document).filter(Document.search_space_id == db_search_space.id) + ) + document = result.scalars().first() + + assert DocumentStatus.is_state(document.status, DocumentStatus.READY) + + +@pytest.mark.usefixtures("patched_summarize", "patched_embed_text", "patched_chunk_text") +async def test_content_is_summary(db_session, db_search_space, db_user, mocker): + await index_uploaded_file( + markdown_content="## Hello\n\nSome content.", + filename="test.pdf", + etl_service="UNSTRUCTURED", + search_space_id=db_search_space.id, + user_id=str(db_user.id), + session=db_session, + llm=mocker.Mock(), + ) + + result = await db_session.execute( + select(Document).filter(Document.search_space_id == db_search_space.id) + ) + document = result.scalars().first() + + assert document.content == "Mocked summary." 
+ + +@pytest.mark.usefixtures("patched_summarize", "patched_embed_text", "patched_chunk_text") +async def test_chunks_written_to_db(db_session, db_search_space, db_user, mocker): + await index_uploaded_file( + markdown_content="## Hello\n\nSome content.", + filename="test.pdf", + etl_service="UNSTRUCTURED", + search_space_id=db_search_space.id, + user_id=str(db_user.id), + session=db_session, + llm=mocker.Mock(), + ) + + result = await db_session.execute( + select(Document).filter(Document.search_space_id == db_search_space.id) + ) + document = result.scalars().first() + + chunks_result = await db_session.execute( + select(Chunk).filter(Chunk.document_id == document.id) + ) + chunks = chunks_result.scalars().all() + + assert len(chunks) == 1 + assert chunks[0].content == "Test chunk content." + + +@pytest.mark.usefixtures("patched_summarize_raises", "patched_embed_text", "patched_chunk_text") +async def test_raises_on_indexing_failure(db_session, db_search_space, db_user, mocker): + with pytest.raises(RuntimeError): + await index_uploaded_file( + markdown_content="## Hello\n\nSome content.", + filename="test.pdf", + etl_service="UNSTRUCTURED", + search_space_id=db_search_space.id, + user_id=str(db_user.id), + session=db_session, + llm=mocker.Mock(), + )