mirror of
https://github.com/MODSetter/SurfSense.git
synced 2026-05-29 19:35:20 +02:00
feat(automation): add automation run executor
This commit is contained in:
parent
0a329e5a69
commit
d3cda12191
1 changed files with 105 additions and 0 deletions
105
surfsense_backend/app/automations/runtime/executor.py
Normal file
105
surfsense_backend/app/automations/runtime/executor.py
Normal file
|
|
@ -0,0 +1,105 @@
|
||||||
|
"""Walk an ``AutomationRun``'s snapshot plan to terminal state."""
|
||||||
|
|
||||||
|
from __future__ import annotations
|
||||||
|
|
||||||
|
from typing import Any
|
||||||
|
|
||||||
|
from sqlalchemy.ext.asyncio import AsyncSession
|
||||||
|
|
||||||
|
from app.automations.persistence.enums.run_status import RunStatus
|
||||||
|
from app.automations.persistence.models.run import AutomationRun
|
||||||
|
from app.automations.schemas.definition.envelope import AutomationDefinition
|
||||||
|
from app.automations.templating import build_run_context
|
||||||
|
|
||||||
|
from . import repository
|
||||||
|
from .step import execute_step
|
||||||
|
|
||||||
|
|
||||||
|
async def execute_run(session: AsyncSession, run_id: int) -> None:
|
||||||
|
"""Load run ``run_id`` and execute its snapshot plan to a terminal state."""
|
||||||
|
run = await repository.load_run(session, run_id)
|
||||||
|
if run is None:
|
||||||
|
raise ValueError(f"automation_run {run_id} not found")
|
||||||
|
|
||||||
|
if run.status != RunStatus.PENDING:
|
||||||
|
return
|
||||||
|
|
||||||
|
try:
|
||||||
|
definition = AutomationDefinition.model_validate(run.definition_snapshot)
|
||||||
|
except Exception as exc:
|
||||||
|
await repository.mark_failed(
|
||||||
|
session,
|
||||||
|
run,
|
||||||
|
{"message": f"definition_snapshot invalid: {exc}", "type": type(exc).__name__},
|
||||||
|
)
|
||||||
|
await session.commit()
|
||||||
|
return
|
||||||
|
|
||||||
|
await repository.mark_running(session, run)
|
||||||
|
await session.commit()
|
||||||
|
|
||||||
|
step_outputs: dict[str, Any] = {}
|
||||||
|
|
||||||
|
for step in definition.plan:
|
||||||
|
ctx = _build_ctx(run, step_outputs)
|
||||||
|
result = await execute_step(
|
||||||
|
step=step,
|
||||||
|
template_context=ctx,
|
||||||
|
default_max_retries=definition.execution.max_retries,
|
||||||
|
default_retry_backoff=definition.execution.retry_backoff,
|
||||||
|
default_timeout_seconds=definition.execution.timeout_seconds,
|
||||||
|
)
|
||||||
|
await repository.append_step_result(session, run, result)
|
||||||
|
await session.commit()
|
||||||
|
|
||||||
|
if result["status"] == "failed":
|
||||||
|
await _run_on_failure(session, run, definition)
|
||||||
|
await repository.mark_failed(session, run, result.get("error"))
|
||||||
|
await session.commit()
|
||||||
|
return
|
||||||
|
|
||||||
|
if result["status"] == "succeeded":
|
||||||
|
step_outputs[step.output_as or step.step_id] = result.get("result")
|
||||||
|
|
||||||
|
await repository.mark_succeeded(session, run)
|
||||||
|
await session.commit()
|
||||||
|
|
||||||
|
|
||||||
|
async def _run_on_failure(
|
||||||
|
session: AsyncSession,
|
||||||
|
run: AutomationRun,
|
||||||
|
definition: AutomationDefinition,
|
||||||
|
) -> None:
|
||||||
|
"""Run the on_failure steps. Their failures don't recurse into more on_failure."""
|
||||||
|
if not definition.execution.on_failure:
|
||||||
|
return
|
||||||
|
ctx = _build_ctx(run, step_outputs={})
|
||||||
|
for step in definition.execution.on_failure:
|
||||||
|
result = await execute_step(
|
||||||
|
step=step,
|
||||||
|
template_context=ctx,
|
||||||
|
default_max_retries=definition.execution.max_retries,
|
||||||
|
default_retry_backoff=definition.execution.retry_backoff,
|
||||||
|
default_timeout_seconds=definition.execution.timeout_seconds,
|
||||||
|
)
|
||||||
|
await repository.append_step_result(session, run, result)
|
||||||
|
await session.commit()
|
||||||
|
|
||||||
|
|
||||||
|
def _build_ctx(run: AutomationRun, step_outputs: dict[str, Any]) -> dict[str, Any]:
|
||||||
|
automation = run.automation
|
||||||
|
trigger = run.trigger
|
||||||
|
return build_run_context(
|
||||||
|
run_id=run.id,
|
||||||
|
automation_id=run.automation_id,
|
||||||
|
automation_name=automation.name if automation else None,
|
||||||
|
automation_version=automation.version if automation else None,
|
||||||
|
search_space_id=automation.search_space_id if automation else None,
|
||||||
|
creator_id=automation.created_by_user_id if automation else None,
|
||||||
|
trigger_id=run.trigger_id,
|
||||||
|
trigger_type=trigger.type.value if trigger else None,
|
||||||
|
started_at=run.started_at,
|
||||||
|
attempt=1,
|
||||||
|
resolved_inputs=run.resolved_inputs or {},
|
||||||
|
step_outputs=step_outputs,
|
||||||
|
)
|
||||||
Loading…
Add table
Add a link
Reference in a new issue