From 4df40f8cea1eadffea71e7df33a17f3dac75ab1e Mon Sep 17 00:00:00 2001 From: "DESKTOP-RTLN3BA\\$punk" Date: Mon, 2 Feb 2026 13:17:12 -0800 Subject: [PATCH] feat: add CELERY_TASK_DEFAULT_QUEUE environment variable for task isolation --- Dockerfile.allinone | 1 + docker-compose.yml | 2 ++ surfsense_backend/.env.example | 6 ++++++ surfsense_backend/app/agents/new_chat/tools/podcast.py | 7 +++++-- surfsense_backend/app/celery_app.py | 4 ++++ .../app/routes/search_source_connectors_routes.py | 5 ++++- surfsense_backend/app/tasks/celery_tasks/document_tasks.py | 4 +++- surfsense_backend/app/tasks/celery_tasks/podcast_tasks.py | 5 ++++- .../tasks/celery_tasks/stale_notification_cleanup_task.py | 5 ++++- 9 files changed, 33 insertions(+), 6 deletions(-) diff --git a/Dockerfile.allinone b/Dockerfile.allinone index 64e99a14d..7f9f18806 100644 --- a/Dockerfile.allinone +++ b/Dockerfile.allinone @@ -239,6 +239,7 @@ ENV POSTGRES_DB=surfsense ENV DATABASE_URL=postgresql+asyncpg://surfsense:surfsense@localhost:5432/surfsense ENV CELERY_BROKER_URL=redis://localhost:6379/0 ENV CELERY_RESULT_BACKEND=redis://localhost:6379/0 +ENV CELERY_TASK_DEFAULT_QUEUE=surfsense ENV PYTHONPATH=/app/backend ENV NEXT_FRONTEND_URL=http://localhost:3000 ENV AUTH_TYPE=LOCAL diff --git a/docker-compose.yml b/docker-compose.yml index e5989210f..8efc0b506 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -53,6 +53,8 @@ services: - DATABASE_URL=postgresql+asyncpg://${POSTGRES_USER:-postgres}:${POSTGRES_PASSWORD:-postgres}@db:5432/${POSTGRES_DB:-surfsense} - CELERY_BROKER_URL=redis://redis:${REDIS_PORT:-6379}/0 - CELERY_RESULT_BACKEND=redis://redis:${REDIS_PORT:-6379}/0 + # Queue name isolation - prevents task collision if Redis is shared with other apps + - CELERY_TASK_DEFAULT_QUEUE=surfsense - PYTHONPATH=/app - UVICORN_LOOP=asyncio - UNSTRUCTURED_HAS_PATCHED_LOOP=1 diff --git a/surfsense_backend/.env.example b/surfsense_backend/.env.example index 2b6a84967..2e10f4e36 100644 --- a/surfsense_backend/.env.example +++ b/surfsense_backend/.env.example @@ -3,6 +3,12 @@ DATABASE_URL=postgresql+asyncpg://postgres:postgres@localhost:5432/surfsense #Celery Config CELERY_BROKER_URL=redis://localhost:6379/0 CELERY_RESULT_BACKEND=redis://localhost:6379/0 +# Optional: isolate queues when sharing Redis with other apps +CELERY_TASK_DEFAULT_QUEUE=surfsense + +# Redis for app-level features (heartbeats, podcast markers) +# Defaults to CELERY_BROKER_URL when not set +REDIS_APP_URL=redis://localhost:6379/0 #Electric(for migrations only) ELECTRIC_DB_USER=electric diff --git a/surfsense_backend/app/agents/new_chat/tools/podcast.py b/surfsense_backend/app/agents/new_chat/tools/podcast.py index 1048ed881..e6412f4f2 100644 --- a/surfsense_backend/app/agents/new_chat/tools/podcast.py +++ b/surfsense_backend/app/agents/new_chat/tools/podcast.py @@ -21,8 +21,11 @@ from sqlalchemy.ext.asyncio import AsyncSession from app.db import Podcast, PodcastStatus # Redis connection for tracking active podcast tasks -# Uses the same Redis instance as Celery -REDIS_URL = os.getenv("CELERY_BROKER_URL", "redis://localhost:6379/0") +# Defaults to the Celery broker when REDIS_APP_URL is not set +REDIS_URL = os.getenv( + "REDIS_APP_URL", + os.getenv("CELERY_BROKER_URL", "redis://localhost:6379/0"), +) _redis_client: redis.Redis | None = None diff --git a/surfsense_backend/app/celery_app.py b/surfsense_backend/app/celery_app.py index b77f5698e..b690f6096 100644 --- a/surfsense_backend/app/celery_app.py +++ b/surfsense_backend/app/celery_app.py @@ -26,6 +26,7 @@ def init_worker(**kwargs): # Get Celery configuration from environment CELERY_BROKER_URL = os.getenv("CELERY_BROKER_URL", "redis://localhost:6379/0") CELERY_RESULT_BACKEND = os.getenv("CELERY_RESULT_BACKEND", "redis://localhost:6379/0") +CELERY_TASK_DEFAULT_QUEUE = os.getenv("CELERY_TASK_DEFAULT_QUEUE", "surfsense") # Get schedule checker interval from environment # Format: "" where unit is 'm' (minutes) or 'h' (hours) @@ -91,6 +92,9 @@ celery_app.conf.update( result_serializer="json", timezone="UTC", enable_utc=True, + task_default_queue=CELERY_TASK_DEFAULT_QUEUE, + task_default_exchange=CELERY_TASK_DEFAULT_QUEUE, + task_default_routing_key=CELERY_TASK_DEFAULT_QUEUE, # Task execution settings task_track_started=True, task_time_limit=28800, # 8 hour hard limit diff --git a/surfsense_backend/app/routes/search_source_connectors_routes.py b/surfsense_backend/app/routes/search_source_connectors_routes.py index 3a937653d..3c7d66c3a 100644 --- a/surfsense_backend/app/routes/search_source_connectors_routes.py +++ b/surfsense_backend/app/routes/search_source_connectors_routes.py @@ -91,7 +91,10 @@ def get_heartbeat_redis_client() -> redis.Redis: """Get or create Redis client for heartbeat tracking.""" global _heartbeat_redis_client if _heartbeat_redis_client is None: - redis_url = os.getenv("CELERY_BROKER_URL", "redis://localhost:6379/0") + redis_url = os.getenv( + "REDIS_APP_URL", + os.getenv("CELERY_BROKER_URL", "redis://localhost:6379/0"), + ) _heartbeat_redis_client = redis.from_url(redis_url, decode_responses=True) return _heartbeat_redis_client diff --git a/surfsense_backend/app/tasks/celery_tasks/document_tasks.py b/surfsense_backend/app/tasks/celery_tasks/document_tasks.py index 66bde3d43..6279510da 100644 --- a/surfsense_backend/app/tasks/celery_tasks/document_tasks.py +++ b/surfsense_backend/app/tasks/celery_tasks/document_tasks.py @@ -353,7 +353,9 @@ def process_file_upload_task( loop.run_until_complete( _process_file_upload(file_path, filename, search_space_id, user_id) ) - logger.info(f"[process_file_upload] Task completed successfully for: {filename}") + logger.info( + f"[process_file_upload] Task completed successfully for: {filename}" + ) except Exception as e: logger.error( f"[process_file_upload] Task failed for {filename}: {e}\n" diff --git a/surfsense_backend/app/tasks/celery_tasks/podcast_tasks.py b/surfsense_backend/app/tasks/celery_tasks/podcast_tasks.py index 2ce8716e0..14df83508 100644 --- a/surfsense_backend/app/tasks/celery_tasks/podcast_tasks.py +++ b/surfsense_backend/app/tasks/celery_tasks/podcast_tasks.py @@ -51,7 +51,10 @@ def _clear_generating_podcast(search_space_id: int) -> None: import redis try: - redis_url = os.getenv("CELERY_BROKER_URL", "redis://localhost:6379/0") + redis_url = os.getenv( + "REDIS_APP_URL", + os.getenv("CELERY_BROKER_URL", "redis://localhost:6379/0"), + ) client = redis.from_url(redis_url, decode_responses=True) key = f"podcast:generating:{search_space_id}" client.delete(key) diff --git a/surfsense_backend/app/tasks/celery_tasks/stale_notification_cleanup_task.py b/surfsense_backend/app/tasks/celery_tasks/stale_notification_cleanup_task.py index e77b3225e..9041655b0 100644 --- a/surfsense_backend/app/tasks/celery_tasks/stale_notification_cleanup_task.py +++ b/surfsense_backend/app/tasks/celery_tasks/stale_notification_cleanup_task.py @@ -36,7 +36,10 @@ def get_redis_client() -> redis.Redis: """Get or create Redis client for heartbeat checking.""" global _redis_client if _redis_client is None: - redis_url = os.getenv("CELERY_BROKER_URL", "redis://localhost:6379/0") + redis_url = os.getenv( + "REDIS_APP_URL", + os.getenv("CELERY_BROKER_URL", "redis://localhost:6379/0"), + ) _redis_client = redis.from_url(redis_url, decode_responses=True) return _redis_client