mirror of
https://github.com/MODSetter/SurfSense.git
synced 2026-04-25 00:36:31 +02:00
hotpatch(cloud): added SERVICE_ROLE and CELERY_QUEUES for task seperation
This commit is contained in:
parent
7b1017c295
commit
e1da6a61a4
3 changed files with 157 additions and 36 deletions
|
|
@ -74,6 +74,23 @@ RUN dos2unix /app/scripts/docker/entrypoint.sh && chmod +x /app/scripts/docker/e
|
|||
ENV PYTHONPATH=/app
|
||||
ENV UVICORN_LOOP=asyncio
|
||||
|
||||
# SERVICE_ROLE controls which process this container runs:
|
||||
# api – FastAPI backend only (runs migrations on startup)
|
||||
# worker – Celery worker only
|
||||
# beat – Celery beat scheduler only
|
||||
# all – All three (legacy / dev default)
|
||||
ENV SERVICE_ROLE=all
|
||||
|
||||
# Celery worker tuning (only used when SERVICE_ROLE=worker or all)
|
||||
ENV CELERY_MAX_WORKERS=10
|
||||
ENV CELERY_MIN_WORKERS=2
|
||||
ENV CELERY_MAX_TASKS_PER_CHILD=50
|
||||
# CELERY_QUEUES: comma-separated queues to consume (empty = all queues)
|
||||
# "surfsense" – fast tasks only (file uploads, podcasts, etc.)
|
||||
# "surfsense.connectors" – slow connector indexing tasks only
|
||||
# "" – both queues (default, for single-worker setups)
|
||||
ENV CELERY_QUEUES=""
|
||||
|
||||
# Run
|
||||
EXPOSE 8000-8001
|
||||
CMD ["/app/scripts/docker/entrypoint.sh"]
|
||||
|
|
@ -86,6 +86,11 @@ celery_app = Celery(
|
|||
],
|
||||
)
|
||||
|
||||
# ── Queue names ──────────────────────────────────────────────
|
||||
# Default queue : fast, user-facing tasks (file upload, podcast, reindex, …)
|
||||
# Connectors queue: slow, long-running indexing tasks (Notion, Gmail, web crawl, …)
|
||||
CONNECTORS_QUEUE = f"{CELERY_TASK_DEFAULT_QUEUE}.connectors"
|
||||
|
||||
# Celery configuration
|
||||
celery_app.conf.update(
|
||||
# Task settings
|
||||
|
|
@ -114,6 +119,34 @@ celery_app.conf.update(
|
|||
broker_connection_retry_on_startup=True,
|
||||
# Beat scheduler settings
|
||||
beat_max_loop_interval=60, # Check every minute
|
||||
# ── Task routing ─────────────────────────────────────────
|
||||
# Route slow connector/indexing tasks to a dedicated queue so they
|
||||
# never block fast user-facing tasks (file uploads, podcasts, etc.)
|
||||
task_routes={
|
||||
# Connector indexing tasks → connectors queue
|
||||
"index_slack_messages": {"queue": CONNECTORS_QUEUE},
|
||||
"index_notion_pages": {"queue": CONNECTORS_QUEUE},
|
||||
"index_github_repos": {"queue": CONNECTORS_QUEUE},
|
||||
"index_linear_issues": {"queue": CONNECTORS_QUEUE},
|
||||
"index_jira_issues": {"queue": CONNECTORS_QUEUE},
|
||||
"index_confluence_pages": {"queue": CONNECTORS_QUEUE},
|
||||
"index_clickup_tasks": {"queue": CONNECTORS_QUEUE},
|
||||
"index_google_calendar_events": {"queue": CONNECTORS_QUEUE},
|
||||
"index_airtable_records": {"queue": CONNECTORS_QUEUE},
|
||||
"index_google_gmail_messages": {"queue": CONNECTORS_QUEUE},
|
||||
"index_google_drive_files": {"queue": CONNECTORS_QUEUE},
|
||||
"index_discord_messages": {"queue": CONNECTORS_QUEUE},
|
||||
"index_teams_messages": {"queue": CONNECTORS_QUEUE},
|
||||
"index_luma_events": {"queue": CONNECTORS_QUEUE},
|
||||
"index_elasticsearch_documents": {"queue": CONNECTORS_QUEUE},
|
||||
"index_crawled_urls": {"queue": CONNECTORS_QUEUE},
|
||||
"index_bookstack_pages": {"queue": CONNECTORS_QUEUE},
|
||||
"index_obsidian_vault": {"queue": CONNECTORS_QUEUE},
|
||||
"index_composio_connector": {"queue": CONNECTORS_QUEUE},
|
||||
"delete_connector_with_documents": {"queue": CONNECTORS_QUEUE},
|
||||
# Everything else (document processing, podcasts, reindexing,
|
||||
# schedule checker, cleanup) stays on the default fast queue.
|
||||
},
|
||||
)
|
||||
|
||||
# Configure Celery Beat schedule
|
||||
|
|
|
|||
|
|
@ -1,58 +1,129 @@
|
|||
#!/bin/bash
|
||||
set -e
|
||||
|
||||
# Function to handle shutdown gracefully
|
||||
# ─────────────────────────────────────────────────────────────
|
||||
# SERVICE_ROLE controls which process(es) this container runs.
|
||||
#
|
||||
# api – FastAPI backend only (runs migrations on startup)
|
||||
# worker – Celery worker only
|
||||
# beat – Celery beat scheduler only
|
||||
# all – All three in one container (legacy / dev default)
|
||||
#
|
||||
# Set SERVICE_ROLE as an environment variable in Coolify for
|
||||
# each service deployment.
|
||||
# ─────────────────────────────────────────────────────────────
|
||||
SERVICE_ROLE="${SERVICE_ROLE:-all}"
|
||||
echo "Starting SurfSense with SERVICE_ROLE=${SERVICE_ROLE}"
|
||||
|
||||
# ── Autoscale defaults (override via env) ────────────────────
|
||||
# CELERY_MAX_WORKERS – max concurrent worker processes
|
||||
# CELERY_MIN_WORKERS – min workers kept warm
|
||||
# CELERY_QUEUES – comma-separated queues to consume
|
||||
# (empty = all queues for backward compat)
|
||||
CELERY_MAX_WORKERS="${CELERY_MAX_WORKERS:-10}"
|
||||
CELERY_MIN_WORKERS="${CELERY_MIN_WORKERS:-2}"
|
||||
CELERY_MAX_TASKS_PER_CHILD="${CELERY_MAX_TASKS_PER_CHILD:-50}"
|
||||
CELERY_QUEUES="${CELERY_QUEUES:-}"
|
||||
|
||||
# ── Graceful shutdown ────────────────────────────────────────
|
||||
PIDS=()
|
||||
|
||||
cleanup() {
|
||||
echo "Shutting down services..."
|
||||
kill -TERM "$backend_pid" "$celery_worker_pid" "$celery_beat_pid" 2>/dev/null || true
|
||||
wait "$backend_pid" "$celery_worker_pid" "$celery_beat_pid" 2>/dev/null || true
|
||||
for pid in "${PIDS[@]}"; do
|
||||
kill -TERM "$pid" 2>/dev/null || true
|
||||
done
|
||||
for pid in "${PIDS[@]}"; do
|
||||
wait "$pid" 2>/dev/null || true
|
||||
done
|
||||
exit 0
|
||||
}
|
||||
|
||||
trap cleanup SIGTERM SIGINT
|
||||
|
||||
# Run database migrations with safeguards
|
||||
echo "Running database migrations..."
|
||||
# Wait for database to be ready (max 30 seconds)
|
||||
for i in {1..30}; do
|
||||
if python -c "from app.db import engine; import asyncio; asyncio.run(engine.dispose())" 2>/dev/null; then
|
||||
echo "Database is ready."
|
||||
break
|
||||
# ── Database migrations (only for api / all) ─────────────────
|
||||
run_migrations() {
|
||||
echo "Running database migrations..."
|
||||
for i in {1..30}; do
|
||||
if python -c "from app.db import engine; import asyncio; asyncio.run(engine.dispose())" 2>/dev/null; then
|
||||
echo "Database is ready."
|
||||
break
|
||||
fi
|
||||
echo "Waiting for database... ($i/30)"
|
||||
sleep 1
|
||||
done
|
||||
|
||||
if timeout 60 alembic upgrade head 2>&1; then
|
||||
echo "Migrations completed successfully."
|
||||
else
|
||||
echo "WARNING: Migration failed or timed out. Continuing anyway..."
|
||||
echo "You may need to run migrations manually: alembic upgrade head"
|
||||
fi
|
||||
echo "Waiting for database... ($i/30)"
|
||||
sleep 1
|
||||
done
|
||||
}
|
||||
|
||||
# Run migrations with timeout (60 seconds max)
|
||||
if timeout 60 alembic upgrade head 2>&1; then
|
||||
echo "Migrations completed successfully."
|
||||
else
|
||||
echo "WARNING: Migration failed or timed out. Continuing anyway..."
|
||||
echo "You may need to run migrations manually: alembic upgrade head"
|
||||
fi
|
||||
# ── Service starters ─────────────────────────────────────────
|
||||
start_api() {
|
||||
echo "Starting FastAPI Backend..."
|
||||
python main.py &
|
||||
PIDS+=($!)
|
||||
echo " FastAPI PID=${PIDS[-1]}"
|
||||
}
|
||||
|
||||
echo "Starting FastAPI Backend..."
|
||||
python main.py &
|
||||
backend_pid=$!
|
||||
start_worker() {
|
||||
QUEUE_ARGS=""
|
||||
if [ -n "${CELERY_QUEUES}" ]; then
|
||||
QUEUE_ARGS="--queues=${CELERY_QUEUES}"
|
||||
fi
|
||||
|
||||
# Wait a bit for backend to initialize
|
||||
sleep 5
|
||||
echo "Starting Celery Worker (autoscale=${CELERY_MAX_WORKERS},${CELERY_MIN_WORKERS}, max-tasks-per-child=${CELERY_MAX_TASKS_PER_CHILD}, queues=${CELERY_QUEUES:-all})..."
|
||||
celery -A app.celery_app worker \
|
||||
--loglevel=info \
|
||||
--autoscale="${CELERY_MAX_WORKERS},${CELERY_MIN_WORKERS}" \
|
||||
--max-tasks-per-child="${CELERY_MAX_TASKS_PER_CHILD}" \
|
||||
--prefetch-multiplier=1 \
|
||||
-Ofair \
|
||||
${QUEUE_ARGS} &
|
||||
PIDS+=($!)
|
||||
echo " Celery Worker PID=${PIDS[-1]}"
|
||||
}
|
||||
|
||||
echo "Starting Celery Worker..."
|
||||
celery -A app.celery_app worker --loglevel=info --autoscale=128,4 &
|
||||
celery_worker_pid=$!
|
||||
start_beat() {
|
||||
echo "Starting Celery Beat..."
|
||||
celery -A app.celery_app beat --loglevel=info &
|
||||
PIDS+=($!)
|
||||
echo " Celery Beat PID=${PIDS[-1]}"
|
||||
}
|
||||
|
||||
# Wait a bit for worker to initialize
|
||||
sleep 3
|
||||
# ── Main: run based on role ──────────────────────────────────
|
||||
case "${SERVICE_ROLE}" in
|
||||
api)
|
||||
run_migrations
|
||||
start_api
|
||||
;;
|
||||
worker)
|
||||
start_worker
|
||||
;;
|
||||
beat)
|
||||
start_beat
|
||||
;;
|
||||
all)
|
||||
run_migrations
|
||||
start_api
|
||||
sleep 5
|
||||
start_worker
|
||||
sleep 3
|
||||
start_beat
|
||||
;;
|
||||
*)
|
||||
echo "ERROR: Unknown SERVICE_ROLE '${SERVICE_ROLE}'. Use: api, worker, beat, or all"
|
||||
exit 1
|
||||
;;
|
||||
esac
|
||||
|
||||
echo "Starting Celery Beat..."
|
||||
celery -A app.celery_app beat --loglevel=info &
|
||||
celery_beat_pid=$!
|
||||
|
||||
echo "All services started. PIDs: Backend=$backend_pid, Worker=$celery_worker_pid, Beat=$celery_beat_pid"
|
||||
echo "All requested services started. PIDs: ${PIDS[*]}"
|
||||
|
||||
# Wait for any process to exit
|
||||
wait -n
|
||||
|
||||
# If we get here, one process exited, so exit with its status
|
||||
# If we get here, one process exited unexpectedly
|
||||
exit $?
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue