mirror of
https://github.com/MODSetter/SurfSense.git
synced 2026-05-04 21:32:39 +02:00
feat: add CELERY_TASK_DEFAULT_QUEUE environment variable for task isolation
This commit is contained in:
parent
a385f2b637
commit
4df40f8cea
9 changed files with 33 additions and 6 deletions
|
|
@ -239,6 +239,7 @@ ENV POSTGRES_DB=surfsense
|
||||||
ENV DATABASE_URL=postgresql+asyncpg://surfsense:surfsense@localhost:5432/surfsense
|
ENV DATABASE_URL=postgresql+asyncpg://surfsense:surfsense@localhost:5432/surfsense
|
||||||
ENV CELERY_BROKER_URL=redis://localhost:6379/0
|
ENV CELERY_BROKER_URL=redis://localhost:6379/0
|
||||||
ENV CELERY_RESULT_BACKEND=redis://localhost:6379/0
|
ENV CELERY_RESULT_BACKEND=redis://localhost:6379/0
|
||||||
|
ENV CELERY_TASK_DEFAULT_QUEUE=surfsense
|
||||||
ENV PYTHONPATH=/app/backend
|
ENV PYTHONPATH=/app/backend
|
||||||
ENV NEXT_FRONTEND_URL=http://localhost:3000
|
ENV NEXT_FRONTEND_URL=http://localhost:3000
|
||||||
ENV AUTH_TYPE=LOCAL
|
ENV AUTH_TYPE=LOCAL
|
||||||
|
|
|
||||||
|
|
@ -53,6 +53,8 @@ services:
|
||||||
- DATABASE_URL=postgresql+asyncpg://${POSTGRES_USER:-postgres}:${POSTGRES_PASSWORD:-postgres}@db:5432/${POSTGRES_DB:-surfsense}
|
- DATABASE_URL=postgresql+asyncpg://${POSTGRES_USER:-postgres}:${POSTGRES_PASSWORD:-postgres}@db:5432/${POSTGRES_DB:-surfsense}
|
||||||
- CELERY_BROKER_URL=redis://redis:${REDIS_PORT:-6379}/0
|
- CELERY_BROKER_URL=redis://redis:${REDIS_PORT:-6379}/0
|
||||||
- CELERY_RESULT_BACKEND=redis://redis:${REDIS_PORT:-6379}/0
|
- CELERY_RESULT_BACKEND=redis://redis:${REDIS_PORT:-6379}/0
|
||||||
|
# Queue name isolation - prevents task collision if Redis is shared with other apps
|
||||||
|
- CELERY_TASK_DEFAULT_QUEUE=surfsense
|
||||||
- PYTHONPATH=/app
|
- PYTHONPATH=/app
|
||||||
- UVICORN_LOOP=asyncio
|
- UVICORN_LOOP=asyncio
|
||||||
- UNSTRUCTURED_HAS_PATCHED_LOOP=1
|
- UNSTRUCTURED_HAS_PATCHED_LOOP=1
|
||||||
|
|
|
||||||
|
|
@ -3,6 +3,12 @@ DATABASE_URL=postgresql+asyncpg://postgres:postgres@localhost:5432/surfsense
|
||||||
#Celery Config
|
#Celery Config
|
||||||
CELERY_BROKER_URL=redis://localhost:6379/0
|
CELERY_BROKER_URL=redis://localhost:6379/0
|
||||||
CELERY_RESULT_BACKEND=redis://localhost:6379/0
|
CELERY_RESULT_BACKEND=redis://localhost:6379/0
|
||||||
|
# Optional: isolate queues when sharing Redis with other apps
|
||||||
|
CELERY_TASK_DEFAULT_QUEUE=surfsense
|
||||||
|
|
||||||
|
# Redis for app-level features (heartbeats, podcast markers)
|
||||||
|
# Defaults to CELERY_BROKER_URL when not set
|
||||||
|
REDIS_APP_URL=redis://localhost:6379/0
|
||||||
|
|
||||||
#Electric(for migrations only)
|
#Electric(for migrations only)
|
||||||
ELECTRIC_DB_USER=electric
|
ELECTRIC_DB_USER=electric
|
||||||
|
|
|
||||||
|
|
@ -21,8 +21,11 @@ from sqlalchemy.ext.asyncio import AsyncSession
|
||||||
from app.db import Podcast, PodcastStatus
|
from app.db import Podcast, PodcastStatus
|
||||||
|
|
||||||
# Redis connection for tracking active podcast tasks
|
# Redis connection for tracking active podcast tasks
|
||||||
# Uses the same Redis instance as Celery
|
# Defaults to the Celery broker when REDIS_APP_URL is not set
|
||||||
REDIS_URL = os.getenv("CELERY_BROKER_URL", "redis://localhost:6379/0")
|
REDIS_URL = os.getenv(
|
||||||
|
"REDIS_APP_URL",
|
||||||
|
os.getenv("CELERY_BROKER_URL", "redis://localhost:6379/0"),
|
||||||
|
)
|
||||||
_redis_client: redis.Redis | None = None
|
_redis_client: redis.Redis | None = None
|
||||||
|
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -26,6 +26,7 @@ def init_worker(**kwargs):
|
||||||
# Get Celery configuration from environment
|
# Get Celery configuration from environment
|
||||||
CELERY_BROKER_URL = os.getenv("CELERY_BROKER_URL", "redis://localhost:6379/0")
|
CELERY_BROKER_URL = os.getenv("CELERY_BROKER_URL", "redis://localhost:6379/0")
|
||||||
CELERY_RESULT_BACKEND = os.getenv("CELERY_RESULT_BACKEND", "redis://localhost:6379/0")
|
CELERY_RESULT_BACKEND = os.getenv("CELERY_RESULT_BACKEND", "redis://localhost:6379/0")
|
||||||
|
CELERY_TASK_DEFAULT_QUEUE = os.getenv("CELERY_TASK_DEFAULT_QUEUE", "surfsense")
|
||||||
|
|
||||||
# Get schedule checker interval from environment
|
# Get schedule checker interval from environment
|
||||||
# Format: "<number><unit>" where unit is 'm' (minutes) or 'h' (hours)
|
# Format: "<number><unit>" where unit is 'm' (minutes) or 'h' (hours)
|
||||||
|
|
@ -91,6 +92,9 @@ celery_app.conf.update(
|
||||||
result_serializer="json",
|
result_serializer="json",
|
||||||
timezone="UTC",
|
timezone="UTC",
|
||||||
enable_utc=True,
|
enable_utc=True,
|
||||||
|
task_default_queue=CELERY_TASK_DEFAULT_QUEUE,
|
||||||
|
task_default_exchange=CELERY_TASK_DEFAULT_QUEUE,
|
||||||
|
task_default_routing_key=CELERY_TASK_DEFAULT_QUEUE,
|
||||||
# Task execution settings
|
# Task execution settings
|
||||||
task_track_started=True,
|
task_track_started=True,
|
||||||
task_time_limit=28800, # 8 hour hard limit
|
task_time_limit=28800, # 8 hour hard limit
|
||||||
|
|
|
||||||
|
|
@ -91,7 +91,10 @@ def get_heartbeat_redis_client() -> redis.Redis:
|
||||||
"""Get or create Redis client for heartbeat tracking."""
|
"""Get or create Redis client for heartbeat tracking."""
|
||||||
global _heartbeat_redis_client
|
global _heartbeat_redis_client
|
||||||
if _heartbeat_redis_client is None:
|
if _heartbeat_redis_client is None:
|
||||||
redis_url = os.getenv("CELERY_BROKER_URL", "redis://localhost:6379/0")
|
redis_url = os.getenv(
|
||||||
|
"REDIS_APP_URL",
|
||||||
|
os.getenv("CELERY_BROKER_URL", "redis://localhost:6379/0"),
|
||||||
|
)
|
||||||
_heartbeat_redis_client = redis.from_url(redis_url, decode_responses=True)
|
_heartbeat_redis_client = redis.from_url(redis_url, decode_responses=True)
|
||||||
return _heartbeat_redis_client
|
return _heartbeat_redis_client
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -353,7 +353,9 @@ def process_file_upload_task(
|
||||||
loop.run_until_complete(
|
loop.run_until_complete(
|
||||||
_process_file_upload(file_path, filename, search_space_id, user_id)
|
_process_file_upload(file_path, filename, search_space_id, user_id)
|
||||||
)
|
)
|
||||||
logger.info(f"[process_file_upload] Task completed successfully for: {filename}")
|
logger.info(
|
||||||
|
f"[process_file_upload] Task completed successfully for: {filename}"
|
||||||
|
)
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
logger.error(
|
logger.error(
|
||||||
f"[process_file_upload] Task failed for {filename}: {e}\n"
|
f"[process_file_upload] Task failed for {filename}: {e}\n"
|
||||||
|
|
|
||||||
|
|
@ -51,7 +51,10 @@ def _clear_generating_podcast(search_space_id: int) -> None:
|
||||||
import redis
|
import redis
|
||||||
|
|
||||||
try:
|
try:
|
||||||
redis_url = os.getenv("CELERY_BROKER_URL", "redis://localhost:6379/0")
|
redis_url = os.getenv(
|
||||||
|
"REDIS_APP_URL",
|
||||||
|
os.getenv("CELERY_BROKER_URL", "redis://localhost:6379/0"),
|
||||||
|
)
|
||||||
client = redis.from_url(redis_url, decode_responses=True)
|
client = redis.from_url(redis_url, decode_responses=True)
|
||||||
key = f"podcast:generating:{search_space_id}"
|
key = f"podcast:generating:{search_space_id}"
|
||||||
client.delete(key)
|
client.delete(key)
|
||||||
|
|
|
||||||
|
|
@ -36,7 +36,10 @@ def get_redis_client() -> redis.Redis:
|
||||||
"""Get or create Redis client for heartbeat checking."""
|
"""Get or create Redis client for heartbeat checking."""
|
||||||
global _redis_client
|
global _redis_client
|
||||||
if _redis_client is None:
|
if _redis_client is None:
|
||||||
redis_url = os.getenv("CELERY_BROKER_URL", "redis://localhost:6379/0")
|
redis_url = os.getenv(
|
||||||
|
"REDIS_APP_URL",
|
||||||
|
os.getenv("CELERY_BROKER_URL", "redis://localhost:6379/0"),
|
||||||
|
)
|
||||||
_redis_client = redis.from_url(redis_url, decode_responses=True)
|
_redis_client = redis.from_url(redis_url, decode_responses=True)
|
||||||
return _redis_client
|
return _redis_client
|
||||||
|
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue