refactor(automations): vertical-slice actions and triggers by domain

This commit is contained in:
CREDO23 2026-05-27 17:07:20 +02:00
parent ce45e11009
commit 8c32455818
24 changed files with 86 additions and 108 deletions

View file

@ -1 +1,24 @@
"""Action implementations. One subpackage per built-in action type."""
"""Actions domain: registry surface + built-in action packages.
Each action lives in its own subpackage (``agent_task/``, ...) and self-registers
at import time via its ``definition`` module. Side-effect imports below ensure
the registry is populated whenever anyone touches the actions package.
"""
from __future__ import annotations
from .store import all_actions, get_action, register_action
from .types import ActionContext, ActionDefinition, ActionHandler, ActionHandlerFactory
__all__ = [
"ActionContext",
"ActionDefinition",
"ActionHandler",
"ActionHandlerFactory",
"all_actions",
"get_action",
"register_action",
]
# Built-in actions self-register at import time.
from . import agent_task # noqa: E402, F401

View file

@ -1,7 +1,15 @@
"""``agent_task`` action: spin up multi_agent_chat for one rendered query."""
"""``agent_task`` action: spin up multi_agent_chat for one rendered query.
Imports ``definition`` for its side-effect (self-registration on the actions
registry) and re-exports ``build_handler`` for direct consumers.
"""
from __future__ import annotations
from .factory import build_handler
from .params import AgentTaskActionParams
__all__ = ["build_handler"]
__all__ = ["AgentTaskActionParams", "build_handler"]
# Side-effect: register on the actions store.
from . import definition # noqa: E402, F401

View file

@ -0,0 +1,18 @@
"""``agent_task`` ``ActionDefinition`` registration."""
from __future__ import annotations
from ..store import register_action
from ..types import ActionDefinition
from .factory import build_handler
from .params import AgentTaskActionParams
AGENT_TASK_ACTION = ActionDefinition(
type="agent_task",
name="Agent task",
description="Run a multi_agent_chat turn from an automation step.",
params_schema=AgentTaskActionParams.model_json_schema(),
build_handler=build_handler,
)
register_action(AGENT_TASK_ACTION)

View file

@ -4,13 +4,9 @@ from __future__ import annotations
from typing import Any
from app.automations.registries.actions.types import (
ActionContext,
ActionHandler,
)
from app.automations.schemas.actions import AgentTaskActionParams
from ..types import ActionContext, ActionHandler
from .invoke import run_agent_task
from .params import AgentTaskActionParams
def build_handler(ctx: ActionContext) -> ActionHandler:

View file

@ -10,9 +10,10 @@ from langchain_core.messages import HumanMessage
from langgraph.types import Command
from app.agents.multi_agent_chat import create_multi_agent_chat_deep_agent
from app.automations.registries.actions.types import ActionContext
from app.db import ChatVisibility, async_session_maker
from ..types import ActionContext
from .auto_decide import build_auto_decisions
from .dependencies import build_dependencies
from .finalize import extract_final_assistant_message

View file

@ -0,0 +1,21 @@
"""``AgentTaskActionParams`` — params for the ``agent_task`` action type."""
from __future__ import annotations
from pydantic import BaseModel, ConfigDict, Field
class AgentTaskActionParams(BaseModel):
"""Run a multi_agent_chat turn from an automation step."""
model_config = ConfigDict(extra="forbid")
query: str = Field(
...,
min_length=1,
description="User query for the agent; rendered at execute time.",
)
auto_approve_all: bool = Field(
default=False,
description="If true, every HITL approval is auto-approved; otherwise rejected.",
)

View file

@ -0,0 +1,23 @@
"""In-memory action registry. Populated once at process startup."""
from __future__ import annotations
from .types import ActionDefinition
_REGISTRY: dict[str, ActionDefinition] = {}
def register_action(action: ActionDefinition) -> None:
"""Register an action. Raises on duplicate type."""
if action.type in _REGISTRY:
raise ValueError(f"Action already registered: {action.type!r}")
_REGISTRY[action.type] = action
def get_action(action_type: str) -> ActionDefinition | None:
return _REGISTRY.get(action_type)
def all_actions() -> dict[str, ActionDefinition]:
"""Defensive snapshot of the registry."""
return dict(_REGISTRY)

View file

@ -0,0 +1,34 @@
"""``ActionDefinition``, ``ActionContext``, and handler/factory signatures."""
from __future__ import annotations
from collections.abc import Awaitable, Callable
from dataclasses import dataclass
from typing import Any
from uuid import UUID
from sqlalchemy.ext.asyncio import AsyncSession
@dataclass(frozen=True, slots=True)
class ActionContext:
"""Per-invocation dependencies bound to an action handler at execute time."""
session: AsyncSession
run_id: int
step_id: str
search_space_id: int
creator_user_id: UUID | None
ActionHandler = Callable[[dict[str, Any]], Awaitable[Any]]
ActionHandlerFactory = Callable[[ActionContext], ActionHandler]
@dataclass(frozen=True, slots=True)
class ActionDefinition:
type: str
name: str
description: str
params_schema: dict[str, Any]
build_handler: ActionHandlerFactory