feat(automation): add Pydantic schemas for the automation definition

Three layers of Pydantic models under app/automations/schemas/, one
file per concern (SRP), matching the envelope in
automation-design-plan.md §5.

definition/ — the editable envelope persisted in
automations.definition:
  - envelope.py       AutomationDefinition (top-level shape)
  - plan_step.py      PlanStep (one step in the sequential plan)
  - inputs.py         InputsBlock (the inputs JSON Schema wrapper)
  - execution.py      ExecutionBlock (timeouts, retries, concurrency,
                                      budget cap, on_failure plan)
  - metadata.py       MetadataBlock (tags + created_from_nl + extras)
  - trigger_spec.py   TriggerSpec (one entry in triggers[])

triggers/ — per-trigger config schemas, dispatched by registry on the
TriggerSpec.type discriminator:
  - schedule.py       ScheduleTriggerConfig(cron, timezone)
  - manual.py         ManualTriggerConfig() — empty in v1

actions/ — per-action config schemas, dispatched by registry on the
PlanStep.action discriminator:
  - agent_task.py     AgentTaskActionConfig(prompt, tools, model,
                                            output_schema)

Design properties verified by an inline smoke test:
  - The §5 worked example round-trips through model_validate_json /
    model_dump_json byte-for-byte (InputsBlock uses
    serialize_by_alias so the JSON key stays "schema" not
    "schema_").
  - Envelope rejects unknown top-level keys (extra="forbid").
  - MetadataBlock tolerates unknown keys (extra="allow").
  - ExecutionBlock defaults apply when the block is omitted.
  - retry_backoff and concurrency are typed as Literal — bogus
    values rejected at validation time.
  - Per-type configs enforce their required fields (cron + timezone
    on schedule; non-empty prompt on agent_task).

The envelope keeps trigger and action configs as untyped dicts on
purpose — per-type validation is a registry-driven dispatch (commit
10), keeping the envelope free of every-type-knows-every-type
coupling.
This commit is contained in:
CREDO23 2026-05-26 22:50:52 +02:00
parent d9183464d9
commit be4d43d6c9
13 changed files with 539 additions and 4 deletions

View file

@ -2,4 +2,25 @@
from __future__ import annotations
__all__: list[str] = []
from .actions import AgentTaskActionConfig
from .definition import (
AutomationDefinition,
ExecutionBlock,
InputsBlock,
MetadataBlock,
PlanStep,
TriggerSpec,
)
from .triggers import ManualTriggerConfig, ScheduleTriggerConfig
__all__ = [
"AgentTaskActionConfig",
"AutomationDefinition",
"ExecutionBlock",
"InputsBlock",
"ManualTriggerConfig",
"MetadataBlock",
"PlanStep",
"ScheduleTriggerConfig",
"TriggerSpec",
]

View file

@ -2,4 +2,8 @@
from __future__ import annotations
__all__: list[str] = []
from .agent_task import AgentTaskActionConfig
__all__ = [
"AgentTaskActionConfig",
]

View file

@ -0,0 +1,66 @@
"""``AgentTaskActionConfig`` — config for the ``agent_task`` action type."""
from __future__ import annotations
from typing import Any
from pydantic import BaseModel, ConfigDict, Field
class AgentTaskActionConfig(BaseModel):
"""Config for an ``agent_task`` plan step.
Validated against ``PlanStep.config`` whenever the step's
``action`` is ``agent_task``. The step instructs the LangGraph
Deep Agent runtime to:
1. Receive ``prompt`` (with all preceding-step outputs and inputs
already rendered by the template engine).
2. Run the agent with access to *exactly* the capabilities named
in ``tools`` nothing else from the registry is visible to
this agent invocation.
3. Return a JSON object matching ``output_schema`` (recommended;
the executor validates and re-prompts on mismatch).
``output_schema`` is the design's "dynamic output contract"
instead of locking the output shape on the ActionDefinition (as
tight actions do), the user declares the shape they want for this
specific step, and the agent has to match it.
"""
model_config = ConfigDict(extra="forbid")
prompt: str = Field(
...,
description=(
"The task prompt rendered through the Jinja sandbox. May "
"reference automation inputs and prior-step outputs."
),
min_length=1,
)
tools: list[str] = Field(
default_factory=list,
description=(
"Allowlist of capability IDs the agent may call (e.g., "
"'search_space.query'). Empty list = no tool access; the "
"agent must answer from the prompt alone."
),
)
model: str | None = Field(
default=None,
description=(
"Optional LiteLLM model identifier (e.g., "
"'anthropic/claude-sonnet-4-7'). Omitted means the "
"automation falls back to the search space's default "
"agent_llm_id."
),
)
output_schema: dict[str, Any] | None = Field(
default=None,
description=(
"Optional JSON Schema declaring the shape the agent must "
"return. Strongly recommended; the editor warns when "
"missing. Validated by the executor before binding to "
"``output_as``."
),
)

View file

@ -2,4 +2,18 @@
from __future__ import annotations
__all__: list[str] = []
from .envelope import AutomationDefinition
from .execution import ExecutionBlock
from .inputs import InputsBlock
from .metadata import MetadataBlock
from .plan_step import PlanStep
from .trigger_spec import TriggerSpec
__all__ = [
"AutomationDefinition",
"ExecutionBlock",
"InputsBlock",
"MetadataBlock",
"PlanStep",
"TriggerSpec",
]

View file

@ -0,0 +1,89 @@
"""``AutomationDefinition`` — the top-level envelope persisted in ``automations.definition``."""
from __future__ import annotations
from pydantic import BaseModel, ConfigDict, Field
from .execution import ExecutionBlock
from .inputs import InputsBlock
from .metadata import MetadataBlock
from .plan_step import PlanStep
from .trigger_spec import TriggerSpec
class AutomationDefinition(BaseModel):
"""The top-level JSON shape stored in ``automations.definition``.
This is the editable spec a user authors (or the NL generator
produces). The envelope is structural only every nested
discriminator (``triggers[].type``, ``plan[].action``) is resolved
against the registries at validation time, so adding a new
trigger or action type does not require touching this schema.
See ``automation-design-plan.md`` §5 for the worked example and
rationale.
"""
model_config = ConfigDict(extra="forbid")
schema_version: str = Field(
default="1.0",
description=(
"Schema version of the envelope itself. Migrations bump "
"this when the envelope shape changes; nested per-type "
"configs evolve independently via the registries."
),
)
name: str = Field(
...,
description="Short, user-facing name shown in lists.",
min_length=1,
max_length=200,
)
goal: str | None = Field(
default=None,
description=(
"Optional plain-language statement of what the "
"automation is for. Used by the NL generator's review "
"pass and by the UI's run dialog."
),
)
inputs: InputsBlock | None = Field(
default=None,
description=(
"Optional input contract. When omitted, the automation "
"accepts no inputs at fire time."
),
)
triggers: list[TriggerSpec] = Field(
default_factory=list,
description=(
"Triggers that fire this automation. Empty list means "
"the automation is only runnable via the manual "
"``Run now`` path."
),
)
plan: list[PlanStep] = Field(
...,
description=(
"Ordered sequence of steps. Executed in array order — "
"no parallelism, no DAGs, no loops at the envelope "
"level."
),
min_length=1,
)
execution: ExecutionBlock = Field(
default_factory=ExecutionBlock,
description=(
"Execution defaults (timeouts, retries, concurrency, "
"budget). All fields default to safe values; the block "
"may be omitted entirely."
),
)
metadata: MetadataBlock = Field(
default_factory=MetadataBlock,
description=(
"Free-form metadata (tags, NL-generator breadcrumbs, "
"UI annotations). Tolerates unknown keys by design."
),
)

View file

@ -0,0 +1,76 @@
"""``ExecutionBlock`` — the ``execution`` section of the automation definition."""
from __future__ import annotations
from typing import Literal
from pydantic import BaseModel, ConfigDict, Field
from .plan_step import PlanStep
class ExecutionBlock(BaseModel):
"""The ``execution`` block of an ``AutomationDefinition``.
Carries automation-wide defaults that individual ``PlanStep``s
can override. Every field has a sane default so an automation
definition may omit the block entirely; in that case all defaults
apply.
``on_failure`` is a secondary plan that runs only when the main
``plan`` fails after retries exhaust. It uses the same
``PlanStep`` shape as the main plan and shares the same execution
semantics.
"""
model_config = ConfigDict(extra="forbid")
timeout_seconds: int = Field(
default=600,
gt=0,
description=(
"Hard wall-clock cap for the entire run. The executor "
"transitions the run to ``timed_out`` when this is "
"exceeded."
),
)
max_retries: int = Field(
default=2,
ge=0,
description=(
"Per-step retry budget applied when a step raises a "
"retryable error. Steps may override per-step."
),
)
retry_backoff: Literal["exponential", "linear", "none"] = Field(
default="exponential",
description="Backoff policy between retries.",
)
concurrency: Literal[
"drop_if_running", "queue", "always"
] = Field(
default="drop_if_running",
description=(
"Behaviour when a new fire arrives while a previous run "
"is still in progress. ``drop_if_running`` skips the new "
"fire, ``queue`` enqueues it, ``always`` runs it in "
"parallel."
),
)
budget_cap_usd: float | None = Field(
default=None,
gt=0,
description=(
"Optional mid-flight cost cap in USD. The executor kills "
"the run when accumulated cost exceeds this value. v1 "
"treats this as an advisory because cost tracking lands "
"with the executor in a later step."
),
)
on_failure: list[PlanStep] = Field(
default_factory=list,
description=(
"Secondary plan executed only when the main plan fails "
"after retries exhaust. Empty list means no fallback."
),
)

View file

@ -0,0 +1,43 @@
"""``InputsBlock`` — the ``inputs`` section of the automation definition."""
from __future__ import annotations
from typing import Any
from pydantic import BaseModel, ConfigDict, Field
class InputsBlock(BaseModel):
"""The ``inputs`` block of an ``AutomationDefinition``.
Holds a JSON Schema describing what data the automation accepts at
fire time. The same schema is used by:
- The form editor (to render the manual-run dialog).
- The dispatcher (to validate trigger payloads before enqueueing
executor work).
- The template engine (to expose ``{{ inputs.* }}`` references in
plan-step configs).
The ``schema`` value is the JSON-Schema dict itself, not a
Pydantic model automations express their input contract in pure
JSON Schema so it round-trips losslessly through the database and
the NL generator.
"""
model_config = ConfigDict(
extra="forbid",
populate_by_name=True,
serialize_by_alias=True,
)
schema_: dict[str, Any] = Field(
...,
alias="schema",
description=(
"JSON Schema (draft-07 compatible) describing the inputs "
"this automation accepts. Properties may use the special "
"``$last_fired_at`` default literal to bind to the "
"trigger's last fire time."
),
)

View file

@ -0,0 +1,36 @@
"""``MetadataBlock`` — the ``metadata`` section of the automation definition."""
from __future__ import annotations
from pydantic import BaseModel, ConfigDict, Field
class MetadataBlock(BaseModel):
"""Free-form metadata attached to the automation definition.
Unlike the rest of the envelope this block tolerates unknown keys
(``extra='allow'``) it's a deliberate extension point for
UI annotations, NL-generator breadcrumbs, custom tags, etc.
Two fields are first-class so the rest of the system can rely on
them without reaching into the loose extras:
``tags`` used by the UI for filtering and grouping.
``created_from_nl`` set by the NL generator so we can later
measure how many runs came from natural-language authoring.
"""
model_config = ConfigDict(extra="allow")
tags: list[str] = Field(
default_factory=list,
description="UI-facing tags. No semantic meaning to the engine.",
)
created_from_nl: bool = Field(
default=False,
description=(
"True when the definition was produced by the NL "
"generator (set automatically by the generator path; "
"human-authored definitions keep this false)."
),
)

View file

@ -0,0 +1,86 @@
"""``PlanStep`` — one entry in the envelope's ``plan`` array."""
from __future__ import annotations
from typing import Any
from pydantic import BaseModel, ConfigDict, Field
class PlanStep(BaseModel):
"""One step in an automation's sequential plan.
Steps run in array order, no parallelism, no DAGs, no loops. The
``when`` Jinja expression provides conditional skip; branching is
achieved by ``when`` clauses on multiple steps. For looping or
parallel work, the user routes through ``agent_task`` and lets the
agent reason about it.
``config`` is dispatched against the action registry at
validation time its shape is determined by
``ActionDefinition.config_schema`` for the ``action`` value.
``output_as`` binds the step's typed output into the template
namespace for later steps, e.g. ``output_as: 'summary'`` then
``{{ summary.bullets }}`` in a downstream step's config.
"""
model_config = ConfigDict(extra="forbid")
step_id: str = Field(
...,
description=(
"Unique-within-plan identifier. Used in run logs and as "
"the default for ``output_as`` when not provided."
),
min_length=1,
)
action: str = Field(
...,
description=(
"Action-type discriminator (e.g., ``agent_task``). "
"Resolved against the action registry."
),
min_length=1,
)
when: str | None = Field(
default=None,
description=(
"Optional Jinja expression evaluated against the run "
"context. Step is skipped when the expression is "
"falsy."
),
)
config: dict[str, Any] = Field(
default_factory=dict,
description=(
"Action-type-specific config. Validated against the "
"registered ``ActionDefinition.config_schema`` for "
"``action`` at definition-save time. Jinja templates "
"inside config are rendered at step-execute time."
),
)
output_as: str | None = Field(
default=None,
description=(
"Name to bind the step output under for downstream "
"steps. Defaults to ``step_id`` when omitted."
),
)
max_retries: int | None = Field(
default=None,
ge=0,
description=(
"Per-step override of the automation-level ``max_retries``. "
"Omitted means inherit from execution block."
),
)
timeout_seconds: int | None = Field(
default=None,
gt=0,
description=(
"Per-step override of the automation-level "
"``timeout_seconds``. Omitted means inherit from "
"execution block."
),
)

View file

@ -0,0 +1,40 @@
"""``TriggerSpec`` — one entry in the envelope's ``triggers`` array."""
from __future__ import annotations
from typing import Any
from pydantic import BaseModel, ConfigDict, Field
class TriggerSpec(BaseModel):
"""One trigger attached to an automation, as it appears in the definition.
The envelope keeps ``config`` as an untyped JSON object on purpose
the per-type config schemas live in
``app.automations.schemas.triggers`` and are dispatched at
validation time by looking up ``type`` in the trigger registry.
This mirrors the design's "definitions are pure data" principle:
the envelope describes shape, the registry resolves names to
behaviour.
"""
model_config = ConfigDict(extra="forbid")
type: str = Field(
...,
description=(
"Trigger-type discriminator (e.g., ``schedule``, ``manual``). "
"Resolved against the trigger registry."
),
min_length=1,
)
config: dict[str, Any] = Field(
default_factory=dict,
description=(
"Trigger-type-specific config. Validated against the "
"registered ``TriggerDefinition.config_schema`` for "
"``type`` at definition-save time."
),
)

View file

@ -2,4 +2,10 @@
from __future__ import annotations
__all__: list[str] = []
from .manual import ManualTriggerConfig
from .schedule import ScheduleTriggerConfig
__all__ = [
"ManualTriggerConfig",
"ScheduleTriggerConfig",
]

View file

@ -0,0 +1,21 @@
"""``ManualTriggerConfig`` — config for the ``manual`` trigger type (empty in v1)."""
from __future__ import annotations
from pydantic import BaseModel, ConfigDict
class ManualTriggerConfig(BaseModel):
"""Config for the UI-driven ``manual`` trigger.
Validated against ``AutomationTrigger.config`` whenever the
persisted ``type`` is ``manual``. v1 carries no configurable
fields the "Run now" affordance simply fires this trigger with
an empty config object. The model exists so the registry dispatch
is uniform across all trigger types.
Future versions may add fields here (e.g., a fixed prompt to
pre-fill the run dialog with) without breaking v1 payloads.
"""
model_config = ConfigDict(extra="forbid")

View file

@ -0,0 +1,33 @@
"""``ScheduleTriggerConfig`` — config for the ``schedule`` trigger type."""
from __future__ import annotations
from pydantic import BaseModel, ConfigDict, Field
class ScheduleTriggerConfig(BaseModel):
"""Config for a cron-driven trigger.
Validated against ``AutomationTrigger.config`` whenever the
persisted ``type`` is ``schedule``. The cron expression is
evaluated by Celery Beat's source; the timezone is an IANA name
(e.g., ``Africa/Kigali``) and is required so the user's cron is
unambiguous across DST boundaries.
"""
model_config = ConfigDict(extra="forbid")
cron: str = Field(
...,
description=(
"Five-field cron expression. Minimum resolution is one "
"minute; the form editor warns when intervals tighter "
"than 15 minutes are used."
),
examples=["0 9 * * 1-5"],
)
timezone: str = Field(
...,
description="IANA timezone name (e.g., 'Africa/Kigali', 'UTC').",
examples=["Africa/Kigali"],
)