diff --git a/surfsense_backend/app/automations/triggers/builtin/event/__init__.py b/surfsense_backend/app/automations/triggers/builtin/event/__init__.py new file mode 100644 index 000000000..8dc89dfa1 --- /dev/null +++ b/surfsense_backend/app/automations/triggers/builtin/event/__init__.py @@ -0,0 +1,29 @@ +"""``event`` trigger: fire an automation when a matching domain event is published. + +Subscribes to the event bus and matches events against a user-authored JSON +filter (see :mod:`.filter`). +""" + +from __future__ import annotations + +from app.event_bus import bus + +from .filter import FilterError, matches +from .inputs import event_runtime_inputs +from .match import trigger_matches_event +from .params import EventTriggerParams +from .source import on_event + +__all__ = [ + "EventTriggerParams", + "FilterError", + "event_runtime_inputs", + "matches", + "trigger_matches_event", +] + +# Side-effect: register on the triggers store. +from . import definition # noqa: F401 + +# Side-effect: react to published events. +bus.subscribe(on_event) diff --git a/surfsense_backend/app/automations/triggers/builtin/event/definition.py b/surfsense_backend/app/automations/triggers/builtin/event/definition.py new file mode 100644 index 000000000..b1ef6d4e2 --- /dev/null +++ b/surfsense_backend/app/automations/triggers/builtin/event/definition.py @@ -0,0 +1,16 @@ +"""``event`` ``TriggerDefinition`` registration.""" + +from __future__ import annotations + +from app.automations.triggers.store import register_trigger +from app.automations.triggers.types import TriggerDefinition + +from .params import EventTriggerParams + +EVENT_TRIGGER = TriggerDefinition( + type="event", + description="Fire when a matching domain event is published.", + params_model=EventTriggerParams, +) + +register_trigger(EVENT_TRIGGER) diff --git a/surfsense_backend/app/automations/triggers/builtin/event/selector.py b/surfsense_backend/app/automations/triggers/builtin/event/selector.py new file mode 100644 index 000000000..11d9ae7f5 --- /dev/null +++ b/surfsense_backend/app/automations/triggers/builtin/event/selector.py @@ -0,0 +1,75 @@ +"""Event selector (worker task): pick the triggers an event fires, start each. + +The source enqueues this with a serialized event. Here we load the enabled +``event`` triggers for that event type, keep the ones whose filter matches the +payload, and start a run for each. Per-trigger failures are isolated. +""" + +from __future__ import annotations + +import logging +from typing import Any + +from sqlalchemy import select +from sqlalchemy.ext.asyncio import AsyncSession + +from app.automations.dispatch import start_run +from app.automations.persistence.enums.trigger_type import TriggerType +from app.automations.persistence.models.trigger import AutomationTrigger +from app.celery_app import celery_app +from app.event_bus import Event +from app.tasks.celery_tasks import get_celery_session_maker, run_async_celery_task + +from .inputs import event_runtime_inputs +from .match import trigger_matches_event +from .source import TASK_NAME + +logger = logging.getLogger(__name__) + + +@celery_app.task(name=TASK_NAME) +def automation_event_select(event: dict[str, Any]) -> None: + """Select and start the runs an event fires.""" + return run_async_celery_task(lambda: _select_and_start(event)) + + +async def _select_and_start(event_dict: dict[str, Any]) -> None: + event = Event.model_validate(event_dict) + session_maker = get_celery_session_maker() + async with session_maker() as session: + for trigger in await _eligible(session, event=event): + await _start_one(session, trigger=trigger, event=event) + + +async def _eligible( + session: AsyncSession, *, event: Event +) -> list[AutomationTrigger]: + """Enabled ``event`` triggers for this event type whose filter matches.""" + stmt = select(AutomationTrigger).where( + AutomationTrigger.type == TriggerType.EVENT, + AutomationTrigger.enabled.is_(True), + AutomationTrigger.params["event_type"].astext == event.event_type, + ) + triggers = (await session.execute(stmt)).scalars().all() + return [t for t in triggers if trigger_matches_event(t.params, event)] + + +async def _start_one( + session: AsyncSession, *, trigger: AutomationTrigger, event: Event +) -> None: + try: + run = await start_run( + session=session, + trigger=trigger, + runtime_inputs=event_runtime_inputs(event), + ) + logger.info( + "event fire: trigger=%d automation=%d run=%d event=%s", + trigger.id, + trigger.automation_id, + run.id, + event.event_id, + ) + except Exception: + logger.exception("event fire failed for trigger %d", trigger.id) + await session.rollback() diff --git a/surfsense_backend/app/automations/triggers/builtin/event/source.py b/surfsense_backend/app/automations/triggers/builtin/event/source.py new file mode 100644 index 000000000..b8e067b12 --- /dev/null +++ b/surfsense_backend/app/automations/triggers/builtin/event/source.py @@ -0,0 +1,19 @@ +"""Event trigger source: the bus subscriber that enqueues the selector. + +Runs in whatever process published the event, so it stays thin — it only hands +the event to a worker (the selector does the DB matching). +""" + +from __future__ import annotations + +from app.event_bus import Event + +TASK_NAME = "automation_event_select" + + +async def on_event(event: Event) -> None: + """Enqueue the selector for ``event``.""" + # Lazy import: keeps app.celery_app out of the triggers-package import graph. + from app.celery_app import celery_app + + celery_app.send_task(TASK_NAME, kwargs={"event": event.model_dump(mode="json")}) diff --git a/surfsense_backend/tests/unit/automations/triggers/builtin/event/test_definition.py b/surfsense_backend/tests/unit/automations/triggers/builtin/event/test_definition.py new file mode 100644 index 000000000..479943cc2 --- /dev/null +++ b/surfsense_backend/tests/unit/automations/triggers/builtin/event/test_definition.py @@ -0,0 +1,18 @@ +"""The ``event`` trigger self-registers on the triggers store at import.""" + +from __future__ import annotations + +import pytest + +from app.automations.triggers import get_trigger +from app.automations.triggers.builtin.event.params import EventTriggerParams + +pytestmark = pytest.mark.unit + + +def test_event_trigger_is_registered() -> None: + definition = get_trigger("event") + + assert definition is not None + assert definition.type == "event" + assert definition.params_model is EventTriggerParams