diff --git a/surfsense_backend/alembic/versions/38_add_blocknote_fields_to_documents.py b/surfsense_backend/alembic/versions/38_add_blocknote_fields_to_documents.py index 742771322..474a96d23 100644 --- a/surfsense_backend/alembic/versions/38_add_blocknote_fields_to_documents.py +++ b/surfsense_backend/alembic/versions/38_add_blocknote_fields_to_documents.py @@ -20,8 +20,9 @@ depends_on: str | Sequence[str] | None = None def upgrade() -> None: - """Upgrade schema - Add BlockNote fields only.""" + """Upgrade schema - Add BlockNote fields and trigger population task.""" + # Add the columns op.add_column( "documents", sa.Column( @@ -42,6 +43,21 @@ def upgrade() -> None: sa.Column("last_edited_at", sa.TIMESTAMP(timezone=True), nullable=True), ) + # Trigger the Celery task to populate blocknote_document for existing documents + try: + from app.tasks.celery_tasks.blocknote_migration_tasks import ( + populate_blocknote_for_documents_task, + ) + + # Queue the task to run asynchronously + populate_blocknote_for_documents_task.apply_async() + print("✓ Queued Celery task to populate blocknote_document for existing documents") + except Exception as e: + # If Celery is not available or task queueing fails, log but don't fail the migration + print(f"⚠ Warning: Could not queue blocknote population task: {e}") + print(" You can manually trigger it later with:") + print(" celery -A app.celery_app call app.tasks.celery_tasks.blocknote_migration_tasks.populate_blocknote_for_documents_task") + def downgrade() -> None: """Downgrade schema - Remove BlockNote fields.""" diff --git a/surfsense_backend/app/celery_app.py b/surfsense_backend/app/celery_app.py index 898ab9735..1e68a9c47 100644 --- a/surfsense_backend/app/celery_app.py +++ b/surfsense_backend/app/celery_app.py @@ -63,6 +63,7 @@ celery_app = Celery( "app.tasks.celery_tasks.podcast_tasks", "app.tasks.celery_tasks.connector_tasks", "app.tasks.celery_tasks.schedule_checker_task", + "app.tasks.celery_tasks.blocknote_migration_tasks", ], ) diff --git a/surfsense_backend/app/routes/editor_routes.py b/surfsense_backend/app/routes/editor_routes.py index a34c80db0..f52a2fef9 100644 --- a/surfsense_backend/app/routes/editor_routes.py +++ b/surfsense_backend/app/routes/editor_routes.py @@ -30,11 +30,13 @@ async def get_editor_content( Get document content for editing. Returns BlockNote JSON document. If blocknote_document is NULL, - attempts to convert from `content` - though this won't work well - for old documents that only have summaries. + attempts to generate it from chunks (lazy migration). """ + from sqlalchemy.orm import selectinload + result = await session.execute( select(Document) + .options(selectinload(Document.chunks)) .join(SearchSpace) .filter(Document.id == document_id, SearchSpace.user_id == user.id) ) @@ -54,12 +56,47 @@ async def get_editor_content( else None, } - # For old documents without blocknote_document, return error - # (Can't convert summary back to full document) - raise HTTPException( - status_code=400, - detail="This document was uploaded before editing was enabled. Please re-upload to enable editing.", - ) + # Lazy migration: Try to generate blocknote_document from chunks + from app.utils.blocknote_converter import convert_markdown_to_blocknote + + chunks = sorted(document.chunks, key=lambda c: c.id) + + if not chunks: + raise HTTPException( + status_code=400, + detail="This document has no chunks and cannot be edited. Please re-upload to enable editing.", + ) + + # Reconstruct markdown from chunks + markdown_content = "\n\n".join(chunk.content for chunk in chunks) + + if not markdown_content.strip(): + raise HTTPException( + status_code=400, + detail="This document has empty content and cannot be edited.", + ) + + # Convert to BlockNote + blocknote_json = await convert_markdown_to_blocknote(markdown_content) + + if not blocknote_json: + raise HTTPException( + status_code=500, + detail="Failed to convert document to editable format. Please try again later.", + ) + + # Save the generated blocknote_document (lazy migration) + document.blocknote_document = blocknote_json + document.content_needs_reindexing = False + document.last_edited_at = None + await session.commit() + + return { + "document_id": document.id, + "title": document.title, + "blocknote_document": blocknote_json, + "last_edited_at": None, + } @router.put("/documents/{document_id}/blocknote-content") diff --git a/surfsense_backend/app/tasks/celery_tasks/blocknote_migration_tasks.py b/surfsense_backend/app/tasks/celery_tasks/blocknote_migration_tasks.py new file mode 100644 index 000000000..abac51a40 --- /dev/null +++ b/surfsense_backend/app/tasks/celery_tasks/blocknote_migration_tasks.py @@ -0,0 +1,161 @@ +"""Celery tasks for populating blocknote_document for existing documents.""" + +import logging +from typing import Any + +from sqlalchemy import select +from sqlalchemy.ext.asyncio import async_sessionmaker, create_async_engine +from sqlalchemy.orm import selectinload +from sqlalchemy.pool import NullPool + +from app.celery_app import celery_app +from app.config import config +from app.db import Chunk, Document +from app.utils.blocknote_converter import convert_markdown_to_blocknote + +logger = logging.getLogger(__name__) + + +def get_celery_session_maker(): + """ + Create a new async session maker for Celery tasks. + This is necessary because Celery tasks run in a new event loop, + and the default session maker is bound to the main app's event loop. + """ + engine = create_async_engine( + config.DATABASE_URL, + poolclass=NullPool, + echo=False, + ) + return async_sessionmaker(engine, expire_on_commit=False) + + +@celery_app.task(name="populate_blocknote_for_documents", bind=True) +def populate_blocknote_for_documents_task( + self, document_ids: list[int] | None = None, batch_size: int = 50 +): + """ + Celery task to populate blocknote_document for existing documents. + + Args: + document_ids: Optional list of specific document IDs to process. + If None, processes all documents with blocknote_document IS NULL. + batch_size: Number of documents to process in each batch (default: 50) + """ + import asyncio + + loop = asyncio.new_event_loop() + asyncio.set_event_loop(loop) + + try: + loop.run_until_complete( + _populate_blocknote_for_documents(document_ids, batch_size) + ) + finally: + loop.close() + + +async def _populate_blocknote_for_documents( + document_ids: list[int] | None = None, batch_size: int = 50 +): + """ + Async function to populate blocknote_document for documents. + + Args: + document_ids: Optional list of specific document IDs to process + batch_size: Number of documents to process per batch + """ + async with get_celery_session_maker()() as session: + try: + # Build query for documents that need blocknote_document populated + query = select(Document).where(Document.blocknote_document.is_(None)) + + # If specific document IDs provided, filter by them + if document_ids: + query = query.where(Document.id.in_(document_ids)) + + # Load chunks relationship to avoid N+1 queries + query = query.options(selectinload(Document.chunks)) + + # Execute query + result = await session.execute(query) + documents = result.scalars().all() + + total_documents = len(documents) + logger.info(f"Found {total_documents} documents to process") + + if total_documents == 0: + logger.info("No documents to process") + return + + # Process documents in batches + processed = 0 + failed = 0 + + for i in range(0, total_documents, batch_size): + batch = documents[i : i + batch_size] + logger.info(f"Processing batch {i // batch_size + 1}: documents {i+1}-{min(i+batch_size, total_documents)}") + + for document in batch: + try: + # Use preloaded chunks from selectinload - no need to query again + chunks = sorted(document.chunks, key=lambda c: c.id) + + if not chunks: + logger.warning( + f"Document {document.id} ({document.title}) has no chunks, skipping" + ) + failed += 1 + continue + + # Reconstruct markdown by concatenating chunk contents + markdown_content = "\n\n".join(chunk.content for chunk in chunks) + + if not markdown_content or not markdown_content.strip(): + logger.warning( + f"Document {document.id} ({document.title}) has empty markdown content, skipping" + ) + failed += 1 + continue + + # Convert markdown to BlockNote JSON + blocknote_json = await convert_markdown_to_blocknote(markdown_content) + + if not blocknote_json: + logger.warning( + f"Failed to convert markdown to BlockNote for document {document.id} ({document.title})" + ) + failed += 1 + continue + + # Update document with blocknote_document (other fields already have correct defaults) + document.blocknote_document = blocknote_json + + processed += 1 + + # Commit every batch_size documents to avoid long transactions + if processed % batch_size == 0: + await session.commit() + logger.info(f"Committed batch: {processed} documents processed so far") + + except Exception as e: + logger.error( + f"Error processing document {document.id} ({document.title}): {e}", + exc_info=True, + ) + failed += 1 + # Continue with next document instead of failing entire batch + continue + + # Commit remaining changes in the batch + await session.commit() + logger.info(f"Completed batch {i // batch_size + 1}") + + logger.info( + f"Migration complete: {processed} documents processed, {failed} failed" + ) + + except Exception as e: + await session.rollback() + logger.error(f"Error in blocknote migration task: {e}", exc_info=True) + raise diff --git a/surfsense_backend/app/tasks/document_processors/file_processors.py b/surfsense_backend/app/tasks/document_processors/file_processors.py index 95cf1c462..3b026b93e 100644 --- a/surfsense_backend/app/tasks/document_processors/file_processors.py +++ b/surfsense_backend/app/tasks/document_processors/file_processors.py @@ -396,7 +396,9 @@ async def add_received_file_document_using_docling( "ETL_SERVICE": "DOCLING", } existing_document.chunks = chunks - existing_document.blocknote_document = blocknote_json + existing_document.blocknote_document = None + existing_document.content_needs_reindexing = False + existing_document.last_edited_at = None await session.commit() await session.refresh(existing_document) @@ -416,7 +418,9 @@ async def add_received_file_document_using_docling( chunks=chunks, content_hash=content_hash, unique_identifier_hash=unique_identifier_hash, - blocknote_document=blocknote_json, + blocknote_document=None, + content_needs_reindexing=False, + last_edited_at=None, ) session.add(document) diff --git a/surfsense_web/app/dashboard/[search_space_id]/editor/[documentId]/page.tsx b/surfsense_web/app/dashboard/[search_space_id]/editor/[documentId]/page.tsx index 5a0dae2d6..ce26afc38 100644 --- a/surfsense_web/app/dashboard/[search_space_id]/editor/[documentId]/page.tsx +++ b/surfsense_web/app/dashboard/[search_space_id]/editor/[documentId]/page.tsx @@ -96,7 +96,7 @@ export default function EditorPage() { } }, [editorContent, document]); - // TODO: Auto-save every 30 seconds - DIRECT CALL TO FASTAPI + // TODO: Maybe add Auto-save every 30 seconds - DIRECT CALL TO FASTAPI // Save and exit - DIRECT CALL TO FASTAPI const handleSave = async () => {