diff --git a/.vscode/launch.json b/.vscode/launch.json index ad7f04bd0..4988cc8f3 100644 --- a/.vscode/launch.json +++ b/.vscode/launch.json @@ -71,6 +71,7 @@ "app.celery_app:celery_app", "worker", "--loglevel=info", + "--queues=surfsense,surfsense.connectors,surfsense-dev,surfsense-dev.connectors", "--pool=solo" ], "console": "integratedTerminal", diff --git a/scripts/docker/supervisor-allinone.conf b/scripts/docker/supervisor-allinone.conf index eb2404b3c..1a21fcc04 100644 --- a/scripts/docker/supervisor-allinone.conf +++ b/scripts/docker/supervisor-allinone.conf @@ -57,7 +57,7 @@ environment=PYTHONPATH="/app/backend",UVICORN_LOOP="asyncio",UNSTRUCTURED_HAS_PA # Celery Worker [program:celery-worker] -command=celery -A app.celery_app worker --loglevel=info --concurrency=2 --pool=solo +command=celery -A app.celery_app worker --loglevel=info --concurrency=2 --pool=solo --queues=surfsense,surfsense.connectors directory=/app/backend autostart=true autorestart=true diff --git a/surfsense_backend/app/celery_app.py b/surfsense_backend/app/celery_app.py index 477e5369f..a6637b1bd 100644 --- a/surfsense_backend/app/celery_app.py +++ b/surfsense_backend/app/celery_app.py @@ -82,7 +82,6 @@ celery_app = Celery( "app.tasks.celery_tasks.blocknote_migration_tasks", "app.tasks.celery_tasks.document_reindex_tasks", "app.tasks.celery_tasks.stale_notification_cleanup_task", - "app.tasks.celery_tasks.connector_deletion_task", ], ) @@ -143,7 +142,6 @@ celery_app.conf.update( "index_bookstack_pages": {"queue": CONNECTORS_QUEUE}, "index_obsidian_vault": {"queue": CONNECTORS_QUEUE}, "index_composio_connector": {"queue": CONNECTORS_QUEUE}, - "delete_connector_with_documents": {"queue": CONNECTORS_QUEUE}, # Everything else (document processing, podcasts, reindexing, # schedule checker, cleanup) stays on the default fast queue. }, diff --git a/surfsense_backend/app/routes/documents_routes.py b/surfsense_backend/app/routes/documents_routes.py index c8aeac0c9..ed4d6dea3 100644 --- a/surfsense_backend/app/routes/documents_routes.py +++ b/surfsense_backend/app/routes/documents_routes.py @@ -97,7 +97,10 @@ async def create_documents( raise HTTPException(status_code=400, detail="Invalid document type") await session.commit() - return {"message": "Documents processed successfully"} + return { + "message": "Documents queued for background processing", + "status": "queued", + } except HTTPException: raise except Exception as e: diff --git a/surfsense_backend/app/routes/search_source_connectors_routes.py b/surfsense_backend/app/routes/search_source_connectors_routes.py index ba6877376..b69238837 100644 --- a/surfsense_backend/app/routes/search_source_connectors_routes.py +++ b/surfsense_backend/app/routes/search_source_connectors_routes.py @@ -532,14 +532,16 @@ async def delete_search_source_connector( """ Delete a search source connector and all its associated documents. - The deletion runs in background via Celery task. User is notified - via the notification system when complete (no polling required). + The deletion happens inline (documents are deleted in batches, + then the connector record is removed). Requires CONNECTORS_DELETE permission. """ - from app.tasks.celery_tasks.connector_deletion_task import ( - delete_connector_with_documents_task, - ) + from sqlalchemy import delete as sa_delete, func + + from app.db import Document + + deletion_batch_size = 500 try: # Get the connector first @@ -562,12 +564,10 @@ async def delete_search_source_connector( "You don't have permission to delete this connector", ) - # Store connector info before we queue the deletion task + # Store connector info before deletion connector_name = db_connector.name - connector_type = db_connector.connector_type.value - search_space_id = db_connector.search_space_id - # Delete any periodic schedule associated with this connector (lightweight, sync) + # Delete any periodic schedule associated with this connector if db_connector.periodic_indexing_enabled: success = delete_periodic_schedule(connector_id) if not success: @@ -575,7 +575,7 @@ async def delete_search_source_connector( f"Failed to delete periodic schedule for connector {connector_id}" ) - # For Composio connectors, delete the connected account in Composio (lightweight API call, sync) + # For Composio connectors, delete the connected account in Composio composio_connector_types = [ SearchSourceConnectorType.COMPOSIO_GOOGLE_DRIVE_CONNECTOR, SearchSourceConnectorType.COMPOSIO_GMAIL_CONNECTOR, @@ -602,30 +602,58 @@ async def delete_search_source_connector( f"for connector {connector_id}" ) except Exception as composio_error: - # Log but don't fail the deletion - Composio account may already be deleted logger.warning( f"Error deleting Composio connected account {composio_connected_account_id}: {composio_error!s}" ) - # Queue background task to delete documents and connector - # This handles potentially large document counts without blocking the API - delete_connector_with_documents_task.delay( - connector_id=connector_id, - user_id=str(user.id), - search_space_id=search_space_id, - connector_name=connector_name, - connector_type=connector_type, + # Delete documents in batches (chunks are deleted via CASCADE) + total_deleted = 0 + count_result = await session.execute( + select(func.count(Document.id)).where(Document.connector_id == connector_id) ) + total_docs = count_result.scalar() or 0 logger.info( - f"Queued deletion task for connector {connector_id} ({connector_name})" + f"Starting deletion of connector {connector_id} ({connector_name}). " + f"Documents to delete: {total_docs}" ) + while True: + result = await session.execute( + select(Document.id) + .where(Document.connector_id == connector_id) + .limit(deletion_batch_size) + ) + doc_ids = [row[0] for row in result.fetchall()] + + if not doc_ids: + break + + await session.execute(sa_delete(Document).where(Document.id.in_(doc_ids))) + await session.commit() + + total_deleted += len(doc_ids) + logger.info( + f"Deleted batch of {len(doc_ids)} documents. " + f"Progress: {total_deleted}/{total_docs}" + ) + + # Delete the connector record + await session.delete(db_connector) + await session.commit() + + logger.info( + f"Connector {connector_id} ({connector_name}) deleted successfully. " + f"Total documents deleted: {total_deleted}" + ) + + doc_text = "document" if total_deleted == 1 else "documents" return { - "message": "Connector deletion started. You will be notified when complete.", - "status": "queued", + "message": f"Connector '{connector_name}' deleted. {total_deleted} {doc_text} removed.", + "status": "completed", "connector_id": connector_id, "connector_name": connector_name, + "documents_deleted": total_deleted, } except HTTPException: raise diff --git a/surfsense_backend/app/tasks/celery_tasks/connector_deletion_task.py b/surfsense_backend/app/tasks/celery_tasks/connector_deletion_task.py deleted file mode 100644 index e024aca29..000000000 --- a/surfsense_backend/app/tasks/celery_tasks/connector_deletion_task.py +++ /dev/null @@ -1,269 +0,0 @@ -"""Celery task for background connector deletion. - -This task handles the deletion of all documents associated with a connector -in the background, then deletes the connector itself. User is notified via -the notification system when complete (no polling required). - -Features: -- Batch deletion to handle large document counts -- Automatic retry on failure -- Progress tracking via notifications -- Handles both success and failure notifications -""" - -import asyncio -import logging -from uuid import UUID - -from sqlalchemy import delete, func, select -from sqlalchemy.ext.asyncio import async_sessionmaker, create_async_engine -from sqlalchemy.pool import NullPool - -from app.celery_app import celery_app -from app.config import config -from app.db import Document, Notification, SearchSourceConnector - -logger = logging.getLogger(__name__) - -# Batch size for document deletion -DELETION_BATCH_SIZE = 500 - - -def _get_celery_session_maker(): - """Create async session maker for Celery tasks.""" - engine = create_async_engine( - config.DATABASE_URL, - poolclass=NullPool, - echo=False, - ) - return async_sessionmaker(engine, expire_on_commit=False), engine - - -@celery_app.task( - bind=True, - name="delete_connector_with_documents", - max_retries=3, - default_retry_delay=60, - autoretry_for=(Exception,), - retry_backoff=True, -) -def delete_connector_with_documents_task( - self, - connector_id: int, - user_id: str, - search_space_id: int, - connector_name: str, - connector_type: str, -): - """ - Background task to delete a connector and all its associated documents. - - Creates a notification when complete (success or failure). - No polling required - user sees notification in UI. - - Args: - connector_id: ID of the connector to delete - user_id: ID of the user who initiated the deletion - search_space_id: ID of the search space - connector_name: Name of the connector (for notification message) - connector_type: Type of the connector (for logging) - """ - loop = asyncio.new_event_loop() - asyncio.set_event_loop(loop) - - try: - return loop.run_until_complete( - _delete_connector_async( - connector_id=connector_id, - user_id=user_id, - search_space_id=search_space_id, - connector_name=connector_name, - connector_type=connector_type, - ) - ) - finally: - loop.close() - - -async def _delete_connector_async( - connector_id: int, - user_id: str, - search_space_id: int, - connector_name: str, - connector_type: str, -) -> dict: - """ - Async implementation of connector deletion. - - Steps: - 1. Count total documents to delete - 2. Delete documents in batches (chunks cascade automatically) - 3. Delete the connector record - 4. Create success notification - - On failure, creates failure notification and re-raises exception. - """ - session_maker, engine = _get_celery_session_maker() - total_deleted = 0 - - try: - async with session_maker() as session: - # Step 1: Count total documents for this connector - count_result = await session.execute( - select(func.count(Document.id)).where( - Document.connector_id == connector_id - ) - ) - total_docs = count_result.scalar() or 0 - - logger.info( - f"Starting deletion of connector {connector_id} ({connector_name}). " - f"Documents to delete: {total_docs}" - ) - - # Step 2: Delete documents in batches - while True: - # Get batch of document IDs - result = await session.execute( - select(Document.id) - .where(Document.connector_id == connector_id) - .limit(DELETION_BATCH_SIZE) - ) - doc_ids = [row[0] for row in result.fetchall()] - - if not doc_ids: - break - - # Delete this batch (chunks are deleted via CASCADE) - await session.execute(delete(Document).where(Document.id.in_(doc_ids))) - await session.commit() - - total_deleted += len(doc_ids) - logger.info( - f"Deleted batch of {len(doc_ids)} documents. " - f"Progress: {total_deleted}/{total_docs}" - ) - - # Step 3: Delete the connector record - result = await session.execute( - select(SearchSourceConnector).where( - SearchSourceConnector.id == connector_id - ) - ) - connector = result.scalar_one_or_none() - - if connector: - await session.delete(connector) - logger.info(f"Deleted connector record: {connector_id}") - else: - logger.warning( - f"Connector {connector_id} not found - may have been already deleted" - ) - - # Step 4: Create success notification - doc_text = "document" if total_deleted == 1 else "documents" - notification = Notification( - user_id=UUID(user_id), - search_space_id=search_space_id, - type="connector_deletion", - title=f"{connector_name} removed", - message=f"Cleanup complete. {total_deleted} {doc_text} removed.", - notification_metadata={ - "connector_id": connector_id, - "connector_name": connector_name, - "connector_type": connector_type, - "documents_deleted": total_deleted, - "status": "completed", - }, - ) - session.add(notification) - await session.commit() - - logger.info( - f"Connector {connector_id} ({connector_name}) deleted successfully. " - f"Total documents deleted: {total_deleted}" - ) - - return { - "status": "success", - "connector_id": connector_id, - "connector_name": connector_name, - "documents_deleted": total_deleted, - } - - except Exception as e: - logger.error( - f"Failed to delete connector {connector_id} ({connector_name}): {e!s}", - exc_info=True, - ) - - # Create failure notification - try: - async with session_maker() as session: - notification = Notification( - user_id=UUID(user_id), - search_space_id=search_space_id, - type="connector_deletion", - title=f"Failed to Remove {connector_name}", - message="Something went wrong while removing this connector. Please try again.", - notification_metadata={ - "connector_id": connector_id, - "connector_name": connector_name, - "connector_type": connector_type, - "documents_deleted": total_deleted, - "status": "failed", - "error": str(e), - }, - ) - session.add(notification) - await session.commit() - except Exception as notify_error: - logger.error( - f"Failed to create failure notification: {notify_error!s}", - exc_info=True, - ) - - # Re-raise to trigger Celery retry - raise - - finally: - await engine.dispose() - - -async def delete_documents_by_connector_id( - session, - connector_id: int, - batch_size: int = DELETION_BATCH_SIZE, -) -> int: - """ - Delete all documents associated with a connector in batches. - - This is a utility function that can be used independently of the Celery task - for synchronous deletion scenarios (e.g., small document counts). - - Args: - session: AsyncSession instance - connector_id: ID of the connector - batch_size: Number of documents to delete per batch - - Returns: - Total number of documents deleted - """ - total_deleted = 0 - - while True: - result = await session.execute( - select(Document.id) - .where(Document.connector_id == connector_id) - .limit(batch_size) - ) - doc_ids = [row[0] for row in result.fetchall()] - - if not doc_ids: - break - - await session.execute(delete(Document).where(Document.id.in_(doc_ids))) - await session.commit() - total_deleted += len(doc_ids) - - return total_deleted diff --git a/surfsense_web/content/docs/manual-installation.mdx b/surfsense_web/content/docs/manual-installation.mdx index b4da781ba..1d30a12ef 100644 --- a/surfsense_web/content/docs/manual-installation.mdx +++ b/surfsense_web/content/docs/manual-installation.mdx @@ -278,8 +278,9 @@ In a new terminal window, start the Celery worker to handle background tasks: # Make sure you're in the surfsense_backend directory cd surfsense_backend -# Start Celery worker -uv run celery -A celery_worker.celery_app worker --loglevel=info --concurrency=1 --pool=solo +# Start Celery worker (consume both default and connectors queues) +DEFAULT_Q="${CELERY_TASK_DEFAULT_QUEUE:-surfsense}" +uv run celery -A celery_worker.celery_app worker --loglevel=info --concurrency=1 --pool=solo --queues="${DEFAULT_Q},${DEFAULT_Q}.connectors" ``` **If using pip/venv:** @@ -293,8 +294,9 @@ source .venv/bin/activate # Linux/macOS # OR .venv\Scripts\activate # Windows -# Start Celery worker -celery -A celery_worker.celery_app worker --loglevel=info --concurrency=1 --pool=solo +# Start Celery worker (consume both default and connectors queues) +DEFAULT_Q="${CELERY_TASK_DEFAULT_QUEUE:-surfsense}" +celery -A celery_worker.celery_app worker --loglevel=info --concurrency=1 --pool=solo --queues="${DEFAULT_Q},${DEFAULT_Q}.connectors" ``` **Optional: Start Flower for monitoring Celery tasks:** diff --git a/surfsense_web/contracts/types/connector.types.ts b/surfsense_web/contracts/types/connector.types.ts index a7760745d..3a11d0399 100644 --- a/surfsense_web/contracts/types/connector.types.ts +++ b/surfsense_web/contracts/types/connector.types.ts @@ -133,7 +133,10 @@ export const updateConnectorResponse = searchSourceConnector; export const deleteConnectorRequest = searchSourceConnector.pick({ id: true }); export const deleteConnectorResponse = z.object({ - message: z.literal("Search source connector deleted successfully"), + message: z.string(), + status: z.string().optional(), + connector_id: z.number().optional(), + connector_name: z.string().optional(), }); /**