@router.post("/documents/{document_id}/save")
async def save_document(
    document_id: int,
    data: dict[str, Any],
    session: AsyncSession = Depends(get_async_session),
    user: User = Depends(current_active_user),
):
    """
    Save a BlockNote document and queue background reindexing.

    Called when the user clicks "Save & Exit" in the editor. Persists the raw
    BlockNote JSON, marks the document as needing reindexing, then enqueues a
    Celery task that regenerates chunks and the summary.

    Args:
        document_id: ID of the document being saved.
        data: Request body; must contain a non-empty "blocknote_document".
        session: Async DB session (injected).
        user: Authenticated user (injected); must own the document.

    Returns:
        dict with status, document_id, a user-facing message, and the
        ISO-formatted last_edited_at timestamp.

    Raises:
        HTTPException: 404 if the document does not exist or is not owned by
            the user; 400 if "blocknote_document" is missing.
    """
    from app.tasks.celery_tasks.document_reindex_tasks import reindex_document_task

    # Ownership check: join through SearchSpace so a user can only save
    # documents that live in one of their own search spaces.
    result = await session.execute(
        select(Document)
        .join(SearchSpace)
        .filter(Document.id == document_id, SearchSpace.user_id == user.id)
    )
    document = result.scalars().first()

    if not document:
        raise HTTPException(status_code=404, detail="Document not found")

    blocknote_document = data.get("blocknote_document")
    if not blocknote_document:
        raise HTTPException(status_code=400, detail="blocknote_document is required")

    # Persist the editor state. The reindexing flag stays True until the
    # background task finishes, so a lost/failed task leaves the document
    # discoverable by any sweep that looks for stale content.
    document.blocknote_document = blocknote_document
    document.last_edited_at = datetime.now(UTC)
    document.content_needs_reindexing = True

    await session.commit()

    # Enqueue *after* the commit so the worker is guaranteed to see the saved
    # content. A broker outage must not turn an already-committed save into a
    # 500 — content_needs_reindexing remains set, so reindexing can happen later.
    try:
        reindex_document_task.delay(document_id, str(user.id))
    except Exception:
        import logging

        logging.getLogger(__name__).warning(
            "Failed to enqueue reindex task for document %s; "
            "content_needs_reindexing remains set",
            document_id,
            exc_info=True,
        )

    return {
        "status": "saved",
        "document_id": document_id,
        "message": "Document saved and will be reindexed in the background",
        "last_edited_at": document.last_edited_at.isoformat(),
    }
"""Celery tasks for reindexing edited documents."""

import asyncio
import logging

from sqlalchemy import delete, select
from sqlalchemy.ext.asyncio import async_sessionmaker, create_async_engine
from sqlalchemy.orm import selectinload
from sqlalchemy.pool import NullPool

from app.celery_app import celery_app
from app.config import config
from app.db import Chunk, Document
from app.services.llm_service import get_user_long_context_llm
from app.utils.blocknote_converter import convert_blocknote_to_markdown
from app.utils.document_converters import (
    create_document_chunks,
    generate_document_summary,
)

logger = logging.getLogger(__name__)


def get_celery_session_maker():
    """Create an async session maker for Celery tasks.

    NullPool is used because forked Celery worker processes must not share
    pooled connections. NOTE: the engine created here is owned by the caller
    and is never disposed by this helper; prefer creating the engine inline
    (as ``_reindex_document`` does) so it can be disposed deterministically.
    """
    engine = create_async_engine(
        config.DATABASE_URL,
        poolclass=NullPool,
        echo=False,
    )
    return async_sessionmaker(engine, expire_on_commit=False)


@celery_app.task(name="reindex_document", bind=True)
def reindex_document_task(self, document_id: int, user_id: str):
    """
    Celery task to reindex a document after editing.

    Runs the async reindex pipeline on a fresh event loop (Celery workers
    are synchronous, so asyncio.run is safe here and guarantees the loop is
    closed even on error).

    Args:
        document_id: ID of the document to reindex.
        user_id: ID of the user who edited the document (used to resolve
            the user's configured LLM).
    """
    asyncio.run(_reindex_document(document_id, user_id))


async def _reindex_document(document_id: int, user_id: str):
    """Regenerate chunks and summary for an edited document.

    Pipeline: load document -> convert BlockNote JSON to markdown -> resolve
    the user's LLM -> replace chunks -> regenerate summary/embedding -> clear
    the content_needs_reindexing flag. On any failure the transaction is
    rolled back and the exception re-raised, so the flag stays set and the
    task can be retried.
    """
    # Create a throwaway engine per run and dispose it in the finally block:
    # NullPool avoids cross-fork connection sharing, but the engine's own
    # bookkeeping would otherwise accumulate across task invocations.
    engine = create_async_engine(
        config.DATABASE_URL,
        poolclass=NullPool,
        echo=False,
    )
    session_maker = async_sessionmaker(engine, expire_on_commit=False)

    try:
        async with session_maker() as session:
            try:
                # Eagerly load chunks to avoid async lazy-load surprises.
                result = await session.execute(
                    select(Document)
                    .options(selectinload(Document.chunks))
                    .where(Document.id == document_id)
                )
                document = result.scalars().first()

                if not document:
                    logger.error("Document %s not found", document_id)
                    return

                if not document.blocknote_document:
                    logger.warning(
                        "Document %s has no BlockNote content", document_id
                    )
                    return

                logger.info(
                    "Reindexing document %s (%s)", document_id, document.title
                )

                # 1. Convert BlockNote JSON -> markdown.
                markdown_content = await convert_blocknote_to_markdown(
                    document.blocknote_document
                )
                if not markdown_content:
                    logger.error(
                        "Failed to convert document %s to markdown", document_id
                    )
                    return

                # 2. Resolve the user's LLM *before* destroying existing
                # chunks, so a missing LLM config cannot leave the document
                # chunkless. Fail loudly: the reindex flag stays True.
                user_llm = await get_user_long_context_llm(
                    session, user_id, document.search_space_id
                )
                if user_llm is None:
                    raise RuntimeError(
                        f"No long-context LLM configured for user {user_id}; "
                        f"cannot reindex document {document_id}"
                    )

                # 3. Delete old chunks explicitly and flush so the new rows
                # cannot collide with the old ones.
                await session.execute(
                    delete(Chunk).where(Chunk.document_id == document_id)
                )
                await session.flush()

                # 4. Create and attach new chunks.
                new_chunks = await create_document_chunks(markdown_content)
                for chunk in new_chunks:
                    chunk.document_id = document_id
                    session.add(chunk)
                logger.info(
                    "Created %d chunks for document %s",
                    len(new_chunks),
                    document_id,
                )

                # 5. Regenerate the summary and its embedding.
                document_metadata = {
                    "title": document.title,
                    "document_type": document.document_type.value,
                }
                summary_content, summary_embedding = await generate_document_summary(
                    markdown_content, user_llm, document_metadata
                )

                # 6. Persist the updated document and clear the reindex flag.
                document.content = summary_content
                document.embedding = summary_embedding
                document.content_needs_reindexing = False

                await session.commit()
                logger.info("Successfully reindexed document %s", document_id)

            except Exception:
                await session.rollback()
                logger.error(
                    "Error reindexing document %s", document_id, exc_info=True
                )
                raise
    finally:
        await engine.dispose()
`${process.env.NEXT_PUBLIC_FASTAPI_BACKEND_URL}/api/v1/documents/${documentId}/save`, { - method: "PUT", + method: "POST", headers: { "Content-Type": "application/json", Authorization: `Bearer ${token}`, @@ -133,7 +133,7 @@ export default function EditorPage() { } setHasUnsavedChanges(false); - toast.success("Document saved successfully"); + toast.success("Document saved! Reindexing in background..."); // Small delay before redirect to show success message setTimeout(() => {