feat: update Celery worker configuration and improve connector deletion process

- Added support for multiple queues in Celery worker configuration.
- Modified connector deletion to handle documents inline instead of using a background task.
- Updated response messages for document creation and connector deletion to reflect new processing status.
- Removed the obsolete connector deletion Celery task file.
This commit is contained in:
DESKTOP-RTLN3BA\$punk 2026-02-16 00:07:23 -08:00
parent 81c70befcf
commit 1849b451a5
8 changed files with 66 additions and 300 deletions

1
.vscode/launch.json vendored
View file

@ -71,6 +71,7 @@
"app.celery_app:celery_app",
"worker",
"--loglevel=info",
"--queues=surfsense,surfsense.connectors,surfsense-dev,surfsense-dev.connectors",
"--pool=solo"
],
"console": "integratedTerminal",

View file

@ -57,7 +57,7 @@ environment=PYTHONPATH="/app/backend",UVICORN_LOOP="asyncio",UNSTRUCTURED_HAS_PA
# Celery Worker
[program:celery-worker]
command=celery -A app.celery_app worker --loglevel=info --concurrency=2 --pool=solo
command=celery -A app.celery_app worker --loglevel=info --concurrency=2 --pool=solo --queues=surfsense,surfsense.connectors
directory=/app/backend
autostart=true
autorestart=true

View file

@ -82,7 +82,6 @@ celery_app = Celery(
"app.tasks.celery_tasks.blocknote_migration_tasks",
"app.tasks.celery_tasks.document_reindex_tasks",
"app.tasks.celery_tasks.stale_notification_cleanup_task",
"app.tasks.celery_tasks.connector_deletion_task",
],
)
@ -143,7 +142,6 @@ celery_app.conf.update(
"index_bookstack_pages": {"queue": CONNECTORS_QUEUE},
"index_obsidian_vault": {"queue": CONNECTORS_QUEUE},
"index_composio_connector": {"queue": CONNECTORS_QUEUE},
"delete_connector_with_documents": {"queue": CONNECTORS_QUEUE},
# Everything else (document processing, podcasts, reindexing,
# schedule checker, cleanup) stays on the default fast queue.
},

View file

@ -97,7 +97,10 @@ async def create_documents(
raise HTTPException(status_code=400, detail="Invalid document type")
await session.commit()
return {"message": "Documents processed successfully"}
return {
"message": "Documents queued for background processing",
"status": "queued",
}
except HTTPException:
raise
except Exception as e:

View file

@ -532,14 +532,16 @@ async def delete_search_source_connector(
"""
Delete a search source connector and all its associated documents.
The deletion runs in background via Celery task. User is notified
via the notification system when complete (no polling required).
The deletion happens inline (documents are deleted in batches,
then the connector record is removed).
Requires CONNECTORS_DELETE permission.
"""
from app.tasks.celery_tasks.connector_deletion_task import (
delete_connector_with_documents_task,
)
from sqlalchemy import delete as sa_delete, func
from app.db import Document
deletion_batch_size = 500
try:
# Get the connector first
@ -562,12 +564,10 @@ async def delete_search_source_connector(
"You don't have permission to delete this connector",
)
# Store connector info before we queue the deletion task
# Store connector info before deletion
connector_name = db_connector.name
connector_type = db_connector.connector_type.value
search_space_id = db_connector.search_space_id
# Delete any periodic schedule associated with this connector (lightweight, sync)
# Delete any periodic schedule associated with this connector
if db_connector.periodic_indexing_enabled:
success = delete_periodic_schedule(connector_id)
if not success:
@ -575,7 +575,7 @@ async def delete_search_source_connector(
f"Failed to delete periodic schedule for connector {connector_id}"
)
# For Composio connectors, delete the connected account in Composio (lightweight API call, sync)
# For Composio connectors, delete the connected account in Composio
composio_connector_types = [
SearchSourceConnectorType.COMPOSIO_GOOGLE_DRIVE_CONNECTOR,
SearchSourceConnectorType.COMPOSIO_GMAIL_CONNECTOR,
@ -602,30 +602,58 @@ async def delete_search_source_connector(
f"for connector {connector_id}"
)
except Exception as composio_error:
# Log but don't fail the deletion - Composio account may already be deleted
logger.warning(
f"Error deleting Composio connected account {composio_connected_account_id}: {composio_error!s}"
)
# Queue background task to delete documents and connector
# This handles potentially large document counts without blocking the API
delete_connector_with_documents_task.delay(
connector_id=connector_id,
user_id=str(user.id),
search_space_id=search_space_id,
connector_name=connector_name,
connector_type=connector_type,
# Delete documents in batches (chunks are deleted via CASCADE)
total_deleted = 0
count_result = await session.execute(
select(func.count(Document.id)).where(Document.connector_id == connector_id)
)
total_docs = count_result.scalar() or 0
logger.info(
f"Queued deletion task for connector {connector_id} ({connector_name})"
f"Starting deletion of connector {connector_id} ({connector_name}). "
f"Documents to delete: {total_docs}"
)
while True:
result = await session.execute(
select(Document.id)
.where(Document.connector_id == connector_id)
.limit(deletion_batch_size)
)
doc_ids = [row[0] for row in result.fetchall()]
if not doc_ids:
break
await session.execute(sa_delete(Document).where(Document.id.in_(doc_ids)))
await session.commit()
total_deleted += len(doc_ids)
logger.info(
f"Deleted batch of {len(doc_ids)} documents. "
f"Progress: {total_deleted}/{total_docs}"
)
# Delete the connector record
await session.delete(db_connector)
await session.commit()
logger.info(
f"Connector {connector_id} ({connector_name}) deleted successfully. "
f"Total documents deleted: {total_deleted}"
)
doc_text = "document" if total_deleted == 1 else "documents"
return {
"message": "Connector deletion started. You will be notified when complete.",
"status": "queued",
"message": f"Connector '{connector_name}' deleted. {total_deleted} {doc_text} removed.",
"status": "completed",
"connector_id": connector_id,
"connector_name": connector_name,
"documents_deleted": total_deleted,
}
except HTTPException:
raise

View file

@ -1,269 +0,0 @@
"""Celery task for background connector deletion.
This task handles the deletion of all documents associated with a connector
in the background, then deletes the connector itself. User is notified via
the notification system when complete (no polling required).
Features:
- Batch deletion to handle large document counts
- Automatic retry on failure
- Progress tracking via notifications
- Handles both success and failure notifications
"""
import asyncio
import logging
from uuid import UUID
from sqlalchemy import delete, func, select
from sqlalchemy.ext.asyncio import async_sessionmaker, create_async_engine
from sqlalchemy.pool import NullPool
from app.celery_app import celery_app
from app.config import config
from app.db import Document, Notification, SearchSourceConnector
logger = logging.getLogger(__name__)
# Batch size for document deletion
DELETION_BATCH_SIZE = 500
def _get_celery_session_maker():
"""Create async session maker for Celery tasks."""
engine = create_async_engine(
config.DATABASE_URL,
poolclass=NullPool,
echo=False,
)
return async_sessionmaker(engine, expire_on_commit=False), engine
@celery_app.task(
bind=True,
name="delete_connector_with_documents",
max_retries=3,
default_retry_delay=60,
autoretry_for=(Exception,),
retry_backoff=True,
)
def delete_connector_with_documents_task(
self,
connector_id: int,
user_id: str,
search_space_id: int,
connector_name: str,
connector_type: str,
):
"""
Background task to delete a connector and all its associated documents.
Creates a notification when complete (success or failure).
No polling required - user sees notification in UI.
Args:
connector_id: ID of the connector to delete
user_id: ID of the user who initiated the deletion
search_space_id: ID of the search space
connector_name: Name of the connector (for notification message)
connector_type: Type of the connector (for logging)
"""
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
try:
return loop.run_until_complete(
_delete_connector_async(
connector_id=connector_id,
user_id=user_id,
search_space_id=search_space_id,
connector_name=connector_name,
connector_type=connector_type,
)
)
finally:
loop.close()
async def _delete_connector_async(
connector_id: int,
user_id: str,
search_space_id: int,
connector_name: str,
connector_type: str,
) -> dict:
"""
Async implementation of connector deletion.
Steps:
1. Count total documents to delete
2. Delete documents in batches (chunks cascade automatically)
3. Delete the connector record
4. Create success notification
On failure, creates failure notification and re-raises exception.
"""
session_maker, engine = _get_celery_session_maker()
total_deleted = 0
try:
async with session_maker() as session:
# Step 1: Count total documents for this connector
count_result = await session.execute(
select(func.count(Document.id)).where(
Document.connector_id == connector_id
)
)
total_docs = count_result.scalar() or 0
logger.info(
f"Starting deletion of connector {connector_id} ({connector_name}). "
f"Documents to delete: {total_docs}"
)
# Step 2: Delete documents in batches
while True:
# Get batch of document IDs
result = await session.execute(
select(Document.id)
.where(Document.connector_id == connector_id)
.limit(DELETION_BATCH_SIZE)
)
doc_ids = [row[0] for row in result.fetchall()]
if not doc_ids:
break
# Delete this batch (chunks are deleted via CASCADE)
await session.execute(delete(Document).where(Document.id.in_(doc_ids)))
await session.commit()
total_deleted += len(doc_ids)
logger.info(
f"Deleted batch of {len(doc_ids)} documents. "
f"Progress: {total_deleted}/{total_docs}"
)
# Step 3: Delete the connector record
result = await session.execute(
select(SearchSourceConnector).where(
SearchSourceConnector.id == connector_id
)
)
connector = result.scalar_one_or_none()
if connector:
await session.delete(connector)
logger.info(f"Deleted connector record: {connector_id}")
else:
logger.warning(
f"Connector {connector_id} not found - may have been already deleted"
)
# Step 4: Create success notification
doc_text = "document" if total_deleted == 1 else "documents"
notification = Notification(
user_id=UUID(user_id),
search_space_id=search_space_id,
type="connector_deletion",
title=f"{connector_name} removed",
message=f"Cleanup complete. {total_deleted} {doc_text} removed.",
notification_metadata={
"connector_id": connector_id,
"connector_name": connector_name,
"connector_type": connector_type,
"documents_deleted": total_deleted,
"status": "completed",
},
)
session.add(notification)
await session.commit()
logger.info(
f"Connector {connector_id} ({connector_name}) deleted successfully. "
f"Total documents deleted: {total_deleted}"
)
return {
"status": "success",
"connector_id": connector_id,
"connector_name": connector_name,
"documents_deleted": total_deleted,
}
except Exception as e:
logger.error(
f"Failed to delete connector {connector_id} ({connector_name}): {e!s}",
exc_info=True,
)
# Create failure notification
try:
async with session_maker() as session:
notification = Notification(
user_id=UUID(user_id),
search_space_id=search_space_id,
type="connector_deletion",
title=f"Failed to Remove {connector_name}",
message="Something went wrong while removing this connector. Please try again.",
notification_metadata={
"connector_id": connector_id,
"connector_name": connector_name,
"connector_type": connector_type,
"documents_deleted": total_deleted,
"status": "failed",
"error": str(e),
},
)
session.add(notification)
await session.commit()
except Exception as notify_error:
logger.error(
f"Failed to create failure notification: {notify_error!s}",
exc_info=True,
)
# Re-raise to trigger Celery retry
raise
finally:
await engine.dispose()
async def delete_documents_by_connector_id(
session,
connector_id: int,
batch_size: int = DELETION_BATCH_SIZE,
) -> int:
"""
Delete all documents associated with a connector in batches.
This is a utility function that can be used independently of the Celery task
for synchronous deletion scenarios (e.g., small document counts).
Args:
session: AsyncSession instance
connector_id: ID of the connector
batch_size: Number of documents to delete per batch
Returns:
Total number of documents deleted
"""
total_deleted = 0
while True:
result = await session.execute(
select(Document.id)
.where(Document.connector_id == connector_id)
.limit(batch_size)
)
doc_ids = [row[0] for row in result.fetchall()]
if not doc_ids:
break
await session.execute(delete(Document).where(Document.id.in_(doc_ids)))
await session.commit()
total_deleted += len(doc_ids)
return total_deleted

View file

@ -278,8 +278,9 @@ In a new terminal window, start the Celery worker to handle background tasks:
# Make sure you're in the surfsense_backend directory
cd surfsense_backend
# Start Celery worker
uv run celery -A celery_worker.celery_app worker --loglevel=info --concurrency=1 --pool=solo
# Start Celery worker (consume both default and connectors queues)
DEFAULT_Q="${CELERY_TASK_DEFAULT_QUEUE:-surfsense}"
uv run celery -A celery_worker.celery_app worker --loglevel=info --concurrency=1 --pool=solo --queues="${DEFAULT_Q},${DEFAULT_Q}.connectors"
```
**If using pip/venv:**
@ -293,8 +294,9 @@ source .venv/bin/activate # Linux/macOS
# OR
.venv\Scripts\activate # Windows
# Start Celery worker
celery -A celery_worker.celery_app worker --loglevel=info --concurrency=1 --pool=solo
# Start Celery worker (consume both default and connectors queues)
DEFAULT_Q="${CELERY_TASK_DEFAULT_QUEUE:-surfsense}"
celery -A celery_worker.celery_app worker --loglevel=info --concurrency=1 --pool=solo --queues="${DEFAULT_Q},${DEFAULT_Q}.connectors"
```
**Optional: Start Flower for monitoring Celery tasks:**

View file

@ -133,7 +133,10 @@ export const updateConnectorResponse = searchSourceConnector;
export const deleteConnectorRequest = searchSourceConnector.pick({ id: true });
export const deleteConnectorResponse = z.object({
message: z.literal("Search source connector deleted successfully"),
message: z.string(),
status: z.string().optional(),
connector_id: z.number().optional(),
connector_name: z.string().optional(),
});
/**