diff --git a/surfsense_backend/app/automations/tasks/__init__.py b/surfsense_backend/app/automations/tasks/__init__.py new file mode 100644 index 000000000..6fe0d62e8 --- /dev/null +++ b/surfsense_backend/app/automations/tasks/__init__.py @@ -0,0 +1,3 @@ +"""Celery task wrappers for the automation runtime.""" + +from __future__ import annotations diff --git a/surfsense_backend/app/automations/tasks/execute_run.py b/surfsense_backend/app/automations/tasks/execute_run.py new file mode 100644 index 000000000..5fc84698b --- /dev/null +++ b/surfsense_backend/app/automations/tasks/execute_run.py @@ -0,0 +1,33 @@ +"""Celery task that runs one automation. Thin wrapper over ``runtime.executor``.""" + +from __future__ import annotations + +import logging + +from app.automations.runtime import execute_run +from app.celery_app import celery_app +from app.tasks.celery_tasks import ( + get_celery_session_maker, + run_async_celery_task, +) + +logger = logging.getLogger(__name__) + +TASK_NAME = "automation_run_execute" + + +@celery_app.task(name=TASK_NAME, bind=True) +def automation_run_execute(self, run_id: int) -> None: # noqa: ARG001 — Celery bind + """Execute one ``AutomationRun``. Idempotent: terminal runs no-op.""" + return run_async_celery_task(lambda: _impl(run_id)) + + +async def _impl(run_id: int) -> None: + session_maker = get_celery_session_maker() + async with session_maker() as session: + try: + await execute_run(session, run_id) + except Exception: + logger.exception("automation_run %d failed unexpectedly", run_id) + await session.rollback() + raise diff --git a/surfsense_backend/app/celery_app.py b/surfsense_backend/app/celery_app.py index 5b45baca1..569178239 100644 --- a/surfsense_backend/app/celery_app.py +++ b/surfsense_backend/app/celery_app.py @@ -188,6 +188,7 @@ celery_app = Celery( "app.tasks.celery_tasks.document_reindex_tasks", "app.tasks.celery_tasks.stale_notification_cleanup_task", "app.tasks.celery_tasks.stripe_reconciliation_task", + "app.automations.tasks.execute_run", ], )