mirror of
https://github.com/MODSetter/SurfSense.git
synced 2026-05-29 19:35:20 +02:00
65 lines
2.2 KiB
Python
65 lines
2.2 KiB
Python
"""``AutomationService`` — orchestration for the ``Automation`` resource."""
|
|
|
|
from __future__ import annotations
|
|
|
|
from typing import Any
|
|
|
|
from fastapi import Depends, HTTPException
|
|
from sqlalchemy.ext.asyncio import AsyncSession
|
|
|
|
from app.automations.dispatch import DispatchError
|
|
from app.automations.persistence.models.automation import Automation
|
|
from app.automations.persistence.models.run import AutomationRun
|
|
from app.automations.triggers.manual import dispatch_manual_run
|
|
from app.db import Permission, User, get_async_session
|
|
from app.users import current_active_user
|
|
from app.utils.rbac import check_permission
|
|
|
|
|
|
class AutomationService:
|
|
"""Service for the ``Automation`` resource."""
|
|
|
|
def __init__(self, *, session: AsyncSession, user: User) -> None:
|
|
self.session = session
|
|
self.user = user
|
|
|
|
async def run_now(
|
|
self,
|
|
*,
|
|
automation_id: int,
|
|
payload: dict[str, Any] | None,
|
|
) -> AutomationRun:
|
|
"""Fire a manual run for ``automation_id``."""
|
|
automation = await self._get_automation_or_raise(automation_id)
|
|
await check_permission(
|
|
self.session,
|
|
self.user,
|
|
automation.search_space_id,
|
|
Permission.AUTOMATIONS_EXECUTE.value,
|
|
"You don't have permission to execute automations in this search space",
|
|
)
|
|
|
|
try:
|
|
return await dispatch_manual_run(
|
|
session=self.session,
|
|
automation_id=automation_id,
|
|
payload=payload,
|
|
)
|
|
except DispatchError as exc:
|
|
raise HTTPException(status_code=422, detail=str(exc)) from exc
|
|
|
|
async def _get_automation_or_raise(self, automation_id: int) -> Automation:
|
|
"""Get the automation by id; 404 if missing."""
|
|
automation = await self.session.get(Automation, automation_id)
|
|
if automation is None:
|
|
raise HTTPException(
|
|
status_code=404, detail=f"automation {automation_id} not found"
|
|
)
|
|
return automation
|
|
|
|
|
|
def get_automation_service(
|
|
session: AsyncSession = Depends(get_async_session),
|
|
user: User = Depends(current_active_user),
|
|
) -> AutomationService:
|
|
return AutomationService(session=session, user=user)
|