diff --git a/surfsense_backend/Dockerfile b/surfsense_backend/Dockerfile
index 9ce6467b3..b7b96b6cb 100644
--- a/surfsense_backend/Dockerfile
+++ b/surfsense_backend/Dockerfile
@@ -74,6 +74,23 @@ RUN dos2unix /app/scripts/docker/entrypoint.sh && chmod +x /app/scripts/docker/e
 ENV PYTHONPATH=/app
 ENV UVICORN_LOOP=asyncio
 
+# SERVICE_ROLE controls which process this container runs:
+#   api    – FastAPI backend only (runs migrations on startup)
+#   worker – Celery worker only
+#   beat   – Celery beat scheduler only
+#   all    – All three (legacy / dev default)
+ENV SERVICE_ROLE=all
+
+# Celery worker tuning (only used when SERVICE_ROLE=worker or all)
+ENV CELERY_MAX_WORKERS=10
+ENV CELERY_MIN_WORKERS=2
+ENV CELERY_MAX_TASKS_PER_CHILD=50
+# CELERY_QUEUES: comma-separated queues to consume (empty = all queues)
+#   "surfsense"            – fast tasks only (file uploads, podcasts, etc.)
+#   "surfsense.connectors" – slow connector indexing tasks only
+#   ""                     – both queues (default, for single-worker setups)
+ENV CELERY_QUEUES=""
+
 # Run
 EXPOSE 8000-8001
 CMD ["/app/scripts/docker/entrypoint.sh"]
\ No newline at end of file
diff --git a/surfsense_backend/app/celery_app.py b/surfsense_backend/app/celery_app.py
index af406eab7..477e5369f 100644
--- a/surfsense_backend/app/celery_app.py
+++ b/surfsense_backend/app/celery_app.py
@@ -86,6 +86,11 @@ celery_app = Celery(
     ],
 )
 
+# ── Queue names ──────────────────────────────────────────────
+# Default queue   : fast, user-facing tasks (file upload, podcast, reindex, …)
+# Connectors queue: slow, long-running indexing tasks (Notion, Gmail, web crawl, …)
+CONNECTORS_QUEUE = f"{CELERY_TASK_DEFAULT_QUEUE}.connectors"
+
 # Celery configuration
 celery_app.conf.update(
     # Task settings
@@ -114,6 +119,34 @@ celery_app.conf.update(
     broker_connection_retry_on_startup=True,
     # Beat scheduler settings
     beat_max_loop_interval=60,  # Check every minute
+    # ── Task routing ─────────────────────────────────────────
+    # Route slow connector/indexing tasks to a dedicated queue so they
+    # never block fast user-facing tasks (file uploads, podcasts, etc.)
+    task_routes={
+        # Connector indexing tasks → connectors queue
+        "index_slack_messages": {"queue": CONNECTORS_QUEUE},
+        "index_notion_pages": {"queue": CONNECTORS_QUEUE},
+        "index_github_repos": {"queue": CONNECTORS_QUEUE},
+        "index_linear_issues": {"queue": CONNECTORS_QUEUE},
+        "index_jira_issues": {"queue": CONNECTORS_QUEUE},
+        "index_confluence_pages": {"queue": CONNECTORS_QUEUE},
+        "index_clickup_tasks": {"queue": CONNECTORS_QUEUE},
+        "index_google_calendar_events": {"queue": CONNECTORS_QUEUE},
+        "index_airtable_records": {"queue": CONNECTORS_QUEUE},
+        "index_google_gmail_messages": {"queue": CONNECTORS_QUEUE},
+        "index_google_drive_files": {"queue": CONNECTORS_QUEUE},
+        "index_discord_messages": {"queue": CONNECTORS_QUEUE},
+        "index_teams_messages": {"queue": CONNECTORS_QUEUE},
+        "index_luma_events": {"queue": CONNECTORS_QUEUE},
+        "index_elasticsearch_documents": {"queue": CONNECTORS_QUEUE},
+        "index_crawled_urls": {"queue": CONNECTORS_QUEUE},
+        "index_bookstack_pages": {"queue": CONNECTORS_QUEUE},
+        "index_obsidian_vault": {"queue": CONNECTORS_QUEUE},
+        "index_composio_connector": {"queue": CONNECTORS_QUEUE},
+        "delete_connector_with_documents": {"queue": CONNECTORS_QUEUE},
+        # Everything else (document processing, podcasts, reindexing,
+        # schedule checker, cleanup) stays on the default fast queue.
+    },
 )
 
 # Configure Celery Beat schedule
diff --git a/surfsense_backend/scripts/docker/entrypoint.sh b/surfsense_backend/scripts/docker/entrypoint.sh
index f721e4e85..ce0f1ce13 100644
--- a/surfsense_backend/scripts/docker/entrypoint.sh
+++ b/surfsense_backend/scripts/docker/entrypoint.sh
@@ -1,58 +1,134 @@
 #!/bin/bash
 set -e
 
-# Function to handle shutdown gracefully
+# ─────────────────────────────────────────────────────────────
+# SERVICE_ROLE controls which process(es) this container runs.
+#
+#   api    – FastAPI backend only (runs migrations on startup)
+#   worker – Celery worker only
+#   beat   – Celery beat scheduler only
+#   all    – All three in one container (legacy / dev default)
+#
+# Set SERVICE_ROLE as an environment variable in Coolify for
+# each service deployment.
+# ─────────────────────────────────────────────────────────────
+SERVICE_ROLE="${SERVICE_ROLE:-all}"
+echo "Starting SurfSense with SERVICE_ROLE=${SERVICE_ROLE}"
+
+# ── Autoscale defaults (override via env) ────────────────────
+#   CELERY_MAX_WORKERS – max concurrent worker processes
+#   CELERY_MIN_WORKERS – min workers kept warm
+#   CELERY_QUEUES      – comma-separated queues to consume
+#                        (empty = all queues for backward compat)
+CELERY_MAX_WORKERS="${CELERY_MAX_WORKERS:-10}"
+CELERY_MIN_WORKERS="${CELERY_MIN_WORKERS:-2}"
+CELERY_MAX_TASKS_PER_CHILD="${CELERY_MAX_TASKS_PER_CHILD:-50}"
+CELERY_QUEUES="${CELERY_QUEUES:-}"
+
+# ── Graceful shutdown ────────────────────────────────────────
+PIDS=()
+
 cleanup() {
     echo "Shutting down services..."
-    kill -TERM "$backend_pid" "$celery_worker_pid" "$celery_beat_pid" 2>/dev/null || true
-    wait "$backend_pid" "$celery_worker_pid" "$celery_beat_pid" 2>/dev/null || true
+    for pid in "${PIDS[@]}"; do
+        kill -TERM "$pid" 2>/dev/null || true
+    done
+    for pid in "${PIDS[@]}"; do
+        wait "$pid" 2>/dev/null || true
+    done
     exit 0
 }
 
 trap cleanup SIGTERM SIGINT
 
-# Run database migrations with safeguards
-echo "Running database migrations..."
-# Wait for database to be ready (max 30 seconds)
-for i in {1..30}; do
-    if python -c "from app.db import engine; import asyncio; asyncio.run(engine.dispose())" 2>/dev/null; then
-        echo "Database is ready."
-        break
+# ── Database migrations (only for api / all) ─────────────────
+run_migrations() {
+    echo "Running database migrations..."
+    for i in {1..30}; do
+        if python -c "from app.db import engine; import asyncio; asyncio.run(engine.dispose())" 2>/dev/null; then
+            echo "Database is ready."
+            break
+        fi
+        echo "Waiting for database... ($i/30)"
+        sleep 1
+    done
+
+    if timeout 60 alembic upgrade head 2>&1; then
+        echo "Migrations completed successfully."
+    else
+        echo "WARNING: Migration failed or timed out. Continuing anyway..."
+        echo "You may need to run migrations manually: alembic upgrade head"
     fi
-    echo "Waiting for database... ($i/30)"
-    sleep 1
-done
+}
 
-# Run migrations with timeout (60 seconds max)
-if timeout 60 alembic upgrade head 2>&1; then
-    echo "Migrations completed successfully."
-else
-    echo "WARNING: Migration failed or timed out. Continuing anyway..."
-    echo "You may need to run migrations manually: alembic upgrade head"
-fi
+# ── Service starters ─────────────────────────────────────────
+start_api() {
+    echo "Starting FastAPI Backend..."
+    python main.py &
+    PIDS+=($!)
+    echo "  FastAPI PID=${PIDS[-1]}"
+}
 
-echo "Starting FastAPI Backend..."
-python main.py &
-backend_pid=$!
+start_worker() {
+    QUEUE_ARGS=""
+    if [ -n "${CELERY_QUEUES}" ]; then
+        QUEUE_ARGS="--queues=${CELERY_QUEUES}"
+    else
+        # A worker started without --queues consumes ONLY the default
+        # queue, so tasks routed to the connectors queue would never
+        # run. Empty CELERY_QUEUES therefore means "consume both".
+        QUEUE_ARGS="--queues=surfsense,surfsense.connectors"
+    fi
 
-# Wait a bit for backend to initialize
-sleep 5
+    echo "Starting Celery Worker (autoscale=${CELERY_MAX_WORKERS},${CELERY_MIN_WORKERS}, max-tasks-per-child=${CELERY_MAX_TASKS_PER_CHILD}, queues=${CELERY_QUEUES:-all})..."
+    celery -A app.celery_app worker \
+        --loglevel=info \
+        --autoscale="${CELERY_MAX_WORKERS},${CELERY_MIN_WORKERS}" \
+        --max-tasks-per-child="${CELERY_MAX_TASKS_PER_CHILD}" \
+        --prefetch-multiplier=1 \
+        -Ofair \
+        ${QUEUE_ARGS} &
+    PIDS+=($!)
+    echo "  Celery Worker PID=${PIDS[-1]}"
+}
 
-echo "Starting Celery Worker..."
-celery -A app.celery_app worker --loglevel=info --autoscale=128,4 &
-celery_worker_pid=$!
+start_beat() {
+    echo "Starting Celery Beat..."
+    celery -A app.celery_app beat --loglevel=info &
+    PIDS+=($!)
+    echo "  Celery Beat PID=${PIDS[-1]}"
+}
 
-# Wait a bit for worker to initialize
-sleep 3
+# ── Main: run based on role ──────────────────────────────────
+case "${SERVICE_ROLE}" in
+    api)
+        run_migrations
+        start_api
+        ;;
+    worker)
+        start_worker
+        ;;
+    beat)
+        start_beat
+        ;;
+    all)
+        run_migrations
+        start_api
+        sleep 5
+        start_worker
+        sleep 3
+        start_beat
+        ;;
+    *)
+        echo "ERROR: Unknown SERVICE_ROLE '${SERVICE_ROLE}'. Use: api, worker, beat, or all"
+        exit 1
+        ;;
+esac
 
-echo "Starting Celery Beat..."
-celery -A app.celery_app beat --loglevel=info &
-celery_beat_pid=$!
-
-echo "All services started. PIDs: Backend=$backend_pid, Worker=$celery_worker_pid, Beat=$celery_beat_pid"
+echo "All requested services started. PIDs: ${PIDS[*]}"
 
 # Wait for any process to exit
 wait -n
 
-# If we get here, one process exited, so exit with its status
+# If we get here, one process exited unexpectedly
 exit $?