From e419702ebd56e5d756b14d35f80c5e1ccc25ad79 Mon Sep 17 00:00:00 2001 From: Anish Sarkar <104695310+AnishSarkar22@users.noreply.github.com> Date: Sun, 30 Nov 2025 04:15:38 +0530 Subject: [PATCH] fix: run ruff formatter to fix code quality --- .../38_add_blocknote_fields_to_documents.py | 10 +++- surfsense_backend/app/routes/editor_routes.py | 34 +++++------ .../celery_tasks/blocknote_migration_tasks.py | 60 +++++++++++-------- .../celery_tasks/document_reindex_tasks.py | 39 ++++++------ 4 files changed, 77 insertions(+), 66 deletions(-) diff --git a/surfsense_backend/alembic/versions/38_add_blocknote_fields_to_documents.py b/surfsense_backend/alembic/versions/38_add_blocknote_fields_to_documents.py index 474a96d23..d575f53ad 100644 --- a/surfsense_backend/alembic/versions/38_add_blocknote_fields_to_documents.py +++ b/surfsense_backend/alembic/versions/38_add_blocknote_fields_to_documents.py @@ -48,15 +48,19 @@ def upgrade() -> None: from app.tasks.celery_tasks.blocknote_migration_tasks import ( populate_blocknote_for_documents_task, ) - + # Queue the task to run asynchronously populate_blocknote_for_documents_task.apply_async() - print("✓ Queued Celery task to populate blocknote_document for existing documents") + print( + "✓ Queued Celery task to populate blocknote_document for existing documents" + ) except Exception as e: # If Celery is not available or task queueing fails, log but don't fail the migration print(f"⚠ Warning: Could not queue blocknote population task: {e}") print(" You can manually trigger it later with:") - print(" celery -A app.celery_app call app.tasks.celery_tasks.blocknote_migration_tasks.populate_blocknote_for_documents_task") + print( + " celery -A app.celery_app call app.tasks.celery_tasks.blocknote_migration_tasks.populate_blocknote_for_documents_task" + ) def downgrade() -> None: diff --git a/surfsense_backend/app/routes/editor_routes.py b/surfsense_backend/app/routes/editor_routes.py index 8d0af667c..5e2363836 100644 --- a/surfsense_backend/app/routes/editor_routes.py +++ b/surfsense_backend/app/routes/editor_routes.py @@ -33,7 +33,7 @@ async def get_editor_content( attempts to generate it from chunks (lazy migration). """ from sqlalchemy.orm import selectinload - + result = await session.execute( select(Document) .options(selectinload(Document.chunks)) @@ -58,39 +58,39 @@ async def get_editor_content( # Lazy migration: Try to generate blocknote_document from chunks from app.utils.blocknote_converter import convert_markdown_to_blocknote - + chunks = sorted(document.chunks, key=lambda c: c.id) - + if not chunks: raise HTTPException( status_code=400, detail="This document has no chunks and cannot be edited. Please re-upload to enable editing.", ) - + # Reconstruct markdown from chunks markdown_content = "\n\n".join(chunk.content for chunk in chunks) - + if not markdown_content.strip(): raise HTTPException( status_code=400, detail="This document has empty content and cannot be edited.", ) - + # Convert to BlockNote blocknote_json = await convert_markdown_to_blocknote(markdown_content) - + if not blocknote_json: raise HTTPException( status_code=500, detail="Failed to convert document to editable format. Please try again later.", ) - + # Save the generated blocknote_document (lazy migration) document.blocknote_document = blocknote_json document.content_needs_reindexing = False document.last_edited_at = None await session.commit() - + return { "document_id": document.id, "title": document.title, @@ -111,7 +111,7 @@ async def save_document( Called when user clicks 'Save & Exit'. """ from app.tasks.celery_tasks.document_reindex_tasks import reindex_document_task - + # Verify ownership result = await session.execute( select(Document) @@ -119,27 +119,27 @@ async def save_document( .filter(Document.id == document_id, SearchSpace.user_id == user.id) ) document = result.scalars().first() - + if not document: raise HTTPException(status_code=404, detail="Document not found") - + blocknote_document = data.get("blocknote_document") if not blocknote_document: raise HTTPException(status_code=400, detail="blocknote_document is required") - + # Save BlockNote document document.blocknote_document = blocknote_document document.last_edited_at = datetime.now(UTC) document.content_needs_reindexing = True - + await session.commit() - + # Queue reindex task reindex_document_task.delay(document_id, str(user.id)) - + return { "status": "saved", "document_id": document_id, "message": "Document saved and will be reindexed in the background", - "last_edited_at": document.last_edited_at.isoformat() + "last_edited_at": document.last_edited_at.isoformat(), } diff --git a/surfsense_backend/app/tasks/celery_tasks/blocknote_migration_tasks.py b/surfsense_backend/app/tasks/celery_tasks/blocknote_migration_tasks.py index abac51a40..f9b7789b3 100644 --- a/surfsense_backend/app/tasks/celery_tasks/blocknote_migration_tasks.py +++ b/surfsense_backend/app/tasks/celery_tasks/blocknote_migration_tasks.py @@ -36,7 +36,7 @@ def populate_blocknote_for_documents_task( ): """ Celery task to populate blocknote_document for existing documents. - + Args: document_ids: Optional list of specific document IDs to process. If None, processes all documents with blocknote_document IS NULL. @@ -60,7 +60,7 @@ async def _populate_blocknote_for_documents( ): """ Async function to populate blocknote_document for documents. - + Args: document_ids: Optional list of specific document IDs to process batch_size: Number of documents to process per batch @@ -69,75 +69,83 @@ async def _populate_blocknote_for_documents( try: # Build query for documents that need blocknote_document populated query = select(Document).where(Document.blocknote_document.is_(None)) - + # If specific document IDs provided, filter by them if document_ids: query = query.where(Document.id.in_(document_ids)) - + # Load chunks relationship to avoid N+1 queries query = query.options(selectinload(Document.chunks)) - + # Execute query result = await session.execute(query) documents = result.scalars().all() - + total_documents = len(documents) logger.info(f"Found {total_documents} documents to process") - + if total_documents == 0: logger.info("No documents to process") return - + # Process documents in batches processed = 0 failed = 0 - + for i in range(0, total_documents, batch_size): batch = documents[i : i + batch_size] - logger.info(f"Processing batch {i // batch_size + 1}: documents {i+1}-{min(i+batch_size, total_documents)}") - + logger.info( + f"Processing batch {i // batch_size + 1}: documents {i + 1}-{min(i + batch_size, total_documents)}" + ) + for document in batch: try: # Use preloaded chunks from selectinload - no need to query again chunks = sorted(document.chunks, key=lambda c: c.id) - + if not chunks: logger.warning( f"Document {document.id} ({document.title}) has no chunks, skipping" ) failed += 1 continue - + # Reconstruct markdown by concatenating chunk contents - markdown_content = "\n\n".join(chunk.content for chunk in chunks) - + markdown_content = "\n\n".join( + chunk.content for chunk in chunks + ) + if not markdown_content or not markdown_content.strip(): logger.warning( f"Document {document.id} ({document.title}) has empty markdown content, skipping" ) failed += 1 continue - + # Convert markdown to BlockNote JSON - blocknote_json = await convert_markdown_to_blocknote(markdown_content) - + blocknote_json = await convert_markdown_to_blocknote( + markdown_content + ) + if not blocknote_json: logger.warning( f"Failed to convert markdown to BlockNote for document {document.id} ({document.title})" ) failed += 1 continue - + # Update document with blocknote_document (other fields already have correct defaults) document.blocknote_document = blocknote_json - + processed += 1 - + # Commit every batch_size documents to avoid long transactions if processed % batch_size == 0: await session.commit() - logger.info(f"Committed batch: {processed} documents processed so far") - + logger.info( + f"Committed batch: {processed} documents processed so far" + ) + except Exception as e: logger.error( f"Error processing document {document.id} ({document.title}): {e}", @@ -146,15 +154,15 @@ async def _populate_blocknote_for_documents( failed += 1 # Continue with next document instead of failing entire batch continue - + # Commit remaining changes in the batch await session.commit() logger.info(f"Completed batch {i // batch_size + 1}") - + logger.info( f"Migration complete: {processed} documents processed, {failed} failed" ) - + except Exception as e: await session.rollback() logger.error(f"Error in blocknote migration task: {e}", exc_info=True) diff --git a/surfsense_backend/app/tasks/celery_tasks/document_reindex_tasks.py b/surfsense_backend/app/tasks/celery_tasks/document_reindex_tasks.py index 93c33ce49..e969cc806 100644 --- a/surfsense_backend/app/tasks/celery_tasks/document_reindex_tasks.py +++ b/surfsense_backend/app/tasks/celery_tasks/document_reindex_tasks.py @@ -35,7 +35,7 @@ def get_celery_session_maker(): def reindex_document_task(self, document_id: int, user_id: str): """ Celery task to reindex a document after editing. - + Args: document_id: ID of document to reindex user_id: ID of user who edited the document @@ -62,66 +62,65 @@ async def _reindex_document(document_id: int, user_id: str): .where(Document.id == document_id) ) document = result.scalars().first() - + if not document: logger.error(f"Document {document_id} not found") return - + if not document.blocknote_document: logger.warning(f"Document {document_id} has no BlockNote content") return - + logger.info(f"Reindexing document {document_id} ({document.title})") - + # 1. Convert BlockNote → Markdown markdown_content = await convert_blocknote_to_markdown( document.blocknote_document ) - + if not markdown_content: logger.error(f"Failed to convert document {document_id} to markdown") return - + # 2. Delete old chunks explicitly from app.db import Chunk - await session.execute( - delete(Chunk).where(Chunk.document_id == document_id) - ) + + await session.execute(delete(Chunk).where(Chunk.document_id == document_id)) await session.flush() # Ensure old chunks are deleted - + # 3. Create new chunks new_chunks = await create_document_chunks(markdown_content) - + # 4. Add new chunks to session for chunk in new_chunks: chunk.document_id = document_id session.add(chunk) - + logger.info(f"Created {len(new_chunks)} chunks for document {document_id}") - + # 5. Regenerate summary user_llm = await get_user_long_context_llm( session, user_id, document.search_space_id ) - + document_metadata = { "title": document.title, "document_type": document.document_type.value, } - + summary_content, summary_embedding = await generate_document_summary( markdown_content, user_llm, document_metadata ) - + # 6. Update document document.content = summary_content document.embedding = summary_embedding document.content_needs_reindexing = False - + await session.commit() - + logger.info(f"Successfully reindexed document {document_id}") - + except Exception as e: await session.rollback() logger.error(f"Error reindexing document {document_id}: {e}", exc_info=True)