chore(automation): trim docstrings to intent only

Cut the docstrings and Field(description=...) text across the entire
automations/ tree down to single-line intent statements, matching the
multi_agent_chat conciseness style:

- Module docstrings: one line stating what the file is.
- Class docstrings: deleted when the class name + module docstring
  already cover intent; kept only where they add a constraint or
  rationale not visible in the signature.
- Pydantic Field descriptions: short noun phrases / clauses, not
  full sentences. Reasoning that belonged in the design plan moved
  out of the code.
- Enum values: per-value docstrings replaced with terse inline
  comments where the meaning isn't obvious from the name.

Behaviour is unchanged. The same 33 files, same public surface, same
imports — verified by re-running the 10-point registry smoke test and
the 8-point schema round-trip / constraint suite from commits 9 and
10.

LOC: 1180 → 691 (-42%).
This commit is contained in:
CREDO23 2026-05-26 23:01:22 +02:00
parent 7a96c0e29c
commit f0e00bd3ee
33 changed files with 80 additions and 568 deletions

View file

@ -1,4 +1,4 @@
"""Automations: scheduled / triggered runs of capabilities — see automation-design-plan.md.""" """Automations engine — see automation-design-plan.md."""
from __future__ import annotations from __future__ import annotations

View file

@ -1,4 +1,4 @@
"""Persistence layer: SQLAlchemy enums under ``enums/`` and models under ``models/``.""" """SQLAlchemy models and enums for the automation tables."""
from __future__ import annotations from __future__ import annotations

View file

@ -1,4 +1,4 @@
"""SQLAlchemy / Python enums backing the three automation tables.""" """Enums for the automation tables."""
from __future__ import annotations from __future__ import annotations

View file

@ -1,4 +1,4 @@
"""``AutomationStatus`` — lifecycle of a stored automation definition.""" """Automation lifecycle status."""
from __future__ import annotations from __future__ import annotations
@ -6,13 +6,6 @@ from enum import StrEnum
class AutomationStatus(StrEnum): class AutomationStatus(StrEnum):
"""Status of an automation in the registry. ACTIVE = "active" # eligible to fire
PAUSED = "paused" # kept, but triggers don't fire
``active`` eligible to fire from its triggers. ARCHIVED = "archived" # read-only history
``paused`` definition retained, triggers do not fire.
``archived`` kept for run history only; no edits, no fires.
"""
ACTIVE = "active"
PAUSED = "paused"
ARCHIVED = "archived"

View file

@ -1,4 +1,4 @@
"""``RunStatus`` — the state machine of a single ``AutomationRun``.""" """AutomationRun state machine: pending → running → (succeeded|failed|cancelled|timed_out)."""
from __future__ import annotations from __future__ import annotations
@ -6,20 +6,6 @@ from enum import StrEnum
class RunStatus(StrEnum): class RunStatus(StrEnum):
"""Lifecycle states of an ``AutomationRun`` row.
Transitions are linear with three terminal branches:
pending running (succeeded | failed | cancelled | timed_out)
``pending`` row created, executor task enqueued, work not started.
``running`` executor has picked up the run.
``succeeded`` terminal: plan completed without error.
``failed`` terminal: at least one step raised an unrecoverable error.
``cancelled`` terminal: caller asked for cancellation.
``timed_out`` terminal: run exceeded its configured timeout.
"""
PENDING = "pending" PENDING = "pending"
RUNNING = "running" RUNNING = "running"
SUCCEEDED = "succeeded" SUCCEEDED = "succeeded"

View file

@ -1,4 +1,4 @@
"""``TriggerType`` — the trigger-kind discriminator (v1 = schedule, manual).""" """Trigger-kind discriminator. v1: schedule | manual; webhook/event in Phase 2/3."""
from __future__ import annotations from __future__ import annotations
@ -6,16 +6,5 @@ from enum import StrEnum
class TriggerType(StrEnum): class TriggerType(StrEnum):
"""Kind of trigger an ``AutomationTrigger`` row represents.
v1 ships two kinds:
``schedule`` fires on a cron expression managed by Celery Beat.
``manual`` fires on demand from the UI's "Run now" affordance.
``webhook`` and ``event`` are deferred to Phase 2 and Phase 3
respectively; adding them is an enum-value extension only.
"""
SCHEDULE = "schedule" SCHEDULE = "schedule"
MANUAL = "manual" MANUAL = "manual"

View file

@ -1,4 +1,4 @@
"""SQLAlchemy models: one file per table (``automation.py``, ``trigger.py``, ``run.py``).""" """SQLAlchemy models, one per table."""
from __future__ import annotations from __future__ import annotations

View file

@ -1,4 +1,4 @@
"""``Automation`` table — the editable, versioned automation definition.""" """``automations`` table — editable, versioned automation definition."""
from __future__ import annotations from __future__ import annotations
@ -21,15 +21,6 @@ from ..enums.automation_status import AutomationStatus
class Automation(BaseModel, TimestampMixin): class Automation(BaseModel, TimestampMixin):
"""The editable, versioned spec a user authors.
The ``definition`` JSON is what the user (or the NL generator) writes
and edits. Each save bumps ``version`` by one; the previous JSON is
not kept in this row version history is reconstructed from the
``definition_snapshot`` column on every ``AutomationRun`` that fired
against a given version.
"""
__tablename__ = "automations" __tablename__ = "automations"
search_space_id = Column( search_space_id = Column(
@ -59,12 +50,7 @@ class Automation(BaseModel, TimestampMixin):
definition = Column(JSONB, nullable=False) definition = Column(JSONB, nullable=False)
version = Column( version = Column(Integer, nullable=False, default=1, server_default="1")
Integer,
nullable=False,
default=1,
server_default="1",
)
updated_at = Column( updated_at = Column(
TIMESTAMP(timezone=True), TIMESTAMP(timezone=True),

View file

@ -1,4 +1,4 @@
"""``AutomationRun`` table — the immutable per-fire execution record.""" """``automation_runs`` table — immutable per-fire execution record."""
from __future__ import annotations from __future__ import annotations
@ -18,16 +18,6 @@ from ..enums.run_status import RunStatus
class AutomationRun(BaseModel, TimestampMixin): class AutomationRun(BaseModel, TimestampMixin):
"""One execution of an automation.
Every fire of any trigger inserts exactly one row here. The row is
immutable from the user's perspective — the executor only updates
``status``, ``step_results``, ``output``, ``artifacts``, ``error``,
``started_at``, ``finished_at`` as the run progresses; the
``definition_snapshot`` is locked at fire time so the user can always
see exactly what code path executed for any historical run.
"""
__tablename__ = "automation_runs" __tablename__ = "automation_runs"
automation_id = Column( automation_id = Column(
@ -52,18 +42,14 @@ class AutomationRun(BaseModel, TimestampMixin):
index=True, index=True,
) )
# locked at fire time so historical runs always show the exact code path
definition_snapshot = Column(JSONB, nullable=False) definition_snapshot = Column(JSONB, nullable=False)
trigger_payload = Column(JSONB, nullable=True) trigger_payload = Column(JSONB, nullable=True)
resolved_inputs = Column(JSONB, nullable=False, server_default="{}") resolved_inputs = Column(JSONB, nullable=False, server_default="{}")
step_results = Column(JSONB, nullable=False, server_default="[]") step_results = Column(JSONB, nullable=False, server_default="[]")
output = Column(JSONB, nullable=True) output = Column(JSONB, nullable=True)
artifacts = Column(JSONB, nullable=False, server_default="[]") artifacts = Column(JSONB, nullable=False, server_default="[]")
error = Column(JSONB, nullable=True) error = Column(JSONB, nullable=True)
started_at = Column(TIMESTAMP(timezone=True), nullable=True) started_at = Column(TIMESTAMP(timezone=True), nullable=True)

View file

@ -1,4 +1,4 @@
"""``AutomationTrigger`` table — one row per (automation, trigger-instance) pair.""" """``automation_triggers`` table — one row per (automation, trigger-instance) pair."""
from __future__ import annotations from __future__ import annotations
@ -18,14 +18,6 @@ from ..enums.trigger_type import TriggerType
class AutomationTrigger(BaseModel, TimestampMixin): class AutomationTrigger(BaseModel, TimestampMixin):
"""One trigger attached to an automation.
An automation may have multiple triggers e.g. a ``schedule`` trigger
for the autonomous path and a ``manual`` trigger backing the UI's
"Run now" affordance. Each trigger's ``config`` is validated against
the registered ``TriggerDefinition.config_schema`` for its ``type``.
"""
__tablename__ = "automation_triggers" __tablename__ = "automation_triggers"
automation_id = Column( automation_id = Column(
@ -51,7 +43,4 @@ class AutomationTrigger(BaseModel, TimestampMixin):
index=True, index=True,
) )
last_fired_at = Column( last_fired_at = Column(TIMESTAMP(timezone=True), nullable=True)
TIMESTAMP(timezone=True),
nullable=True,
)

View file

@ -1,4 +1,4 @@
"""Three registries — ``capabilities/``, ``actions/``, ``triggers/`` — populated at import time.""" """Capability, action, and trigger registries — populated at process startup."""
from __future__ import annotations from __future__ import annotations

View file

@ -1,4 +1,4 @@
"""Action registry: ``types.py`` (dataclass), ``store.py`` (dict + register fn).""" """Action registry."""
from __future__ import annotations from __future__ import annotations

View file

@ -1,4 +1,4 @@
"""Action registry: in-memory dict + ``register_action`` API.""" """In-memory action registry. Populated once at process startup."""
from __future__ import annotations from __future__ import annotations
@ -8,26 +8,16 @@ _REGISTRY: dict[str, ActionDefinition] = {}
def register_action(action: ActionDefinition) -> None: def register_action(action: ActionDefinition) -> None:
"""Add an action to the in-memory registry. """Register an action. Raises on duplicate type."""
Raises ``ValueError`` on duplicate ``type`` registration runs
once per process, so a duplicate is always a bug.
"""
if action.type in _REGISTRY: if action.type in _REGISTRY:
raise ValueError( raise ValueError(f"Action already registered: {action.type!r}")
f"Action already registered: {action.type!r}"
)
_REGISTRY[action.type] = action _REGISTRY[action.type] = action
def get_action(action_type: str) -> ActionDefinition | None: def get_action(action_type: str) -> ActionDefinition | None:
"""Look up one action by type. Returns ``None`` on miss."""
return _REGISTRY.get(action_type) return _REGISTRY.get(action_type)
def all_actions() -> dict[str, ActionDefinition]: def all_actions() -> dict[str, ActionDefinition]:
"""Snapshot of the registry as a defensive copy.""" """Defensive snapshot of the registry."""
return dict(_REGISTRY) return dict(_REGISTRY)

View file

@ -1,4 +1,4 @@
"""``ActionDefinition`` dataclass — the v1-minimum action shape.""" """``ActionDefinition`` dataclass and handler signature."""
from __future__ import annotations from __future__ import annotations
@ -7,36 +7,10 @@ from dataclasses import dataclass
from typing import Any from typing import Any
ActionHandler = Callable[[dict[str, Any]], Awaitable[Any]] ActionHandler = Callable[[dict[str, Any]], Awaitable[Any]]
"""The signature every action handler must satisfy.
Identical in shape to ``CapabilityHandler`` both receive a
caller-validated input dict and return an arbitrary output. The
distinction is purely architectural: capabilities are the low-level
"what SurfSense can do" surface, actions are the user-facing
building blocks composed into a plan.
"""
@dataclass(frozen=True, slots=True) @dataclass(frozen=True, slots=True)
class ActionDefinition: class ActionDefinition:
"""A user-facing step type the plan editor can compose.
v1 trims the dataclass to the five fields necessary for
registry dispatch and form rendering. The full design (§4)
includes ``output_contract``, ``uses_capabilities``, and
``produces_artifacts``; all three are deferred until a consumer
feature requires them:
- ``output_contract`` the loose ``agent_task`` action declares
its output shape per-step via ``config.output_schema``, so the
action-level contract is not needed in v1.
- ``uses_capabilities`` would let the NL generator do static
analysis of which capabilities each action invokes; deferred
because v1 ships a single (``agent_task``) action.
- ``produces_artifacts`` deferred alongside the artifact
pipeline (see §13 decision 26).
"""
type: str type: str
name: str name: str
description: str description: str

View file

@ -1,4 +1,4 @@
"""Capability registry: ``types.py`` (dataclass), ``store.py`` (dict + register fn).""" """Capability registry."""
from __future__ import annotations from __future__ import annotations

View file

@ -1,4 +1,4 @@
"""Capability registry: in-memory dict + ``register_capability`` API.""" """In-memory capability registry. Populated once at process startup."""
from __future__ import annotations from __future__ import annotations
@ -8,33 +8,16 @@ _REGISTRY: dict[str, Capability] = {}
def register_capability(capability: Capability) -> None: def register_capability(capability: Capability) -> None:
"""Add a capability to the in-memory registry. """Register a capability. Raises on duplicate id."""
Raises ``ValueError`` on duplicate ``id`` registration is
idempotent only at the module level (a module's
``register_capability`` call runs once per process), so a
duplicate is always a bug.
"""
if capability.id in _REGISTRY: if capability.id in _REGISTRY:
raise ValueError( raise ValueError(f"Capability already registered: {capability.id!r}")
f"Capability already registered: {capability.id!r}"
)
_REGISTRY[capability.id] = capability _REGISTRY[capability.id] = capability
def get_capability(capability_id: str) -> Capability | None: def get_capability(capability_id: str) -> Capability | None:
"""Look up one capability by id. Returns ``None`` on miss."""
return _REGISTRY.get(capability_id) return _REGISTRY.get(capability_id)
def all_capabilities() -> dict[str, Capability]: def all_capabilities() -> dict[str, Capability]:
"""Snapshot of the registry as a defensive copy. """Defensive snapshot of the registry."""
Returned dict is safe to iterate while other code calls
``register_capability`` (which v1 never does post-startup, but
the contract holds anyway).
"""
return dict(_REGISTRY) return dict(_REGISTRY)

View file

@ -1,4 +1,4 @@
"""``Capability`` dataclass — the v1-minimum five-field shape.""" """``Capability`` dataclass and handler signature. Locked at five fields for v1."""
from __future__ import annotations from __future__ import annotations
@ -7,32 +7,10 @@ from dataclasses import dataclass
from typing import Any from typing import Any
CapabilityHandler = Callable[[dict[str, Any]], Awaitable[Any]] CapabilityHandler = Callable[[dict[str, Any]], Awaitable[Any]]
"""The signature every capability handler must satisfy.
The handler is a closure that already holds whatever runtime context
it needs (DB session, search-space scope, logger, etc.). The
registry only passes through the caller's input dict — the same dict
that was validated against ``input_schema``.
"""
@dataclass(frozen=True, slots=True) @dataclass(frozen=True, slots=True)
class Capability: class Capability:
"""The unit of "what SurfSense can do," consumed by every layer.
v1 keeps the dataclass to exactly five fields. Earlier drafts
considered ``name``, ``required_credentials``, ``side_effects``,
``expected_duration_seconds``, and ``cost_estimate``; every one
of those has been removed until a concrete consumer feature
requires it (see ``automation-design-plan.md`` §3, decision v1).
The handler is a ready-to-call function. It does not receive a
context argument context is bound at registration time by the
factory that builds the closure (so a capability returned to an
agent's tool list looks identical to one returned to an
automation's action runtime).
"""
id: str id: str
description: str description: str
input_schema: dict[str, Any] input_schema: dict[str, Any]

View file

@ -1,4 +1,4 @@
"""Trigger registry: ``types.py`` (dataclass), ``store.py`` (dict + register fn).""" """Trigger registry."""
from __future__ import annotations from __future__ import annotations

View file

@ -1,4 +1,4 @@
"""Trigger registry: in-memory dict + ``register_trigger`` API.""" """In-memory trigger registry. Populated once at process startup."""
from __future__ import annotations from __future__ import annotations
@ -8,26 +8,16 @@ _REGISTRY: dict[str, TriggerDefinition] = {}
def register_trigger(trigger: TriggerDefinition) -> None: def register_trigger(trigger: TriggerDefinition) -> None:
"""Add a trigger to the in-memory registry. """Register a trigger. Raises on duplicate type."""
Raises ``ValueError`` on duplicate ``type`` registration runs
once per process, so a duplicate is always a bug.
"""
if trigger.type in _REGISTRY: if trigger.type in _REGISTRY:
raise ValueError( raise ValueError(f"Trigger already registered: {trigger.type!r}")
f"Trigger already registered: {trigger.type!r}"
)
_REGISTRY[trigger.type] = trigger _REGISTRY[trigger.type] = trigger
def get_trigger(trigger_type: str) -> TriggerDefinition | None: def get_trigger(trigger_type: str) -> TriggerDefinition | None:
"""Look up one trigger by type. Returns ``None`` on miss."""
return _REGISTRY.get(trigger_type) return _REGISTRY.get(trigger_type)
def all_triggers() -> dict[str, TriggerDefinition]: def all_triggers() -> dict[str, TriggerDefinition]:
"""Snapshot of the registry as a defensive copy.""" """Defensive snapshot of the registry."""
return dict(_REGISTRY) return dict(_REGISTRY)

View file

@ -1,4 +1,4 @@
"""``TriggerDefinition`` dataclass — declarative trigger metadata, no handler.""" """``TriggerDefinition`` dataclass. Declarative; firing is the dispatcher's job."""
from __future__ import annotations from __future__ import annotations
@ -8,27 +8,6 @@ from typing import Any
@dataclass(frozen=True, slots=True) @dataclass(frozen=True, slots=True)
class TriggerDefinition: class TriggerDefinition:
"""A trigger type the dispatcher knows how to fire.
Triggers are purely declarative: the dispatcher (a single
process-wide component, not a per-type handler) reads the
``automation_triggers`` table and decides when each row should
fire. The trigger's job here is to declare its input/output
contract:
- ``config_schema``: JSON Schema for the persisted
``AutomationTrigger.config`` used by the form editor and
validated on save.
- ``payload_schema``: JSON Schema for the payload the dispatcher
will deliver to the executor at fire time (e.g., a schedule
trigger emits ``fired_at`` / ``scheduled_for`` /
``last_fired_at``).
No ``handler`` field firing is a dispatcher responsibility,
not a per-trigger one. This keeps the dispatcher single and
leaves trigger types as pure metadata.
"""
type: str type: str
description: str description: str
config_schema: dict[str, Any] config_schema: dict[str, Any]

View file

@ -1,4 +1,4 @@
"""Pydantic schemas: definition envelope, trigger configs, action configs.""" """Pydantic schemas for the automation definition and per-type configs."""
from __future__ import annotations from __future__ import annotations

View file

@ -1,4 +1,4 @@
"""Per-action config schemas: one file per action type registered in v1.""" """Per-action config schemas, one per action type."""
from __future__ import annotations from __future__ import annotations

View file

@ -8,59 +8,20 @@ from pydantic import BaseModel, ConfigDict, Field
class AgentTaskActionConfig(BaseModel): class AgentTaskActionConfig(BaseModel):
"""Config for an ``agent_task`` plan step. """Run a LangGraph Deep Agent restricted to a scoped capability list."""
Validated against ``PlanStep.config`` whenever the step's
``action`` is ``agent_task``. The step instructs the LangGraph
Deep Agent runtime to:
1. Receive ``prompt`` (with all preceding-step outputs and inputs
already rendered by the template engine).
2. Run the agent with access to *exactly* the capabilities named
in ``tools`` nothing else from the registry is visible to
this agent invocation.
3. Return a JSON object matching ``output_schema`` (recommended;
the executor validates and re-prompts on mismatch).
``output_schema`` is the design's "dynamic output contract"
instead of locking the output shape on the ActionDefinition (as
tight actions do), the user declares the shape they want for this
specific step, and the agent has to match it.
"""
model_config = ConfigDict(extra="forbid") model_config = ConfigDict(extra="forbid")
prompt: str = Field( prompt: str = Field(..., min_length=1, description="Task prompt; Jinja-rendered.")
...,
description=(
"The task prompt rendered through the Jinja sandbox. May "
"reference automation inputs and prior-step outputs."
),
min_length=1,
)
tools: list[str] = Field( tools: list[str] = Field(
default_factory=list, default_factory=list,
description=( description="Capability IDs the agent may call. Empty = no tool access.",
"Allowlist of capability IDs the agent may call (e.g., "
"'search_space.query'). Empty list = no tool access; the "
"agent must answer from the prompt alone."
),
) )
model: str | None = Field( model: str | None = Field(
default=None, default=None,
description=( description="LiteLLM model id. Defaults to the search space's agent_llm_id.",
"Optional LiteLLM model identifier (e.g., "
"'anthropic/claude-sonnet-4-7'). Omitted means the "
"automation falls back to the search space's default "
"agent_llm_id."
),
) )
output_schema: dict[str, Any] | None = Field( output_schema: dict[str, Any] | None = Field(
default=None, default=None,
description=( description="JSON Schema the agent must return. Recommended.",
"Optional JSON Schema declaring the shape the agent must "
"return. Strongly recommended; the editor warns when "
"missing. Validated by the executor before binding to "
"``output_as``."
),
) )

View file

@ -1,4 +1,4 @@
"""Automation definition envelope: the editable structured spec users author and run.""" """Automation definition envelope and its building blocks."""
from __future__ import annotations from __future__ import annotations

View file

@ -1,4 +1,4 @@
"""``AutomationDefinition`` — the top-level envelope persisted in ``automations.definition``.""" """``AutomationDefinition`` — top-level envelope persisted in ``automations.definition``."""
from __future__ import annotations from __future__ import annotations
@ -12,78 +12,15 @@ from .trigger_spec import TriggerSpec
class AutomationDefinition(BaseModel): class AutomationDefinition(BaseModel):
"""The top-level JSON shape stored in ``automations.definition``. """Top-level shape of an automation. See automation-design-plan.md §5."""
This is the editable spec a user authors (or the NL generator
produces). The envelope is structural only every nested
discriminator (``triggers[].type``, ``plan[].action``) is resolved
against the registries at validation time, so adding a new
trigger or action type does not require touching this schema.
See ``automation-design-plan.md`` §5 for the worked example and
rationale.
"""
model_config = ConfigDict(extra="forbid") model_config = ConfigDict(extra="forbid")
schema_version: str = Field( schema_version: str = "1.0"
default="1.0", name: str = Field(..., min_length=1, max_length=200)
description=( goal: str | None = None
"Schema version of the envelope itself. Migrations bump " inputs: InputsBlock | None = None
"this when the envelope shape changes; nested per-type " triggers: list[TriggerSpec] = Field(default_factory=list)
"configs evolve independently via the registries." plan: list[PlanStep] = Field(..., min_length=1)
), execution: ExecutionBlock = Field(default_factory=ExecutionBlock)
) metadata: MetadataBlock = Field(default_factory=MetadataBlock)
name: str = Field(
...,
description="Short, user-facing name shown in lists.",
min_length=1,
max_length=200,
)
goal: str | None = Field(
default=None,
description=(
"Optional plain-language statement of what the "
"automation is for. Used by the NL generator's review "
"pass and by the UI's run dialog."
),
)
inputs: InputsBlock | None = Field(
default=None,
description=(
"Optional input contract. When omitted, the automation "
"accepts no inputs at fire time."
),
)
triggers: list[TriggerSpec] = Field(
default_factory=list,
description=(
"Triggers that fire this automation. Empty list means "
"the automation is only runnable via the manual "
"``Run now`` path."
),
)
plan: list[PlanStep] = Field(
...,
description=(
"Ordered sequence of steps. Executed in array order — "
"no parallelism, no DAGs, no loops at the envelope "
"level."
),
min_length=1,
)
execution: ExecutionBlock = Field(
default_factory=ExecutionBlock,
description=(
"Execution defaults (timeouts, retries, concurrency, "
"budget). All fields default to safe values; the block "
"may be omitted entirely."
),
)
metadata: MetadataBlock = Field(
default_factory=MetadataBlock,
description=(
"Free-form metadata (tags, NL-generator breadcrumbs, "
"UI annotations). Tolerates unknown keys by design."
),
)

View file

@ -1,4 +1,4 @@
"""``ExecutionBlock`` — the ``execution`` section of the automation definition.""" """``ExecutionBlock`` — automation-wide execution defaults (overridable per step)."""
from __future__ import annotations from __future__ import annotations
@ -10,67 +10,16 @@ from .plan_step import PlanStep
class ExecutionBlock(BaseModel): class ExecutionBlock(BaseModel):
"""The ``execution`` block of an ``AutomationDefinition``.
Carries automation-wide defaults that individual ``PlanStep``s
can override. Every field has a sane default so an automation
definition may omit the block entirely; in that case all defaults
apply.
``on_failure`` is a secondary plan that runs only when the main
``plan`` fails after retries exhaust. It uses the same
``PlanStep`` shape as the main plan and shares the same execution
semantics.
"""
model_config = ConfigDict(extra="forbid") model_config = ConfigDict(extra="forbid")
timeout_seconds: int = Field( timeout_seconds: int = Field(default=600, gt=0, description="Wall-clock cap for the run.")
default=600, max_retries: int = Field(default=2, ge=0, description="Per-step retry budget.")
gt=0, retry_backoff: Literal["exponential", "linear", "none"] = "exponential"
description=( concurrency: Literal["drop_if_running", "queue", "always"] = "drop_if_running"
"Hard wall-clock cap for the entire run. The executor "
"transitions the run to ``timed_out`` when this is "
"exceeded."
),
)
max_retries: int = Field(
default=2,
ge=0,
description=(
"Per-step retry budget applied when a step raises a "
"retryable error. Steps may override per-step."
),
)
retry_backoff: Literal["exponential", "linear", "none"] = Field(
default="exponential",
description="Backoff policy between retries.",
)
concurrency: Literal[
"drop_if_running", "queue", "always"
] = Field(
default="drop_if_running",
description=(
"Behaviour when a new fire arrives while a previous run "
"is still in progress. ``drop_if_running`` skips the new "
"fire, ``queue`` enqueues it, ``always`` runs it in "
"parallel."
),
)
budget_cap_usd: float | None = Field( budget_cap_usd: float | None = Field(
default=None, default=None, gt=0, description="Kill the run when accumulated cost exceeds this."
gt=0,
description=(
"Optional mid-flight cost cap in USD. The executor kills "
"the run when accumulated cost exceeds this value. v1 "
"treats this as an advisory because cost tracking lands "
"with the executor in a later step."
),
) )
on_failure: list[PlanStep] = Field( on_failure: list[PlanStep] = Field(
default_factory=list, default_factory=list,
description=( description="Steps run when the main plan fails after retries.",
"Secondary plan executed only when the main plan fails "
"after retries exhaust. Empty list means no fallback."
),
) )

View file

@ -1,4 +1,4 @@
"""``InputsBlock`` — the ``inputs`` section of the automation definition.""" """``InputsBlock`` — JSON Schema for inputs an automation accepts at fire time."""
from __future__ import annotations from __future__ import annotations
@ -8,23 +8,6 @@ from pydantic import BaseModel, ConfigDict, Field
class InputsBlock(BaseModel): class InputsBlock(BaseModel):
"""The ``inputs`` block of an ``AutomationDefinition``.
Holds a JSON Schema describing what data the automation accepts at
fire time. The same schema is used by:
- The form editor (to render the manual-run dialog).
- The dispatcher (to validate trigger payloads before enqueueing
executor work).
- The template engine (to expose ``{{ inputs.* }}`` references in
plan-step configs).
The ``schema`` value is the JSON-Schema dict itself, not a
Pydantic model automations express their input contract in pure
JSON Schema so it round-trips losslessly through the database and
the NL generator.
"""
model_config = ConfigDict( model_config = ConfigDict(
extra="forbid", extra="forbid",
populate_by_name=True, populate_by_name=True,
@ -34,10 +17,5 @@ class InputsBlock(BaseModel):
schema_: dict[str, Any] = Field( schema_: dict[str, Any] = Field(
..., ...,
alias="schema", alias="schema",
description=( description="JSON Schema (draft-07) for accepted inputs.",
"JSON Schema (draft-07 compatible) describing the inputs "
"this automation accepts. Properties may use the special "
"``$last_fired_at`` default literal to bind to the "
"trigger's last fire time."
),
) )

View file

@ -1,4 +1,4 @@
"""``MetadataBlock`` — the ``metadata`` section of the automation definition.""" """``MetadataBlock`` — free-form metadata on a definition. Extra keys allowed."""
from __future__ import annotations from __future__ import annotations
@ -6,31 +6,9 @@ from pydantic import BaseModel, ConfigDict, Field
class MetadataBlock(BaseModel): class MetadataBlock(BaseModel):
"""Free-form metadata attached to the automation definition.
Unlike the rest of the envelope this block tolerates unknown keys
(``extra='allow'``) it's a deliberate extension point for
UI annotations, NL-generator breadcrumbs, custom tags, etc.
Two fields are first-class so the rest of the system can rely on
them without reaching into the loose extras:
``tags`` used by the UI for filtering and grouping.
``created_from_nl`` set by the NL generator so we can later
measure how many runs came from natural-language authoring.
"""
model_config = ConfigDict(extra="allow") model_config = ConfigDict(extra="allow")
tags: list[str] = Field( tags: list[str] = Field(default_factory=list)
default_factory=list,
description="UI-facing tags. No semantic meaning to the engine.",
)
created_from_nl: bool = Field( created_from_nl: bool = Field(
default=False, default=False, description="True when produced by the NL generator."
description=(
"True when the definition was produced by the NL "
"generator (set automatically by the generator path; "
"human-authored definitions keep this false)."
),
) )

View file

@ -1,4 +1,4 @@
"""``PlanStep`` — one entry in the envelope's ``plan`` array.""" """``PlanStep`` — one step in the sequential plan."""
from __future__ import annotations from __future__ import annotations
@ -8,79 +8,21 @@ from pydantic import BaseModel, ConfigDict, Field
class PlanStep(BaseModel): class PlanStep(BaseModel):
"""One step in an automation's sequential plan.
Steps run in array order, no parallelism, no DAGs, no loops. The
``when`` Jinja expression provides conditional skip; branching is
achieved by ``when`` clauses on multiple steps. For looping or
parallel work, the user routes through ``agent_task`` and lets the
agent reason about it.
``config`` is dispatched against the action registry at
validation time its shape is determined by
``ActionDefinition.config_schema`` for the ``action`` value.
``output_as`` binds the step's typed output into the template
namespace for later steps, e.g. ``output_as: 'summary'`` then
``{{ summary.bullets }}`` in a downstream step's config.
"""
model_config = ConfigDict(extra="forbid") model_config = ConfigDict(extra="forbid")
step_id: str = Field( step_id: str = Field(..., min_length=1, description="Unique within the plan.")
..., action: str = Field(..., min_length=1, description="Action type; resolved via registry.")
description=(
"Unique-within-plan identifier. Used in run logs and as "
"the default for ``output_as`` when not provided."
),
min_length=1,
)
action: str = Field(
...,
description=(
"Action-type discriminator (e.g., ``agent_task``). "
"Resolved against the action registry."
),
min_length=1,
)
when: str | None = Field( when: str | None = Field(
default=None, default=None,
description=( description="Optional Jinja expression; step is skipped when falsy.",
"Optional Jinja expression evaluated against the run "
"context. Step is skipped when the expression is "
"falsy."
),
) )
config: dict[str, Any] = Field( config: dict[str, Any] = Field(
default_factory=dict, default_factory=dict,
description=( description="Action-type-specific config; Jinja-rendered at execute time.",
"Action-type-specific config. Validated against the "
"registered ``ActionDefinition.config_schema`` for "
"``action`` at definition-save time. Jinja templates "
"inside config are rendered at step-execute time."
),
) )
output_as: str | None = Field( output_as: str | None = Field(
default=None, default=None,
description=( description="Bind step output under this name. Defaults to step_id.",
"Name to bind the step output under for downstream "
"steps. Defaults to ``step_id`` when omitted."
),
)
max_retries: int | None = Field(
default=None,
ge=0,
description=(
"Per-step override of the automation-level ``max_retries``. "
"Omitted means inherit from execution block."
),
)
timeout_seconds: int | None = Field(
default=None,
gt=0,
description=(
"Per-step override of the automation-level "
"``timeout_seconds``. Omitted means inherit from "
"execution block."
),
) )
max_retries: int | None = Field(default=None, ge=0)
timeout_seconds: int | None = Field(default=None, gt=0)

View file

@ -1,4 +1,4 @@
"""``TriggerSpec`` — one entry in the envelope's ``triggers`` array.""" """``TriggerSpec`` — one entry in the definition's ``triggers[]`` array."""
from __future__ import annotations from __future__ import annotations
@ -8,33 +8,10 @@ from pydantic import BaseModel, ConfigDict, Field
class TriggerSpec(BaseModel): class TriggerSpec(BaseModel):
"""One trigger attached to an automation, as it appears in the definition.
The envelope keeps ``config`` as an untyped JSON object on purpose
the per-type config schemas live in
``app.automations.schemas.triggers`` and are dispatched at
validation time by looking up ``type`` in the trigger registry.
This mirrors the design's "definitions are pure data" principle:
the envelope describes shape, the registry resolves names to
behaviour.
"""
model_config = ConfigDict(extra="forbid") model_config = ConfigDict(extra="forbid")
type: str = Field( type: str = Field(..., min_length=1, description="Trigger type; resolved via registry.")
...,
description=(
"Trigger-type discriminator (e.g., ``schedule``, ``manual``). "
"Resolved against the trigger registry."
),
min_length=1,
)
config: dict[str, Any] = Field( config: dict[str, Any] = Field(
default_factory=dict, default_factory=dict,
description=( description="Type-specific config; validated against the trigger's schema.",
"Trigger-type-specific config. Validated against the "
"registered ``TriggerDefinition.config_schema`` for "
"``type`` at definition-save time."
),
) )

View file

@ -1,4 +1,4 @@
"""Per-trigger config schemas: one file per trigger type registered in v1.""" """Per-trigger config schemas, one per trigger type."""
from __future__ import annotations from __future__ import annotations

View file

@ -1,4 +1,4 @@
"""``ManualTriggerConfig`` — config for the ``manual`` trigger type (empty in v1).""" """``ManualTriggerConfig`` — config for the ``manual`` trigger (empty in v1)."""
from __future__ import annotations from __future__ import annotations
@ -6,16 +6,4 @@ from pydantic import BaseModel, ConfigDict
class ManualTriggerConfig(BaseModel): class ManualTriggerConfig(BaseModel):
"""Config for the UI-driven ``manual`` trigger.
Validated against ``AutomationTrigger.config`` whenever the
persisted ``type`` is ``manual``. v1 carries no configurable
fields the "Run now" affordance simply fires this trigger with
an empty config object. The model exists so the registry dispatch
is uniform across all trigger types.
Future versions may add fields here (e.g., a fixed prompt to
pre-fill the run dialog with) without breaking v1 payloads.
"""
model_config = ConfigDict(extra="forbid") model_config = ConfigDict(extra="forbid")

View file

@ -6,28 +6,7 @@ from pydantic import BaseModel, ConfigDict, Field
class ScheduleTriggerConfig(BaseModel): class ScheduleTriggerConfig(BaseModel):
"""Config for a cron-driven trigger.
Validated against ``AutomationTrigger.config`` whenever the
persisted ``type`` is ``schedule``. The cron expression is
evaluated by Celery Beat's source; the timezone is an IANA name
(e.g., ``Africa/Kigali``) and is required so the user's cron is
unambiguous across DST boundaries.
"""
model_config = ConfigDict(extra="forbid") model_config = ConfigDict(extra="forbid")
cron: str = Field( cron: str = Field(..., description="Five-field cron expression.", examples=["0 9 * * 1-5"])
..., timezone: str = Field(..., description="IANA timezone.", examples=["Africa/Kigali"])
description=(
"Five-field cron expression. Minimum resolution is one "
"minute; the form editor warns when intervals tighter "
"than 15 minutes are used."
),
examples=["0 9 * * 1-5"],
)
timezone: str = Field(
...,
description="IANA timezone name (e.g., 'Africa/Kigali', 'UTC').",
examples=["Africa/Kigali"],
)