"""Walk an ``AutomationRun``'s snapshot plan to terminal state."""

from __future__ import annotations

from typing import Any

from sqlalchemy.ext.asyncio import AsyncSession

from app.automations.persistence.enums.run_status import RunStatus
from app.automations.persistence.models.run import AutomationRun
from app.automations.schemas.definition.envelope import AutomationDefinition
from app.automations.templating import build_run_context

from . import repository
from .step import execute_step


async def execute_run(session: AsyncSession, run_id: int) -> None:
    """Drive the run identified by ``run_id`` through its snapshot plan.

    Only a run whose status is ``RunStatus.PENDING`` is executed; any other
    status makes this call a no-op. Every step result is appended to the run
    and committed as soon as it is produced. The first step whose result
    status is ``"failed"`` triggers the ``on_failure`` steps, marks the run
    failed and ends the walk; if no step fails, the run is marked succeeded.

    Raises:
        ValueError: if ``repository.load_run`` returns ``None`` for ``run_id``.
    """
    run = await repository.load_run(session, run_id)
    if run is None:
        raise ValueError(f"automation_run {run_id} not found")

    if run.status != RunStatus.PENDING:
        return

    try:
        spec = AutomationDefinition.model_validate(run.definition_snapshot)
    except Exception as exc:
        # A snapshot that does not validate fails the run instead of raising.
        failure = {
            "message": f"definition_snapshot invalid: {exc}",
            "type": type(exc).__name__,
        }
        await repository.mark_failed(session, run, failure)
        await session.commit()
        return

    await repository.mark_running(session, run)
    await session.commit()

    # Results of succeeded steps, exposed to the template context of later steps.
    outputs: dict[str, Any] = {}

    for step in spec.plan:
        outcome = await execute_step(
            step=step,
            template_context=_build_ctx(run, outputs),
            **_execution_defaults(spec),
        )
        await repository.append_step_result(session, run, outcome)
        await session.commit()

        status = outcome["status"]

        if status == "failed":
            # NOTE(review): if an on_failure step raises here, mark_failed below
            # is never reached -- confirm execute_step cannot raise.
            await _run_on_failure(session, run, spec)
            await repository.mark_failed(session, run, outcome.get("error"))
            await session.commit()
            return

        if status == "succeeded":
            # Keyed by the step's ``output_as`` when set, else by its ``step_id``.
            outputs[step.output_as or step.step_id] = outcome.get("result")

    await repository.mark_succeeded(session, run)
    await session.commit()


async def _run_on_failure(
    session: AsyncSession,
    run: AutomationRun,
    definition: AutomationDefinition,
) -> None:
    """Execute the definition's ``on_failure`` steps, if there are any.

    Each result is appended to the run and committed, but its status is not
    inspected, so a failing handler does not trigger further on_failure
    handling. Handlers are given a context with empty step outputs.
    """
    handlers = definition.execution.on_failure
    if not handlers:
        return

    # One context is built up front and shared by every handler step.
    handler_ctx = _build_ctx(run, step_outputs={})
    for handler in handlers:
        outcome = await execute_step(
            step=handler,
            template_context=handler_ctx,
            **_execution_defaults(definition),
        )
        await repository.append_step_result(session, run, outcome)
        await session.commit()


def _execution_defaults(definition: AutomationDefinition) -> dict[str, Any]:
    """Return the execution-level defaults passed to ``execute_step`` as keywords."""
    execution = definition.execution
    return {
        "default_max_retries": execution.max_retries,
        "default_retry_backoff": execution.retry_backoff,
        "default_timeout_seconds": execution.timeout_seconds,
    }


def _build_ctx(run: AutomationRun, step_outputs: dict[str, Any]) -> dict[str, Any]:
    """Assemble the template context for ``run`` via ``build_run_context``.

    Automation- and trigger-derived fields are ``None`` when the run has no
    related automation / trigger. ``attempt`` is always passed as ``1``.
    """
    automation = run.automation
    trigger = run.trigger

    if automation:
        name = automation.name
        version = automation.version
        space_id = automation.search_space_id
        creator = automation.created_by_user_id
    else:
        name = version = space_id = creator = None

    if trigger:
        trigger_kind = trigger.type.value
    else:
        trigger_kind = None

    return build_run_context(
        run_id=run.id,
        automation_id=run.automation_id,
        automation_name=name,
        automation_version=version,
        search_space_id=space_id,
        creator_id=creator,
        trigger_id=run.trigger_id,
        trigger_type=trigger_kind,
        started_at=run.started_at,
        attempt=1,
        resolved_inputs=run.resolved_inputs or {},
        step_outputs=step_outputs,
    )