From b1642993baa0b77f11252603fe26c0c868ad9312 Mon Sep 17 00:00:00 2001
From: CREDO23
Date: Fri, 29 May 2026 17:49:12 +0200
Subject: [PATCH] feat(automations): add schedule trigger source, selector and inputs

---
Review note: selector.py was amended after export (_start_one now reloads the
trigger inside its try block); the index blob ids below predate that change.

 .../triggers/builtin/schedule/inputs.py       |  27 +++
 .../triggers/builtin/schedule/selector.py     | 188 ++++++++++++++++++
 .../triggers/builtin/schedule/source.py       |  20 ++
 3 files changed, 235 insertions(+)
 create mode 100644 surfsense_backend/app/automations/triggers/builtin/schedule/inputs.py
 create mode 100644 surfsense_backend/app/automations/triggers/builtin/schedule/selector.py
 create mode 100644 surfsense_backend/app/automations/triggers/builtin/schedule/source.py

diff --git a/surfsense_backend/app/automations/triggers/builtin/schedule/inputs.py b/surfsense_backend/app/automations/triggers/builtin/schedule/inputs.py
new file mode 100644
index 000000000..947975b28
--- /dev/null
+++ b/surfsense_backend/app/automations/triggers/builtin/schedule/inputs.py
@@ -0,0 +1,27 @@
+"""Build run inputs from a schedule fire."""
+
+from __future__ import annotations
+
+from datetime import datetime
+from typing import Any
+
+
+def schedule_runtime_inputs(
+    *,
+    fired_at: datetime,
+    scheduled_for: datetime,
+    previous_last_fired_at: datetime | None,
+) -> dict[str, Any]:
+    """Calendar context for a scheduled run.
+
+    - ``fired_at`` — actual fire time
+    - ``scheduled_for`` — cron-derived target time for this fire
+    - ``last_fired_at`` — previous fire time, or null on first fire
+    """
+    return {
+        "fired_at": fired_at.isoformat(),
+        "scheduled_for": scheduled_for.isoformat(),
+        "last_fired_at": (
+            previous_last_fired_at.isoformat() if previous_last_fired_at else None
+        ),
+    }
diff --git a/surfsense_backend/app/automations/triggers/builtin/schedule/selector.py b/surfsense_backend/app/automations/triggers/builtin/schedule/selector.py
new file mode 100644
index 000000000..9f52bc9f0
--- /dev/null
+++ b/surfsense_backend/app/automations/triggers/builtin/schedule/selector.py
@@ -0,0 +1,188 @@
+"""Schedule selector (worker task): claim due triggers and start each.
+
+Beat ticks this every minute. Two passes:
+
+1. **Self-heal**: enabled schedule triggers with NULL ``next_fire_at`` get it
+   computed from their ``cron`` + ``timezone`` (fresh inserts, restored rows).
+2. **Claim & start**: due rows are locked ``FOR UPDATE SKIP LOCKED``, their
+   ``next_fire_at`` is advanced and ``last_fired_at`` set, and a run is started
+   for each. A missed fire stays missed (``catchup=False`` semantics).
+"""
+
+from __future__ import annotations
+
+import logging
+from dataclasses import dataclass
+from datetime import UTC, datetime
+
+from sqlalchemy import select
+from sqlalchemy.ext.asyncio import AsyncSession
+
+from app.automations.dispatch import start_run
+from app.automations.persistence.enums.trigger_type import TriggerType
+from app.automations.persistence.models.trigger import AutomationTrigger
+from app.celery_app import celery_app
+from app.tasks.celery_tasks import get_celery_session_maker, run_async_celery_task
+
+from .cron import InvalidCronError, compute_next_fire_at
+from .inputs import schedule_runtime_inputs
+from .source import TASK_NAME
+
+logger = logging.getLogger(__name__)
+
+# Cap rows touched per tick so a backlog of due triggers can't starve the
+# worker; remaining rows fire on the next tick.
+_TICK_BATCH = 200
+
+
+@dataclass(frozen=True, slots=True)
+class _Claim:
+    """Per-trigger fire context captured before row state is mutated."""
+
+    trigger_id: int
+    scheduled_for: datetime
+    previous_last_fired_at: datetime | None
+
+
+@celery_app.task(name=TASK_NAME)
+def automation_schedule_select() -> None:
+    """Tick once: self-heal NULL next_fire_at, claim due rows, start each."""
+    return run_async_celery_task(_tick)
+
+
+async def _tick() -> None:
+    """Run one pass: self-heal, claim due triggers, then start a run per claim."""
+    session_maker = get_celery_session_maker()
+    async with session_maker() as session:
+        now = datetime.now(UTC)
+
+        await _self_heal_null_next_fire(session, now=now)
+
+        claims = await _claim_due_triggers(session, now=now)
+        if not claims:
+            return
+
+        for claim in claims:
+            await _start_one(session, claim=claim, fired_at=now)
+
+
+async def _self_heal_null_next_fire(session: AsyncSession, *, now: datetime) -> None:
+    """Backfill ``next_fire_at`` for enabled schedule triggers missing it."""
+    stmt = (
+        select(AutomationTrigger)
+        .where(
+            AutomationTrigger.type == TriggerType.SCHEDULE,
+            AutomationTrigger.enabled.is_(True),
+            AutomationTrigger.next_fire_at.is_(None),
+        )
+        .limit(_TICK_BATCH)
+    )
+    triggers = (await session.execute(stmt)).scalars().all()
+    if not triggers:
+        return
+
+    for trigger in triggers:
+        try:
+            trigger.next_fire_at = compute_next_fire_at(
+                trigger.params["cron"],
+                trigger.params["timezone"],
+                after=now,
+            )
+        except (InvalidCronError, KeyError, TypeError) as exc:
+            logger.warning(
+                "automation_trigger %d has invalid schedule params, disabling: %s",
+                trigger.id,
+                exc,
+            )
+            trigger.enabled = False
+
+    await session.commit()
+
+
+async def _claim_due_triggers(session: AsyncSession, *, now: datetime) -> list[_Claim]:
+    """Lock and advance due rows; return per-trigger fire context."""
+    stmt = (
+        select(AutomationTrigger)
+        .where(
+            AutomationTrigger.type == TriggerType.SCHEDULE,
+            AutomationTrigger.enabled.is_(True),
+            AutomationTrigger.next_fire_at.isnot(None),
+            AutomationTrigger.next_fire_at <= now,
+        )
+        .order_by(AutomationTrigger.next_fire_at)
+        .limit(_TICK_BATCH)
+        .with_for_update(skip_locked=True)
+    )
+    triggers = (await session.execute(stmt)).scalars().all()
+    if not triggers:
+        return []
+
+    claims: list[_Claim] = []
+    for trigger in triggers:
+        # Snapshot fire-context BEFORE we advance the row.
+        scheduled_for = trigger.next_fire_at
+        previous_last_fired_at = trigger.last_fired_at
+
+        try:
+            trigger.next_fire_at = compute_next_fire_at(
+                trigger.params["cron"],
+                trigger.params["timezone"],
+                after=now,
+            )
+        except (InvalidCronError, KeyError, TypeError) as exc:
+            logger.warning(
+                "automation_trigger %d has invalid schedule params, disabling: %s",
+                trigger.id,
+                exc,
+            )
+            trigger.enabled = False
+            continue
+
+        trigger.last_fired_at = now
+        claims.append(
+            _Claim(
+                trigger_id=trigger.id,
+                scheduled_for=scheduled_for,
+                previous_last_fired_at=previous_last_fired_at,
+            )
+        )
+
+    await session.commit()
+    return claims
+
+
+async def _start_one(
+    session: AsyncSession, *, claim: _Claim, fired_at: datetime
+) -> None:
+    """Reload the trigger post-commit and start a run for it.
+
+    The claim is already committed (``next_fire_at`` advanced), so any failure
+    here, including the reload itself, is logged and rolled back rather than
+    raised; an escaping error would abort the tick and drop the remaining claims.
+    """
+    try:
+        trigger = await session.get(AutomationTrigger, claim.trigger_id)
+        if trigger is None:
+            return
+
+        run = await start_run(
+            session=session,
+            trigger=trigger,
+            runtime_inputs=schedule_runtime_inputs(
+                fired_at=fired_at,
+                scheduled_for=claim.scheduled_for,
+                previous_last_fired_at=claim.previous_last_fired_at,
+            ),
+        )
+        logger.info(
+            "scheduled fire: trigger=%d automation=%d run=%d",
+            claim.trigger_id,
+            trigger.automation_id,
+            run.id,
+        )
+    except Exception:
+        logger.exception(
+            "scheduled fire failed for trigger %d (next attempt at next match)",
+            claim.trigger_id,
+        )
+        await session.rollback()
diff --git a/surfsense_backend/app/automations/triggers/builtin/schedule/source.py b/surfsense_backend/app/automations/triggers/builtin/schedule/source.py
new file mode 100644
index 000000000..997c17562
--- /dev/null
+++ b/surfsense_backend/app/automations/triggers/builtin/schedule/source.py
@@ -0,0 +1,20 @@
+"""Schedule trigger source: Celery Beat ticks the selector every minute.
+
+``BEAT_SCHEDULE`` is merged into ``celery_app.conf.beat_schedule``. Per-row cron
+math is precomputed (the ``next_fire_at`` column), so each tick is an indexed
+lookup rather than N cron evaluations.
+"""
+
+from __future__ import annotations
+
+from celery.schedules import crontab
+
+TASK_NAME = "automation_schedule_select"
+
+BEAT_SCHEDULE = {
+    "automation-schedule-select": {
+        "task": TASK_NAME,
+        "schedule": crontab(minute="*"),
+        "options": {"expires": 50},
+    },
+}