diff --git a/surfsense_backend/app/automations/dispatch/__init__.py b/surfsense_backend/app/automations/dispatch/__init__.py index 4a549a4ce..be8a36581 100644 --- a/surfsense_backend/app/automations/dispatch/__init__.py +++ b/surfsense_backend/app/automations/dispatch/__init__.py @@ -1,8 +1,8 @@ -"""Public dispatch surface for firing automations.""" +"""Generic dispatch primitives shared across trigger types.""" -from .manual import DispatchError, dispatch_manual_run +from __future__ import annotations -__all__ = [ - "DispatchError", - "dispatch_manual_run", -] +from .errors import DispatchError +from .run import dispatch_run + +__all__ = ["DispatchError", "dispatch_run"] diff --git a/surfsense_backend/app/automations/dispatch/errors.py b/surfsense_backend/app/automations/dispatch/errors.py new file mode 100644 index 000000000..75640a987 --- /dev/null +++ b/surfsense_backend/app/automations/dispatch/errors.py @@ -0,0 +1,7 @@ +"""Dispatch errors raised when a fire request cannot be turned into a run.""" + +from __future__ import annotations + + +class DispatchError(Exception): + """A dispatch could not proceed (missing trigger, invalid inputs, ...).""" diff --git a/surfsense_backend/app/automations/dispatch/run.py b/surfsense_backend/app/automations/dispatch/run.py new file mode 100644 index 000000000..fd5107a18 --- /dev/null +++ b/surfsense_backend/app/automations/dispatch/run.py @@ -0,0 +1,72 @@ +"""Generic run dispatch: validate, snapshot, persist, enqueue. Shared by every trigger.""" + +from __future__ import annotations + +from typing import Any + +import jsonschema +from sqlalchemy.ext.asyncio import AsyncSession + +from app.automations.persistence.enums.run_status import RunStatus +from app.automations.persistence.models.automation import Automation +from app.automations.persistence.models.run import AutomationRun +from app.automations.persistence.models.trigger import AutomationTrigger +from app.automations.schemas.definition.envelope import AutomationDefinition +from app.automations.tasks.execute_run import automation_run_execute + +from .errors import DispatchError + + +async def dispatch_run( + *, + session: AsyncSession, + automation: Automation, + trigger: AutomationTrigger, + payload: dict[str, Any] | None, +) -> AutomationRun: + """Validate, snapshot the definition, persist an ``AutomationRun``, enqueue execution. + + Callers (trigger-specific adapters) are responsible for resolving + ``automation`` and ``trigger`` and for the trigger-side ``ACTIVE`` / + ``enabled`` guards. This function only handles what's identical across + every trigger type. + """ + try: + definition = AutomationDefinition.model_validate(automation.definition) + except Exception as exc: + raise DispatchError(f"invalid automation definition: {exc}") from exc + + resolved_inputs = _validate_inputs(definition, payload or {}) + snapshot = definition.model_dump(mode="json", by_alias=True) + + run = AutomationRun( + automation_id=automation.id, + trigger_id=trigger.id, + status=RunStatus.PENDING, + definition_snapshot=snapshot, + trigger_payload=payload, + resolved_inputs=resolved_inputs, + step_results=[], + artifacts=[], + ) + session.add(run) + await session.commit() + await session.refresh(run) + + automation_run_execute.apply_async( + args=[run.id], + time_limit=definition.execution.timeout_seconds, + ) + return run + + +def _validate_inputs( + definition: AutomationDefinition, payload: dict[str, Any] +) -> dict[str, Any]: + if definition.inputs is None or not definition.inputs.schema_: + return {} + try: + jsonschema.validate(instance=payload, schema=definition.inputs.schema_) + except jsonschema.ValidationError as exc: + raise DispatchError(f"inputs: {exc.message}") from exc + return payload diff --git a/surfsense_backend/app/automations/triggers/manual/__init__.py b/surfsense_backend/app/automations/triggers/manual/__init__.py index bd9b8bf43..65cca9270 100644 --- a/surfsense_backend/app/automations/triggers/manual/__init__.py +++ b/surfsense_backend/app/automations/triggers/manual/__init__.py @@ -2,9 +2,10 @@ from __future__ import annotations +from .dispatch import dispatch_manual_run from .params import ManualTriggerParams -__all__ = ["ManualTriggerParams"] +__all__ = ["ManualTriggerParams", "dispatch_manual_run"] # Side-effect: register on the triggers store. from . import definition # noqa: E402, F401 diff --git a/surfsense_backend/app/automations/dispatch/manual.py b/surfsense_backend/app/automations/triggers/manual/dispatch.py similarity index 51% rename from surfsense_backend/app/automations/dispatch/manual.py rename to surfsense_backend/app/automations/triggers/manual/dispatch.py index 221d6a3e2..750c99937 100644 --- a/surfsense_backend/app/automations/dispatch/manual.py +++ b/surfsense_backend/app/automations/triggers/manual/dispatch.py @@ -1,25 +1,18 @@ -"""Manual ``Run now`` dispatch: validate inputs, snapshot the definition, enqueue.""" +"""Manual ``Run now`` dispatch adapter: load + guard, then call generic dispatch.""" from __future__ import annotations from typing import Any -import jsonschema from sqlalchemy import select from sqlalchemy.ext.asyncio import AsyncSession +from app.automations.dispatch import DispatchError, dispatch_run from app.automations.persistence.enums.automation_status import AutomationStatus -from app.automations.persistence.enums.run_status import RunStatus from app.automations.persistence.enums.trigger_type import TriggerType from app.automations.persistence.models.automation import Automation from app.automations.persistence.models.run import AutomationRun from app.automations.persistence.models.trigger import AutomationTrigger -from app.automations.schemas.definition.envelope import AutomationDefinition -from app.automations.tasks.execute_run import automation_run_execute - - -class DispatchError(Exception): - """A manual dispatch could not proceed (missing trigger, invalid inputs, ...).""" async def dispatch_manual_run( @@ -28,7 +21,7 @@ async def dispatch_manual_run( automation_id: int, payload: dict[str, Any] | None, ) -> AutomationRun: - """Validate, snapshot, persist, and enqueue an ``AutomationRun``.""" + """Find the automation + its enabled manual trigger, then run the generic dispatch.""" automation = await _load_automation(session, automation_id) if automation is None: raise DispatchError(f"automation {automation_id} not found") @@ -38,39 +31,18 @@ async def dispatch_manual_run( f"automation {automation_id} is {automation.status.value}, not active" ) - try: - definition = AutomationDefinition.model_validate(automation.definition) - except Exception as exc: - raise DispatchError(f"invalid automation definition: {exc}") from exc - trigger = await _find_manual_trigger(session, automation_id) if trigger is None: raise DispatchError( f"automation {automation_id} has no enabled manual trigger" ) - resolved_inputs = _validate_inputs(definition, payload or {}) - snapshot = definition.model_dump(mode="json", by_alias=True) - - run = AutomationRun( - automation_id=automation_id, - trigger_id=trigger.id, - status=RunStatus.PENDING, - definition_snapshot=snapshot, - trigger_payload=payload, - resolved_inputs=resolved_inputs, - step_results=[], - artifacts=[], + return await dispatch_run( + session=session, + automation=automation, + trigger=trigger, + payload=payload, ) - session.add(run) - await session.commit() - await session.refresh(run) - - automation_run_execute.apply_async( - args=[run.id], - time_limit=definition.execution.timeout_seconds, - ) - return run async def _load_automation( @@ -93,15 +65,3 @@ async def _find_manual_trigger( .limit(1) ) return (await session.execute(stmt)).scalar_one_or_none() - - -def _validate_inputs( - definition: AutomationDefinition, payload: dict[str, Any] -) -> dict[str, Any]: - if definition.inputs is None or not definition.inputs.schema_: - return {} - try: - jsonschema.validate(instance=payload, schema=definition.inputs.schema_) - except jsonschema.ValidationError as exc: - raise DispatchError(f"inputs: {exc.message}") from exc - return payload diff --git a/surfsense_backend/app/routes/automations_routes.py b/surfsense_backend/app/routes/automations_routes.py index 02c019625..6c169b199 100644 --- a/surfsense_backend/app/routes/automations_routes.py +++ b/surfsense_backend/app/routes/automations_routes.py @@ -8,8 +8,9 @@ from fastapi import APIRouter, Body, Depends, HTTPException from sqlalchemy import select from sqlalchemy.ext.asyncio import AsyncSession -from app.automations.dispatch import DispatchError, dispatch_manual_run +from app.automations.dispatch import DispatchError from app.automations.persistence.models.automation import Automation +from app.automations.triggers.manual import dispatch_manual_run from app.db import Permission, User, get_async_session from app.users import current_active_user from app.utils.rbac import check_permission