feat(automations): static_inputs on triggers + vertical-slice api/services

CREDO23 2026-05-27 21:21:43 +02:00
parent 84d99f19a2
commit 27ab367a13
27 changed files with 915 additions and 356 deletions


@ -34,24 +34,27 @@ system will survive feature growth:
--- ---
## 2. The four-layer contract ## 2. The three-layer contract
The system is structured as four layers. Layers 1, 2, and 4 are defined by The system is structured as three layers. Layers 1 and 3 are defined by
SurfSense developers (at registration time). Layer 3 is what users write SurfSense developers (at registration time). Layer 2 is what users write
(or the NL generator produces). The runtime reads all four to do its job. (or the NL generator produces). The runtime reads all three to do its job.
| Layer | What it is | Defined by | | Layer | What it is | Defined by |
| ----- | ---------- | ---------- | | ----- | ---------- | ---------- |
| **1. Capability registry** | What this SurfSense instance can do | Developers, at startup | | **1. Action contract** | Per-action params and output schema | Developers, at startup |
| **2. Action contract** | Per-action input/output schema | Developers, at startup | | **2. Automation definition** | One concrete saved automation | Users (or NL generator) |
| **3. Automation definition** | One concrete saved automation | Users (or NL generator) | | **3. Trigger contract** | Per-trigger params and payload schemas | Developers, at startup |
| **4. Trigger contract** | Per-trigger config and payload schemas | Developers, at startup |
Each layer constrains the one above. The runtime reads all four but doesn't Each layer constrains the next. The runtime reads all three but doesn't
know what's in them ahead of time. That's how a new capability or trigger know what's in them ahead of time. That's how a new action or trigger
type becomes available across the engine without code changes outside its type becomes available across the engine without code changes outside its
registration. registration.
A unification layer below Layer 1 — one catalog of "things this SurfSense
instance can do," shared by automations, agents, and future surfaces — was
considered and deferred (§3). v1 actions are stand-alone.
### Schema language ### Schema language
Every shape in every layer is described in **JSON Schema (draft 2020-12).** Every shape in every layer is described in **JSON Schema (draft 2020-12).**
@ -66,167 +69,126 @@ extensions on top:
--- ---
## 3. Capability registry (Layer 1) ## 3. Capability unification layer — deferred to post-v1
A `Capability` is one discrete thing the SurfSense backend exposes — Earlier drafts introduced a `Capability` registry as Layer 1: one catalog
"post a Slack message," "query the Search Space," "generate a podcast." It of "things this SurfSense instance can do," shared by the automation
is the atomic unit of "things automations can do." engine (as actions), the agent (as tools), and any future HTTP surface.
The motivation is real — one source of truth beats N parallel registries —
but v1 has a single action (`agent_task`) and a single consumer (the
automation engine). The five-field shape sketched earlier (`id`,
`description`, `input_schema`, `output_schema`, `handler`) cannot safely
host any non-trivial capability: it carries no caller identity, no
search-space scoping, and no authorization gate on tool delegation.
Building the abstraction with one consumer would lock in a shape that
doesn't survive the second consumer.
```python The unification layer returns when the second consumer lands (Phase 2
@dataclass tight actions or Phase 4 MCP), redesigned from the start with:
class Capability:
id: str # "slack.post_message"
description: str # for the NL generator + UI label
input_schema: dict # JSON Schema
output_schema: dict # JSON Schema
handler: AsyncHandler
```
### v1-minimum: five fields, nothing else - A `CallContext` carrying caller user id, search space id, and run id,
passed to every handler invocation.
- Explicit scope declarations per capability (e.g. `reads:documents`,
`writes:slack`, `destructive`) for the authorization layer to read.
- A per-user, per-search-space filter consulted at both definition save
time (validating `agent_task.tools`) and run time (scoping the agent's
tool list to what the automation creator can delegate).
The Capability is **deliberately five fields in v1**. Every additional field Until then:
that earlier drafts considered (`name`, `required_credentials`,
`side_effects`, `expected_duration_seconds`, `cost_estimate`) has been
removed until a concrete consumer feature demands it. Authoring stays cheap
and the registry stays trivial to introspect:
- `name` → folded into `description`. The UI can render a short label from - v1 actions are stand-alone units (Layer 1 below); the automation engine
the first line of `description` or fall back to `id`. No separate field reads its own action registry, nothing else.
needed in v1. - `agent_task.params.tools` is a forward-looking allowlist field with no
- `required_credentials` → returns when external-credential capabilities v1 semantics beyond "list of string identifiers." The handler's tool
ship (Phase 2). v1 capabilities run server-side with app config; nothing resolution is opaque to the automation contract.
to declare.
- `side_effects` → returns when RBAC inside automations or
`READ_ONLY`-only agent tool gating arrives. v1 capabilities are
hand-picked and all trusted code.
- `expected_duration_seconds` → returns when multi-queue routing ships.
Single Celery queue in v1.
- `cost_estimate` → never returns as a declared field; cost is measured
per run from a ledger, aggregated per Capability, and surfaced as a
historical average. Pre-flight checks are deferred.
The runtime invariant: a Capability is **a typed, named, callable thing
the system can do.** Every consumer (executor, agent tool layer, future
HTTP API) sees the same five-field shape and uses it the same way.
### Where capabilities live (v1)
In v1, the capability registry is a single in-memory dict, populated at
process startup from native registrations in
`automations/registries/capabilities/`. Identical across all workers.
No database persistence, no closures rebuilt per worker.
### MCP integration — deferred to Phase 4
The earlier two-tier registry (native + MCP-derived), the
`mcp_connections` / `mcp_tools` tables, the harvester, and the lazy
per-worker closure cache are **deferred to Phase 4** along with the
rest of the integration-tooling surface. They are removed from v1
because:
- v1 has no external connector capabilities (no Slack, Notion, Drive,
etc.). The only capabilities that will ship are server-side helpers
(search-space query / fetch) plus the loose `agent_task` action.
- Without external connectors, the lifecycle mismatch that motivates
the two-tier design (connect Monday, run Friday, workers restarted
in between) doesn't arise. A startup-time dict is sufficient.
- Phase 4 reintroduces this design as-is — the registry interface in
v1 is the same callable surface a Phase-4 MCP harvester will register
into. The deferral is additive, not a different design.
See archived design at `docs/automation/archived/mcp-registry.md` once
v1 ships; for now the only consumer of the registry is the in-memory
native path.
### Credentials — deferred to Phase 2 ### Credentials — deferred to Phase 2
The earlier per-call credential resolution pattern (`ctx.resolve_mcp_client`, External-credential handlers (Slack, email, etc.) require per-user or
`ctx.resolve_http_client`, `ctx.resolve_llm`) is **deferred to Phase 2**. per-connection auth. v1 actions run server-side with app-level
v1 capabilities run server-side using app-level configuration; none of configuration. When tight actions ship in Phase 2, the credential design
the seven v1 capabilities needs per-user or per-connection auth. lands as part of the unification redesign: connection IDs in the
definition (never tokens); credentials loaded per-call by the handler
context (never pre-loaded into worker memory); credentials never enter
LLM context.
When Phase 2 ships external-credential capabilities (Slack, email, etc.), ### MCP — deferred to Phase 4
the three guarantees the original design promised are reintroduced
unchanged:
- Credentials never appear in the automation definition (connection IDs External tool servers feeding tools into a shared registry land with the
only). rest of the integration tooling in Phase 4, after the unification layer
- Credentials never appear in the LLM's context (the host holds them is in place. The two-tier registry, `mcp_connections` and `mcp_tools`
and uses them on the LLM's behalf when executing tool calls). tables, and the harvester arrive as a single coherent step then.
- Credentials are loaded per-call, not pre-loaded into worker memory.
The Phase-2 design returns as-is; only the v1 surface is simplified.
--- ---
## 4. Action contract (Layer 2) ## 4. Action contract
An `Action` is what a user references in a plan step. Most actions are An `Action` is what a user references in a plan step. Some actions are
thin wrappers around one capability (e.g., `slack_post` wraps deterministic single-purpose handlers (`slack_post`, `send_email`); one
`slack.post_message`). Some compose: `agent_task` is one action whose action (`agent_task`) hosts an LLM and a tool allowlist for cases where
handler invokes the LangGraph runtime, which in turn can call many judgment is needed. The contract is the same in both cases — only the
capabilities. handler differs.
```python ```python
@dataclass @dataclass(frozen=True, slots=True)
class ActionDefinition: class ActionDefinition:
type: str # "agent_task", "slack_post" type: str # "agent_task", "slack_post"
name: str # for the UI name: str # short UI label
description: str # for the NL generator description: str # for the NL generator and the UI
config_schema: dict # JSON Schema for action.config params_schema: dict # JSON Schema for step.params
output_contract: dict | DynamicOutput # what it produces handler: ActionHandler
uses_capabilities: list[str] # IDs from the registry
produces_artifacts: list[ArtifactSpec] # see §8
handler: AsyncHandler
``` ```
This is the v1 shape: five fields, no handler context, no output
contract, no artifact declaration. The deferrals are intentional:
- **`output_contract`** — Phase 2. Deterministic handlers will return
a fixed shape; v1's only action (`agent_task`) takes an
`output_schema` inside `params` and validates against that instead.
- **`produces_artifacts`** — Phase 5. Artifact lifecycle (storage,
signed URLs, retention) is its own design step; v1 handlers
persist their own outputs.
- **Handler context** — paired with the unification redesign (§3).
v1 handlers receive `(args)` only; per-user / per-search-space
behavior is not yet a v1 concern.
### Tight vs loose actions ### Tight vs loose actions
Two patterns coexist by design: Two patterns coexist by design:
- **Tight actions** (`slack_post`, `linear_create_issue`, `send_email`): - **Tight actions** (`slack_post`, `linear_create_issue`,
config_schema is fully specified, output_contract is fixed, handler is a `send_email`) — deterministic single-purpose handlers. ~20 LOC
thin wrapper. ~20 LOC each. Used when the user knows exactly what they each. **Phase 2.**
want done — no LLM tokens spent on trivial work. - **Loose actions** (`agent_task`) — params_schema accepts a `prompt`,
a `tools` allowlist, and an optional `output_schema` declaring what
the agent must return; the handler validates the agent's output
against it. **v1.**
- **Loose actions** (`agent_task`): config_schema accepts a `prompt` and a The agent's `tools` allowlist resolves opaquely in v1; the redesigned
`tools` allowlist; output_contract is *dynamic* — the user declares the unification layer (§3) will give both invocation modes access to the
output shape they want via `output_schema` in the step config; the same vocabulary, with per-user authorization gating both.
handler asks the LLM to return that shape and validates. Used when
judgment is needed.
The agent's tool list is **the same capabilities** that tight actions call
directly. One registry, two invocation modes. Adding a new MCP server gives
both modes access to its tools automatically.
### How names in the definition become function calls ### How names in the definition become function calls
The definition contains strings like `"action": "slack_post"`. The string is The definition contains strings like `"action": "agent_task"`. The
just a name — it does not point to a function. At runtime, the executor string is just a name — it does not point to a function. At runtime,
performs a **name-based lookup** against the action registry: the executor performs a **name-based lookup** against the action
registry:
```python ```python
# step.action is a string from the JSON definition, e.g. "slack_post" action_def = action_registry.get(step.action) # dict lookup
action_def = _ACTION_REGISTRY[step.action] # dict lookup
handler = action_def.handler # Python callable handler = action_def.handler # Python callable
result = await handler(ctx, resolved_config) # invocation result = await handler(resolved_params) # invocation
``` ```
The registry is a Python dict (or a thin wrapper around one) populated at The registry is a Python dict populated at process startup. Each entry
process startup. Each entry in `automations/actions/*.py` calls a in `automations/registries/actions/*.py` calls `register_action(...)`
`register_action(...)` function at module import time, putting its at module import time, putting its `ActionDefinition` (including the
`ActionDefinition` (including the handler function reference) into the handler function reference) into the registry.
registry.
The same pattern applies to capabilities. The definition references The definition is pure data. The registry is the engine's runtime
capabilities by ID (`"slack.post_message"`); the capability registry maps vocabulary. They meet at name-based lookup; nothing else crosses the
the ID to a `Capability` object holding the handler. Definitions never boundary.
reference Python code directly — they reference names that the registry
resolves to code.
This separation is what makes the contract portable. The definition is
pure data. The registry is the engine's runtime vocabulary. They meet at
name-based lookup; nothing else crosses the boundary.
### The full expressive spectrum ### The full expressive spectrum
@ -238,7 +200,7 @@ fully agentic. Six practical shapes worth recognizing:
| **1. Direct call** | `slack_post` with literal channel and template | No LLM. ~200ms. Fractions of a cent. | | **1. Direct call** | `slack_post` with literal channel and template | No LLM. ~200ms. Fractions of a cent. |
| **2. Direct call with computed inputs** | `linear_create_issue` using `{{summary.title}}` from a prior step | No LLM for this step. Cheap. | | **2. Direct call with computed inputs** | `linear_create_issue` using `{{summary.title}}` from a prior step | No LLM for this step. Cheap. |
| **3. Single-domain agent task** | `agent_task` with `tools: ["slack.*"]` only | One LLM, bounded toolset. | | **3. Single-domain agent task** | `agent_task` with `tools: ["slack.*"]` only | One LLM, bounded toolset. |
| **4. Multi-domain agent task, narrow** | `agent_task` with `tools: ["github.list_pull_requests", "linear.create_issue"]` | One LLM, named capabilities. | | **4. Multi-domain agent task, narrow** | `agent_task` with `tools: ["github.list_pull_requests", "linear.create_issue"]` | One LLM, named tools. |
| **5. Multi-domain agent task, broad** | `agent_task` with `tools: ["slack.*", "github.*", "linear.*"]` | One LLM, large toolset, most agentic. | | **5. Multi-domain agent task, broad** | `agent_task` with `tools: ["slack.*", "github.*", "linear.*"]` | One LLM, large toolset, most agentic. |
| **6. Composed plan** | `agent_task` (narrow) for thinking → `slack_post` + `linear_create_issue` for acting | Best cost-to-power ratio. | | **6. Composed plan** | `agent_task` (narrow) for thinking → `slack_post` + `linear_create_issue` for acting | Best cost-to-power ratio. |
@ -258,7 +220,7 @@ user's.
--- ---
## 5. Automation definition (Layer 3) ## 5. Automation definition
This is the JSON the user writes (or the NL generator produces). Stored in This is the JSON the user writes (or the NL generator produces). Stored in
`automations.definition` as JSONB. `automations.definition` as JSONB.
@ -287,7 +249,7 @@ This is the JSON the user writes (or the NL generator produces). Stored in
"triggers": [ "triggers": [
{ {
"type": "schedule", "type": "schedule",
"config": { "cron": "0 9 * * 1-5", "timezone": "Africa/Kigali" } "params": { "cron": "0 9 * * 1-5", "timezone": "Africa/Kigali" }
} }
], ],
@ -295,7 +257,7 @@ This is the JSON the user writes (or the NL generator produces). Stored in
{ {
"step_id": "research", "step_id": "research",
"action": "agent_task", "action": "agent_task",
"config": { "params": {
"prompt": "Find documents tagged {{inputs.tags}} indexed since {{inputs.since}}. Return JSON with bullets and source_doc_ids.", "prompt": "Find documents tagged {{inputs.tags}} indexed since {{inputs.since}}. Return JSON with bullets and source_doc_ids.",
"tools": ["search_space.query", "search_space.fetch_document"], "tools": ["search_space.query", "search_space.fetch_document"],
"model": "anthropic/claude-sonnet-4-7", "model": "anthropic/claude-sonnet-4-7",
@ -313,7 +275,7 @@ This is the JSON the user writes (or the NL generator produces). Stored in
{ {
"step_id": "deliver", "step_id": "deliver",
"action": "slack_post", "action": "slack_post",
"config": { "params": {
"channel_id": "C0123", "channel_id": "C0123",
"message_template": "*Competitor digest*\n\n{% for b in summary.bullets %}• {{b}}\n{% endfor %}" "message_template": "*Competitor digest*\n\n{% for b in summary.bullets %}• {{b}}\n{% endfor %}"
} }
@ -325,11 +287,10 @@ This is the JSON the user writes (or the NL generator produces). Stored in
"max_retries": 2, "max_retries": 2,
"retry_backoff": "exponential", "retry_backoff": "exponential",
"concurrency": "drop_if_running", "concurrency": "drop_if_running",
"budget_cap_usd": 1.50,
"on_failure": [ /* steps to run if main plan fails after retries */ ] "on_failure": [ /* steps to run if main plan fails after retries */ ]
}, },
"metadata": { "tags": ["digest"], "created_from_nl": true } "metadata": { "tags": ["digest"] }
} }
``` ```
@ -340,7 +301,7 @@ This is the JSON the user writes (or the NL generator produces). Stored in
"step_id": "...", // unique within plan "step_id": "...", // unique within plan
"action": "...", // references an ActionDefinition.type "action": "...", // references an ActionDefinition.type
"when": "{{ ... }}", // optional Jinja expr → bool; false = skip "when": "{{ ... }}", // optional Jinja expr → bool; false = skip
"config": { ... }, // validated against action's config_schema "params": { ... }, // validated against action's params_schema
"output_as": "...", // binds output to this name for later steps "output_as": "...", // binds output to this name for later steps
"max_retries": 0, // optional, overrides automation default "max_retries": 0, // optional, overrides automation default
"timeout_seconds": 1200 // optional, overrides automation default "timeout_seconds": 1200 // optional, overrides automation default
@ -354,7 +315,7 @@ about it, or they compose automations through events (§7.5).
--- ---
## 6. Trigger contract (Layer 4) ## 6. Trigger contract
Three trigger types. That's the entire taxonomy. Three trigger types. That's the entire taxonomy.
@ -363,23 +324,12 @@ Three trigger types. That's the entire taxonomy.
```python ```python
TriggerDefinition( TriggerDefinition(
type="schedule", type="schedule",
config_schema={ params_model=ScheduleTriggerParams, # cron + timezone
"type": "object",
"required": ["cron", "timezone"],
"properties": {
"cron": { "type": "string" },
"timezone": { "type": "string", "format": "iana-timezone" }
}
},
payload_schema={
"type": "object",
"properties": {
"fired_at": { "type": "string", "format": "date-time" },
"scheduled_for": { "type": "string", "format": "date-time" },
"last_fired_at": { "type": "string", "format": "date-time" }
}
}
) )
# At fire time the schedule producer emits runtime inputs
# (fired_at, scheduled_for, last_fired_at) which are merged with the
# trigger row's static_inputs (static wins) and validated against
# automation.definition.inputs.schema_.
``` ```
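The merge rule in the comment above (runtime inputs combined with `static_inputs`, static wins) might look like the sketch below. The function names and sample values are hypothetical, and `missing_required` is only a stand-in for validating against the automation's input schema, not a JSON Schema validator.

```python
from typing import Any


def merge_run_inputs(
    runtime_inputs: dict[str, Any],
    static_inputs: dict[str, Any],
) -> dict[str, Any]:
    """Merge producer runtime inputs with a trigger's static_inputs.

    Static values win on key collisions.
    """
    # The right-hand operand of `|` overrides the left, so static goes last.
    return runtime_inputs | static_inputs


def missing_required(inputs: dict[str, Any], required: list[str]) -> list[str]:
    """Stand-in for schema validation: report required keys that are absent."""
    return [key for key in required if key not in inputs]


# Hypothetical values for illustration only.
runtime = {
    "fired_at": "2026-05-27T09:00:00+02:00",
    "scheduled_for": "2026-05-27T09:00:00+02:00",
    "since": "from-producer",
}
static = {"tags": ["competitors"], "since": "2026-05-01"}

merged = merge_run_inputs(runtime, static)
problems = missing_required(merged, ["tags", "since"])
```

Here `since` is supplied by both sides, and the value attached to the trigger row is the one that reaches the run.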
Implementation: extends `app/utils/periodic_scheduler.py`, which already Implementation: extends `app/utils/periodic_scheduler.py`, which already
@ -395,7 +345,7 @@ want an event trigger instead.
```python ```python
TriggerDefinition( TriggerDefinition(
type="webhook", type="webhook",
config_schema={ params_schema={
"type": "object", "type": "object",
"properties": { "properties": {
"input_mapping": { "input_mapping": {
@ -422,7 +372,7 @@ Dedups against runs in the last 24 hours.
```python ```python
TriggerDefinition( TriggerDefinition(
type="event", type="event",
config_schema={ params_schema={
"type": "object", "type": "object",
"required": ["event_type"], "required": ["event_type"],
"properties": { "properties": {
@ -485,11 +435,13 @@ Common path (after a trigger has fired):
4. **Snapshot the resolved definition** into the run row (immutable history) 4. **Snapshot the resolved definition** into the run row (immutable history)
5. Enqueue executor task on the single `automations_default` Celery queue 5. Enqueue executor task on the single `automations_default` Celery queue
The cost-estimate pre-check (originally step 3) is **deferred**. The cost-estimate pre-check (originally step 3) is **deferred**. v1
v1 capabilities do not declare `cost_estimate`; pre-flight budgeting actions do not declare cost estimates, the run row has no `cost_usd`
returns when a historical-cost ledger exists. The mid-flight budget column, and no handler reports tokens used — so neither pre-flight
cap (§7.2) still kills the run if accumulated cost crosses prediction nor mid-flight accumulation can be enforced. `Execution`
`budget_cap_usd`. therefore does not expose `budget_cap_usd` in v1; it returns as a single
field addition the day the cost ledger ships (per-action cost reporting
+ `automation_runs.cost_usd` column + executor accumulation).
Queue routing by `expected_duration_seconds` is **deferred** until load Queue routing by `expected_duration_seconds` is **deferred** until load
patterns justify a second queue. v1 uses a single queue. patterns justify a second queue. v1 uses a single queue.
@ -510,15 +462,15 @@ async def execute_run(run_id: int) -> None:
if step.when and not evaluate_predicate(step.when, context | step_outputs): if step.when and not evaluate_predicate(step.when, context | step_outputs):
record_step_skipped(run, step); continue record_step_skipped(run, step); continue
resolved_config = render_config(step.config, context | step_outputs) resolved_params = render_params(step.params, context | step_outputs)
action = action_registry.get(step.action) action = action_registry.get(step.action)
validate(resolved_config, action.config_schema) validate(resolved_params, action.params_schema)
try: try:
result = await with_retries( result = await with_retries(
action.handler, action.handler,
ctx=build_action_context(run, action), ctx=build_action_context(run, action),
args=resolved_config, args=resolved_params,
policy=step.retry_policy or run.execution.retry_policy, policy=step.retry_policy or run.execution.retry_policy,
) )
validate(result, step.output_schema) validate(result, step.output_schema)
@ -541,14 +493,20 @@ validated dict come back; it doesn't know that step was "smart."
### 7.3 Action handlers ### 7.3 Action handlers
One handler per `ActionDefinition.type`. Receives `(ctx, args)`, returns One handler per `ActionDefinition.type`. Receives the validated `args`
a dict matching `output_contract` (or matching the user-declared dict and returns whatever the step's output validates against (a fixed
`output_schema` for dynamic-output actions like `agent_task`). shape declared by tight actions, or a dynamic shape declared via
`output_schema` in the step params for `agent_task`).
Handlers handle their own credential resolution via `ctx.resolve_credentials`. Handlers do not know about retries or timeouts — those are the
They do not know about retries, timeouts, or budget caps — those are the
executor's concern. executor's concern.
In v1, handlers take `(args)` only. The `CallContext` parameter sketched
in §7.2's pseudo-code (caller user id, search space id, run id,
credential resolver) arrives with the unification layer redesign (§3);
v1's single action (`agent_task`) reads what it needs from app-level
configuration.
### 7.4 Template engine ### 7.4 Template engine
#### Why it exists #### Why it exists
@ -747,7 +705,7 @@ Three fields, per-automation defaults with optional per-step overrides:
- `timeout_seconds`: integer - `timeout_seconds`: integer
Retries on: Retries on:
- Capability handler exceptions - Action handler exceptions
- Output schema validation failures (for dynamic-output actions, the - Output schema validation failures (for dynamic-output actions, the
validation error is fed back to the LLM in the retry) validation error is fed back to the LLM in the retry)
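One way to picture the second retry case, where the validation error is fed back to the LLM, is the sketch below. Everything in it is assumed for illustration: the `call_agent` signature, the `OutputValidationError` type, and the required-keys check standing in for full `output_schema` validation.

```python
import asyncio
from typing import Any, Awaitable, Callable, Optional

AgentCall = Callable[[str, Optional[str]], Awaitable[dict[str, Any]]]


class OutputValidationError(Exception):
    """Raised when a result does not satisfy the declared output schema."""


def validate_required(result: dict[str, Any], schema: dict[str, Any]) -> None:
    # Stand-in for JSON Schema validation: checks required keys only.
    missing = [k for k in schema.get("required", []) if k not in result]
    if missing:
        raise OutputValidationError(f"missing required keys: {missing}")


async def run_with_feedback(
    call_agent: AgentCall,
    prompt: str,
    output_schema: dict[str, Any],
    max_retries: int,
) -> dict[str, Any]:
    feedback: Optional[str] = None
    for _ in range(max_retries + 1):
        result = await call_agent(prompt, feedback)
        try:
            validate_required(result, output_schema)
            return result
        except OutputValidationError as exc:
            # The next attempt sees why the previous output was rejected.
            feedback = str(exc)
    raise OutputValidationError(
        f"output still invalid after {max_retries} retries: {feedback}"
    )


calls: list[Optional[str]] = []


async def fake_agent(prompt: str, feedback: Optional[str]) -> dict[str, Any]:
    # Hypothetical agent: omits a key until it receives feedback.
    calls.append(feedback)
    if feedback is None:
        return {"bullets": ["a"]}
    return {"bullets": ["a"], "source_doc_ids": [1]}


schema = {"type": "object", "required": ["bullets", "source_doc_ids"]}
final = asyncio.run(run_with_feedback(fake_agent, "digest", schema, max_retries=2))
```

The fake agent succeeds on its second call because the first rejection message is passed back in, which is the behavior the bullet describes.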
@ -755,12 +713,21 @@ Not retries:
- `when:` evaluation failures (these are user errors, surface immediately) - `when:` evaluation failures (these are user errors, surface immediately)
- Input validation failures (caught at dispatch, never reach the executor) - Input validation failures (caught at dispatch, never reach the executor)
### Budget enforcement ### Budget enforcement *(deferred — not in v1)*
`budget_cap_usd` is per-run. The dispatcher refuses to enqueue if estimated Future shape: `budget_cap_usd` on `Execution`, dispatcher refuses to
cost exceeds it. The executor kills the run if accumulated cost crosses it enqueue if estimated cost exceeds it, executor kills the run if
mid-flight (the LLM ops handler reports tokens consumed back to the accumulated cost crosses it mid-flight (the LLM ops handler reports
executor between calls). tokens consumed back to the executor between calls).
Prerequisites before this can land:
- Each action declares cost reporting (tokens × model price, API call
charges) — `ActionDefinition` has no such field today.
- `automation_runs.cost_usd` column + executor accumulates per step.
- A historical-cost ledger so pre-flight estimation can return useful
numbers (otherwise the dispatcher gate is guessing).
Until all three exist, v1 has no surface for budget enforcement.
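For orientation only, the per-step accumulation named in the prerequisites could take roughly the shape below once cost reporting exists. Nothing here is in v1: `RunBudget`, `BudgetExceeded`, `token_cost_usd`, and the token price are hypothetical, and a real ledger would likely use a decimal type instead of floats.

```python
from dataclasses import dataclass


class BudgetExceeded(Exception):
    """Raised when accumulated run cost crosses the cap."""


@dataclass
class RunBudget:
    cap_usd: float
    spent_usd: float = 0.0

    def charge(self, step_cost_usd: float) -> None:
        # Accumulate first so the recorded total reflects what was spent.
        self.spent_usd += step_cost_usd
        if self.spent_usd > self.cap_usd:
            raise BudgetExceeded(
                f"spent {self.spent_usd:.2f} USD, cap {self.cap_usd:.2f} USD"
            )


def token_cost_usd(tokens: int, usd_per_million_tokens: float) -> float:
    """Tokens times model price; the price is supplied by the caller."""
    return tokens * usd_per_million_tokens / 1_000_000


budget = RunBudget(cap_usd=1.50)
step_cost = token_cost_usd(200_000, usd_per_million_tokens=3.0)  # assumed price

budget.charge(step_cost)  # first step
budget.charge(step_cost)  # second step
try:
    budget.charge(step_cost)  # third step pushes the total past the cap
    killed = False
except BudgetExceeded:
    killed = True
```

The sketch covers only the mid-flight check; the pre-flight gate additionally depends on the historical-cost ledger listed above.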
### On-failure handlers ### On-failure handlers
@ -787,14 +754,13 @@ nightly Celery Beat task deletes expired artifacts).
### Duration classes and queue routing — deferred ### Duration classes and queue routing — deferred
The original design routed runs to multiple Celery queues based on each The original design routed runs to multiple Celery queues based on each
capability's declared `expected_duration_seconds`. v1 ships with **one action's declared `expected_duration_seconds`. v1 ships with **one
queue** (`automations_default`) and capabilities do not declare a queue** (`automations_default`) and actions do not declare a duration.
duration. Multi-queue routing returns when burst load on a single queue Multi-queue routing returns when burst load on a single queue actually
actually justifies the operational complexity of independent worker justifies the operational complexity of independent worker pools.
pools.
Adding the second queue is a config change plus reintroducing Adding the second queue is a config change plus reintroducing
`expected_duration_seconds` on the `Capability` dataclass — both `expected_duration_seconds` on the `ActionDefinition` dataclass — both
mechanical, additive, and free of design rewrite. mechanical, additive, and free of design rewrite.
--- ---
@ -833,13 +799,15 @@ and an immutable run history.
### `automation_triggers` ### `automation_triggers`
| field | type | notes | | field | type | notes |
| --------------- | ----------------------------------------------------------------------------- | ------------------------------------------- | | --------------- | ----------------------------------------------------------------------------- | ----------------------------------------------------------- |
| `id` | int PK | | | `id` | int PK | |
| `automation_id` | FK | | | `automation_id` | FK | |
| `type` | enum: `schedule`, `manual` (Phase 2/3 add `webhook`, `event`) | | | `type` | enum: `schedule`, `manual` (Phase 2/3 add `webhook`, `event`) | |
| `config` | jsonb | validated against trigger's `config_schema` | | `params` | jsonb | trigger-type config, validated against trigger's `params_schema` |
| `static_inputs` | jsonb | per-attachment domain values merged into every run (static wins on collision) |
| `enabled` | bool | | | `enabled` | bool | |
| `last_fired_at` | timestamp | | | `last_fired_at` | timestamp | |
| `next_fire_at` | timestamp / null | precomputed next fire moment for schedule triggers |
`secret_hash` (for webhook bearer tokens) is **deferred to Phase 2** with `secret_hash` (for webhook bearer tokens) is **deferred to Phase 2** with
the webhook trigger. the webhook trigger.
@ -853,8 +821,7 @@ the webhook trigger.
| `trigger_id` | FK / null | null = manual via UI | | `trigger_id` | FK / null | null = manual via UI |
| `status` | enum | `pending`, `running`, `succeeded`, `failed`, `cancelled`, `timed_out` | | `status` | enum | `pending`, `running`, `succeeded`, `failed`, `cancelled`, `timed_out` |
| `definition_snapshot` | jsonb | the definition as it was when this run fired | | `definition_snapshot` | jsonb | the definition as it was when this run fired |
| `trigger_payload` | jsonb | | | `inputs` | jsonb | merged & validated inputs (trigger.static_inputs merged with producer runtime data; static wins) |
| `resolved_inputs` | jsonb | |
| `step_results` | jsonb | array of per-step results with timing | | `step_results` | jsonb | array of per-step results with timing |
| `output` | jsonb / null | | | `output` | jsonb / null | |
| `artifacts` | jsonb | references to created artifacts | | `artifacts` | jsonb | references to created artifacts |
@ -863,7 +830,7 @@ the webhook trigger.
| `agent_session_id`| str / null | link to LangGraph trace if agent_task was used | | `agent_session_id`| str / null | link to LangGraph trace if agent_task was used |
`cost_usd` (per-run accumulated cost) is **deferred** until at least one `cost_usd` (per-run accumulated cost) is **deferred** until at least one
v1 capability records token-level cost. When reintroduced it lands as a action records token-level cost. When reintroduced it lands as a
column-only migration. column-only migration.
### Deferred tables ### Deferred tables
@ -897,8 +864,8 @@ not "trusted authors only."
User provides natural-language input. The Generator LLM is given: User provides natural-language input. The Generator LLM is given:
- The full schema set (input schema for definition, registry of action - The full schema set (input schema for definition, registry of action
types with their config_schemas, registry of trigger types, available types with their params_schemas, registry of trigger types, list of
capabilities for this SearchSpace, list of allowed Jinja filters) allowed Jinja filters)
- A tool to list available connectors, channels, and other SearchSpace - A tool to list available connectors, channels, and other SearchSpace
resources, so it doesn't invent names that don't exist resources, so it doesn't invent names that don't exist
- A few-shot set of examples - A few-shot set of examples
@ -918,13 +885,13 @@ Output: a structured proposal matching the automation definition schema.
Server-side, before the proposal reaches the user: Server-side, before the proposal reaches the user:
- Validate against JSON Schema (shape correctness) - Validate against JSON Schema (shape correctness)
- Verify every capability referenced exists in the registry (resource existence) - Verify every action and trigger type referenced exists in the registry
- Verify every connector/channel/resource referenced exists in this SearchSpace - Verify every connector/channel/resource referenced exists in this SearchSpace
- Validate every template against the sandbox's allowlist (no underscore - Validate every template against the sandbox's allowlist (no underscore
attributes, no unregistered filter names, length under cap) attributes, no unregistered filter names, length under cap)
Failures here are deterministic errors, not warnings. A proposal that Failures here are deterministic errors, not warnings. A proposal that
references a non-existent capability or includes a template using references a non-existent action or includes a template using
`{{x.__class__}}` is rejected before the user sees it; the Generator is `{{x.__class__}}` is rejected before the user sees it; the Generator is
re-prompted with the validation error and asked to fix the proposal. re-prompted with the validation error and asked to fix the proposal.
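
The template check in the list above can be sketched roughly as follows. This is a minimal regex-based illustration, not the sandbox's actual validator: `ALLOWED_FILTERS`, `MAX_TEMPLATE_LENGTH`, and `template_errors` are hypothetical names chosen for the example, and a production check would more likely inspect the parsed Jinja AST than raw text.

```python
import re

# Hypothetical allowlist and cap; the real values would live in the sandbox config.
ALLOWED_FILTERS = {"default", "lower", "upper", "truncate"}
MAX_TEMPLATE_LENGTH = 2000

_EXPR = re.compile(r"\{\{(.*?)\}\}", re.DOTALL)  # contents of {{ ... }}
_UNDERSCORE_ATTR = re.compile(r"\.\s*_\w*")      # .attr starting with "_"
_FILTER = re.compile(r"\|\s*([A-Za-z_]\w*)")     # | filter_name


def template_errors(template: str) -> list[str]:
    """Return deterministic validation errors for one template string."""
    errors: list[str] = []
    if len(template) > MAX_TEMPLATE_LENGTH:
        errors.append("template exceeds length cap")
    for expr in _EXPR.findall(template):
        if _UNDERSCORE_ATTR.search(expr):
            errors.append(f"underscore attribute access in: {expr.strip()}")
        for name in _FILTER.findall(expr):
            if name not in ALLOWED_FILTERS:
                errors.append(f"unregistered filter: {name}")
    return errors
```

An empty list means the template passes; any entry is a hard error of the kind the Generator would be re-prompted with.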
@ -947,7 +914,7 @@ produces two outputs for the user:
- Action sequences that touch external systems without obvious benefit - Action sequences that touch external systems without obvious benefit
to the user to the user
- Cost estimates that seem high relative to the goal - Cost estimates that seem high relative to the goal
- References to capabilities the user hasn't used before - References to actions the user hasn't used before
- Schedules tighter than 15 minutes (likely should be event triggers) - Schedules tighter than 15 minutes (likely should be event triggers)
The Review LLM is a **UX layer** that makes review actually useful. It is The Review LLM is a **UX layer** that makes review actually useful. It is
@ -1009,33 +976,18 @@ always.
surfsense_backend/app/ surfsense_backend/app/
├── automations/ # NEW: the engine ├── automations/ # NEW: the engine
│ ├── __init__.py │ ├── __init__.py
│ ├── models.py # SQLAlchemy models for 6 tables │ ├── persistence/ # SQLAlchemy models + enums for 3 tables
│ ├── schemas.py # Pydantic schemas (definition envelope, etc.) │ ├── schemas/ # Pydantic schemas (definition envelope, etc.)
│ ├── routes.py # FastAPI router (/api/v1/automations) │ ├── routes.py # FastAPI router (/api/v1/automations)
│ ├── service.py # CRUD + business logic │ ├── service.py # CRUD + business logic
│ ├── dispatcher.py # trigger matching, cost check, run creation │ ├── dispatcher.py # trigger matching, run creation
│ ├── executor.py # the Celery task that runs a plan │ ├── executor.py # the Celery task that runs a plan
│ ├── templating.py # Jinja sandbox + filters │ ├── templating.py # Jinja sandbox + filters
│ ├── events.py # publish/subscribe for domain_events │ ├── events.py # publish/subscribe for domain_events
│ ├── filters.py # JSON filter grammar evaluator │ ├── filters.py # JSON filter grammar evaluator
│ ├── actions/ │ ├── registries/ # action and trigger registries
│ │ ├── registry.py │ │ ├── actions/ # ActionDefinition + handler registration
│ │ ├── agent_task.py │ │ └── triggers/ # TriggerDefinition
│ │ ├── transform_data.py
│ │ ├── slack_post.py
│ │ ├── send_email.py
│ │ ├── notification.py
│ │ └── (more in Phase 5: podcast_generation, report_generation, ...)
│ ├── triggers/
│ │ ├── registry.py
│ │ ├── schedule.py # Celery Beat hookup
│ │ ├── webhook.py # /fire endpoint
│ │ └── event.py # subscribes to domain_events
│ ├── capabilities/
│ │ ├── registry.py
│ │ ├── native.py # native capability registrations
│ │ ├── mcp_harvester.py # registers MCP tools as capabilities (Phase 4)
│ │ └── (LLM ops registered alongside)
│ └── nl/ # Phase 1 — primary user path │ └── nl/ # Phase 1 — primary user path
│ ├── generator.py # Generator LLM │ ├── generator.py # Generator LLM
│ ├── reviewer.py # Review LLM (summary + flagged items) │ ├── reviewer.py # Review LLM (summary + flagged items)
@ -1070,23 +1022,22 @@ automations in natural language.
**Step 1 (current scope, this batch of commits):** **Step 1 (current scope, this batch of commits):**
- 3 tables (`automations`, `automation_triggers`, `automation_runs`) + - 3 tables (`automations`, `automation_triggers`, `automation_runs`) +
Alembic migration Alembic migration
- Empty Capability, Action, Trigger registries (concrete entries land in - Empty action and trigger registries under
later steps when the consuming feature lands) `app/automations/registries/` (concrete entries land in later steps)
- Pydantic schemas for the automation definition envelope, the two v1 - Pydantic schemas for the automation definition envelope, the two v1
trigger configs (`schedule`, `manual`), and the one v1 action config trigger params shapes (`schedule`, `manual`), and the one v1 action
(`agent_task`) params shape (`agent_task`)
- Module structure under `app/automations/` (data/, schemas/, - Module structure under `app/automations/` (persistence/, schemas/,
registries/), fully isolated from the existing codebase registries/), fully isolated from the existing codebase
**Step 2:** **Step 2:**
- Register the `agent_task` action and the `schedule` / `manual` - The `agent_task` action handler and the `schedule` / `manual` triggers
triggers in the registries registered in `app/automations/registries/`. Tool resolution for
- Capability registry populated with native deliverable-producing `agent_task.params.tools` is opaque to the contract — the handler
capabilities (chosen when this step starts) decides what string identifiers it accepts and how they resolve.
**Step 3:** **Step 3:**
- Executor (single-queue Celery task) with retries, timeouts, budget - Executor (single-queue Celery task) with retries and timeouts
caps measured against `cost_usd` ledger on the run
- Template engine (Jinja sandbox + the v1 filter allowlist + runtime - Template engine (Jinja sandbox + the v1 filter allowlist + runtime
limits) limits)
- Manual "Run now" endpoint - Manual "Run now" endpoint
@ -1122,19 +1073,23 @@ somewhere humans see, complex pipelines have proper error handling.
**After Phase 3**: NL authoring is the polished primary surface; edit **After Phase 3**: NL authoring is the polished primary surface; edit
flows are conversational rather than form-only. flows are conversational rather than form-only.
### Phase 4 — Event triggers ### Phase 4 — Event triggers + integration tooling
- `domain_events` table and `events.py` module - `domain_events` table and `events.py` module
- Indexing pipeline publishes `connector.*` events (smallest change — just - Indexing pipeline publishes `connector.*` events (smallest change — just
add publish calls to the existing flow) add publish calls to the existing flow)
- Automations publish `automation.run.*` events on completion - Automations publish `automation.run.*` events on completion
- `event` trigger with filter grammar - `event` trigger with filter grammar
- MCP capability harvester (so MCP-backed events and tools both work) - The unification layer redesign (see §3) — `CallContext`, scope
declarations, per-user authorization gating
- MCP integration on top of the unification layer (external tool servers
harvested into the shared catalog)
**After Phase 4**: "do X when Y happens" automations work, including **After Phase 4**: "do X when Y happens" automations work, including
automation-chaining through events. automation-chaining through events; external MCP tools and SurfSense
actions share one vocabulary.
### Phase 5 — Wrapping existing features and sharing ### Phase 5 — Wrapping existing features and sharing
- Wrap existing SurfSense capabilities as actions: `podcast_generation`, - Wrap existing SurfSense features as actions: `podcast_generation`,
`report_generation`, `indexing_sweep` `report_generation`, `indexing_sweep`
- Artifact lifecycle implementation - Artifact lifecycle implementation
- `expected_duration_seconds` based queue routing (split `automations_long` - `expected_duration_seconds` based queue routing (split `automations_long`
@ -1144,7 +1099,7 @@ automation-chaining through events.
shift documented in §7.4's pre-Phase-5 gate shift documented in §7.4's pre-Phase-5 gate
- Cross-automation composition examples in the docs - Cross-automation composition examples in the docs
**After Phase 5**: every existing SurfSense capability is automatable **After Phase 5**: every existing SurfSense feature is automatable
without any per-feature code, and automations can be shared between without any per-feature code, and automations can be shared between
SearchSpaces and users. SearchSpaces and users.
@ -1156,13 +1111,12 @@ For reference — every decision made through the design process, in one
place. place.
### Foundations ### Foundations
1. ✅ JSON Schema 2020-12 is the single schema language for everything 1. ✅ JSON Schema (draft 2020-12) is the single schema language for everything
2. ✅ Definition is the program; infrastructure is the interpreter 2. ✅ Definition is the program; infrastructure is the interpreter
3. ✅ List of steps (not single action) in the plan, with `output_as` chaining 3. ✅ List of steps (not single action) in the plan, with `output_as` chaining
4. ✅ One capability registry serving native + MCP + LLM operations through the same interface 4. ⏸ Capability unification layer (one catalog shared by automations, agents, and future surfaces) — **deferred to post-v1** (see §3). v1 ships actions only.
5. ✅ Capability IDs do not leak handler kind (`slack.post_message`, not `mcp.slack.post_message`) 5. ✅ Name-based resolution: definitions reference action and trigger types by string ID. The registry is the runtime's vocabulary; lookup is a dict access. No code references in definitions.
6. ✅ Name-based resolution: definitions reference actions and capabilities by string ID. The registry is the runtime's vocabulary; lookup is a dict access. No code references in definitions. 6. ✅ The expressive spectrum runs from pure direct calls to broad agent_task; the NL generator proposes the cheapest shape that meets intent (Shape 6 from §4 by default)
7. ✅ The expressive spectrum runs from pure direct calls to broad agent_task; the NL generator proposes the cheapest shape that meets intent (Shape 6 from §4 by default)
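
The name-based resolution decision above amounts to a dictionary keyed by type ID. The sketch below is an assumption-laden illustration: `ActionEntry`, `register_action`, `get_action`, `UnknownActionError`, and the step shape are hypothetical stand-ins, not the registry code in `app/automations/registries/`.

```python
from dataclasses import dataclass
from typing import Any, Callable


@dataclass(frozen=True)
class ActionEntry:
    # Hypothetical, trimmed-down stand-in for ActionDefinition.
    type: str
    handler: Callable[[dict[str, Any]], Any]


class UnknownActionError(KeyError):
    """Raised when a definition references a type that is not registered."""


_ACTIONS: dict[str, ActionEntry] = {}


def register_action(entry: ActionEntry) -> None:
    # Registration happens at startup; duplicate IDs are a programming error.
    if entry.type in _ACTIONS:
        raise ValueError(f"action type already registered: {entry.type}")
    _ACTIONS[entry.type] = entry


def get_action(type_id: str) -> ActionEntry:
    # Lookup is a plain dict access on the string ID from the definition.
    try:
        return _ACTIONS[type_id]
    except KeyError:
        raise UnknownActionError(type_id) from None


register_action(ActionEntry(type="agent_task", handler=lambda params: {"echo": params}))

# A definition step carries only the string ID, never a code reference.
step = {"action": "agent_task", "params": {"prompt": "summarize"}}
result = get_action(step["action"]).handler(step["params"])
```

Because the definition holds only strings, it can be stored as JSON and validated before any handler code is imported.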
### Trigger taxonomy ### Trigger taxonomy
8. ✅ Three trigger types: `schedule`, `webhook`, `event` 8. ✅ Three trigger types: `schedule`, `webhook`, `event`
@ -1183,7 +1137,7 @@ place.
19. ✅ No DAGs, no parallelism, no loops — composition via agent_task or events 19. ✅ No DAGs, no parallelism, no loops — composition via agent_task or events
20. ✅ `on_failure` part of execution policy from v1 20. ✅ `on_failure` part of execution policy from v1
21. ✅ Step-level retry and timeout overrides 21. ✅ Step-level retry and timeout overrides
22. ✅ Budget cap enforced pre-enqueue and mid-flight 22. ⏸ Budget cap enforced pre-enqueue and mid-flight — **deferred** until the cost ledger ships (see §8 Budget enforcement)
### Components ### Components
23. ✅ Dispatcher / executor / handlers / registry — distinct, each replaceable 23. ✅ Dispatcher / executor / handlers / registry — distinct, each replaceable
@ -1197,25 +1151,22 @@ place.
29. ✅ Automations publish run events for composability 29. ✅ Automations publish run events for composability
30. ✅ Publish/subscribe behind interface — no direct table access elsewhere 30. ✅ Publish/subscribe behind interface — no direct table access elsewhere
### Capability storage ### Capability unification — all deferred to post-v1
31. ✅ Native capabilities registered in-memory at startup from the codebase. Identical across all workers. 31. ⏸ One shared catalog of "things this SurfSense instance can do" — **deferred**, see §3
32. ⏸ MCP capability metadata persisted in `mcp_connections` and `mcp_tools` tables — **deferred to Phase 4** 32. ⏸ Handler `CallContext` (caller user id, search space id, run id) — **deferred** with unification
33. ⏸ MCP handler closures built lazily per worker from database state — **deferred to Phase 4** 33. ⏸ Per-capability scope declarations driving authorization — **deferred** with unification
34. ⏸ MCP server tool list re-harvested on a schedule — **deferred to Phase 4** 34. ⏸ MCP integration on top of the unification layer (`mcp_connections`, `mcp_tools`, harvester) — **deferred to Phase 4**
35. ⏸ MCP tools harvested into the capability registry at connection time — **deferred to Phase 4**
36. ⏸ Side effects inferred from MCP hints + naming + admin overrides — **deferred to Phase 4**
37. ⏸ MCP tools callable directly (no agent required) when caller knows args — **deferred to Phase 4**
### Credentials — all deferred to Phase 2 ### Credentials — all deferred to Phase 2
38. ⏸ Credentials never appear in the automation definition — only connection IDs do — **Phase 2** 35. ⏸ Credentials never appear in the automation definition — only connection IDs do — **Phase 2**
39. ⏸ Credentials never appear in the LLM's context — the host holds them — **Phase 2** 36. ⏸ Credentials never appear in the LLM's context — the host holds them — **Phase 2**
40. ⏸ Credentials resolved per-call by `ActionContext`, not pre-loaded into worker environment — **Phase 2** 37. ⏸ Credentials resolved per-call by the handler context, not pre-loaded into worker environment — **Phase 2**
41. ⏸ Tokens encrypted at rest; refresh handled automatically by `ActionContext.resolve_*_client` — **Phase 2** 38. ⏸ Tokens encrypted at rest; refresh handled automatically by the handler context — **Phase 2**
### v1-minimum (new lock) ### v1-minimum
v1. ✅ `Capability` is exactly five fields: `id`, `description`, `input_schema`, `output_schema`, `handler`. Additional fields are added only when a concrete consumer feature requires them. 39. ✅ v1 ships actions only — no separate capability layer. `ActionDefinition` is five fields: `type`, `name`, `description`, `params_schema`, `handler`. Additional fields are added only when a concrete consumer feature requires them.
v2. ✅ Cost is **measured** from a per-run ledger, not declared. Pre-flight cost checks return when the ledger has enough history. 40. ✅ Cost is **measured** from a per-run ledger, not declared. Pre-flight cost checks return when the ledger has enough history.
v3. ✅ Single `automations_default` Celery queue in v1. Multi-queue routing returns when load justifies it. 41. ✅ Single `automations_default` Celery queue in v1. Multi-queue routing returns when load justifies it.
### NL authoring ### NL authoring
42. ✅ LLM-authored templates is the primary path from day one — not a Phase 3 addition. Hand-authoring JSON is supported but secondary 42. ✅ LLM-authored templates is the primary path from day one — not a Phase 3 addition. Hand-authoring JSON is supported but secondary
@ -1227,7 +1178,7 @@ v3. ✅ Single `automations_default` Celery queue in v1. Multi-queue routing ret
48. ✅ NL drafts are transient storage, not a core table 48. ✅ NL drafts are transient storage, not a core table
### Data model ### Data model
49. ✅ Six tables total — four for engine state, two for MCP persistence 49. ✅ v1 ships three tables (`automations`, `automation_triggers`, `automation_runs`). `domain_events`, `mcp_connections`, and `mcp_tools` land in Phase 4.
50. ✅ Run rows snapshot the definition (immutable history) 50. ✅ Run rows snapshot the definition (immutable history)
51. ✅ All entities scoped by `search_space_id` for RBAC 51. ✅ All entities scoped by `search_space_id` for RBAC
52. ✅ Editing an automation bumps `version`; existing runs unaffected 52. ✅ Editing an automation bumps `version`; existing runs unaffected
@ -1283,7 +1234,7 @@ Schemas spelled out concretely. Those follow mechanically from this plan.
agent (cron and skills subsystems); n8n documentation on node types and agent (cron and skills subsystems); n8n documentation on node types and
workflow data model; the SurfSense repository and DeepWiki architecture workflow data model; the SurfSense repository and DeepWiki architecture
notes (FastAPI + Celery Beat + Electric SQL + LangGraph Deep Agents + notes (FastAPI + Celery Beat + Electric SQL + LangGraph Deep Agents +
Search Space RBAC); Model Context Protocol specification for capability Search Space RBAC); Model Context Protocol specification for external
harvesting; AWS EventBridge for filter grammar; workflow-pattern tool harvesting; AWS EventBridge for filter grammar; workflow-pattern
literature (van der Aalst et al.) for the trigger / action / concurrency literature (van der Aalst et al.) for the trigger / action / concurrency
vocabulary.* vocabulary.*


@ -87,6 +87,7 @@ def upgrade() -> None:
REFERENCES automations(id) ON DELETE CASCADE, REFERENCES automations(id) ON DELETE CASCADE,
type automation_trigger_type NOT NULL, type automation_trigger_type NOT NULL,
params JSONB NOT NULL, params JSONB NOT NULL,
static_inputs JSONB NOT NULL DEFAULT '{}'::jsonb,
enabled BOOLEAN NOT NULL DEFAULT true, enabled BOOLEAN NOT NULL DEFAULT true,
last_fired_at TIMESTAMP WITH TIME ZONE, last_fired_at TIMESTAMP WITH TIME ZONE,
next_fire_at TIMESTAMP WITH TIME ZONE, next_fire_at TIMESTAMP WITH TIME ZONE,
@ -129,8 +130,7 @@ def upgrade() -> None:
REFERENCES automation_triggers(id) ON DELETE SET NULL, REFERENCES automation_triggers(id) ON DELETE SET NULL,
status automation_run_status NOT NULL DEFAULT 'pending', status automation_run_status NOT NULL DEFAULT 'pending',
definition_snapshot JSONB NOT NULL, definition_snapshot JSONB NOT NULL,
trigger_payload JSONB, inputs JSONB NOT NULL DEFAULT '{}'::jsonb,
resolved_inputs JSONB NOT NULL DEFAULT '{}'::jsonb,
step_results JSONB NOT NULL DEFAULT '[]'::jsonb, step_results JSONB NOT NULL DEFAULT '[]'::jsonb,
output JSONB, output JSONB,
artifacts JSONB NOT NULL DEFAULT '[]'::jsonb, artifacts JSONB NOT NULL DEFAULT '[]'::jsonb,


@ -11,7 +11,7 @@ AGENT_TASK_ACTION = ActionDefinition(
type="agent_task", type="agent_task",
name="Agent task", name="Agent task",
description="Run a multi_agent_chat turn from an automation step.", description="Run a multi_agent_chat turn from an automation step.",
params_schema=AgentTaskActionParams.model_json_schema(), params_model=AgentTaskActionParams,
build_handler=build_handler, build_handler=build_handler,
) )


@ -7,6 +7,7 @@ from dataclasses import dataclass
from typing import Any from typing import Any
from uuid import UUID from uuid import UUID
from pydantic import BaseModel
from sqlalchemy.ext.asyncio import AsyncSession from sqlalchemy.ext.asyncio import AsyncSession
@ -30,5 +31,10 @@ class ActionDefinition:
type: str type: str
name: str name: str
description: str description: str
params_schema: dict[str, Any] params_model: type[BaseModel]
build_handler: ActionHandlerFactory build_handler: ActionHandlerFactory
@property
def params_schema(self) -> dict[str, Any]:
"""JSON Schema (draft 2020-12) derived from ``params_model``."""
return self.params_model.model_json_schema()


@ -5,8 +5,12 @@ from __future__ import annotations
from fastapi import APIRouter from fastapi import APIRouter
from .automation import router as automation_router from .automation import router as automation_router
from .run import router as run_router
from .trigger import router as trigger_router
router = APIRouter() router = APIRouter()
router.include_router(automation_router) router.include_router(automation_router)
router.include_router(trigger_router)
router.include_router(run_router)
__all__ = ["router"] __all__ = ["router"]


@ -1,23 +1,80 @@
"""Routes for the ``Automation`` resource.""" """HTTP routes for the ``Automation`` resource."""
from __future__ import annotations from __future__ import annotations
from typing import Any from fastapi import APIRouter, Depends, Query, status
from fastapi import APIRouter, Body, Depends from app.automations.schemas.api import (
AutomationCreate,
from app.automations.api.schemas import RunDispatched AutomationDetail,
AutomationList,
AutomationSummary,
AutomationUpdate,
)
from app.automations.services import AutomationService, get_automation_service from app.automations.services import AutomationService, get_automation_service
router = APIRouter() router = APIRouter()
@router.post("/automations/{automation_id}/run", response_model=RunDispatched) @router.post(
async def run_automation_now( "/automations",
automation_id: int, response_model=AutomationDetail,
payload: dict[str, Any] | None = Body(default=None), status_code=status.HTTP_201_CREATED,
)
async def create_automation(
payload: AutomationCreate,
service: AutomationService = Depends(get_automation_service), service: AutomationService = Depends(get_automation_service),
) -> RunDispatched: ) -> AutomationDetail:
"""Fire a manual run.""" """Create an automation, optionally with initial triggers (atomic)."""
run = await service.run_now(automation_id=automation_id, payload=payload) automation = await service.create(payload)
return RunDispatched(run_id=run.id, status=run.status) return AutomationDetail.model_validate(automation)
@router.get("/automations", response_model=AutomationList)
async def list_automations(
search_space_id: int = Query(...),
limit: int = Query(default=50, ge=1, le=200),
offset: int = Query(default=0, ge=0),
service: AutomationService = Depends(get_automation_service),
) -> AutomationList:
"""List automations in a search space."""
items, total = await service.list(
search_space_id=search_space_id, limit=limit, offset=offset
)
return AutomationList(
items=[AutomationSummary.model_validate(a) for a in items],
total=total,
)
@router.get("/automations/{automation_id}", response_model=AutomationDetail)
async def get_automation(
automation_id: int,
service: AutomationService = Depends(get_automation_service),
) -> AutomationDetail:
"""Get one automation with its definition and triggers."""
automation = await service.get(automation_id)
return AutomationDetail.model_validate(automation)
@router.patch("/automations/{automation_id}", response_model=AutomationDetail)
async def update_automation(
automation_id: int,
patch: AutomationUpdate,
service: AutomationService = Depends(get_automation_service),
) -> AutomationDetail:
"""Partially update an automation. Triggers are managed separately."""
automation = await service.update(automation_id, patch)
return AutomationDetail.model_validate(automation)
@router.delete(
"/automations/{automation_id}",
status_code=status.HTTP_204_NO_CONTENT,
)
async def delete_automation(
automation_id: int,
service: AutomationService = Depends(get_automation_service),
) -> None:
"""Delete an automation; triggers and runs are removed by FK cascade."""
await service.delete(automation_id)


@ -0,0 +1,71 @@
"""HTTP routes for automation runs (dispatch + history)."""
from __future__ import annotations
from typing import Any
from fastapi import APIRouter, Body, Depends, Query, status
from app.automations.schemas.api import (
RunDetail,
RunDispatched,
RunList,
RunSummary,
)
from app.automations.services import RunService, get_run_service
router = APIRouter()
@router.post(
"/automations/{automation_id}/run",
response_model=RunDispatched,
status_code=status.HTTP_202_ACCEPTED,
)
async def run_automation_now(
automation_id: int,
inputs: dict[str, Any] | None = Body(default=None),
service: RunService = Depends(get_run_service),
) -> RunDispatched:
"""Fire a manual run.
``inputs`` is the runtime payload supplied by the caller; it is merged with
the manual trigger's ``static_inputs`` (static wins) and validated against
the automation's input schema.
"""
run = await service.dispatch_manual(automation_id=automation_id, runtime_inputs=inputs)
return RunDispatched(run_id=run.id, status=run.status)
@router.get(
"/automations/{automation_id}/runs",
response_model=RunList,
)
async def list_runs(
automation_id: int,
limit: int = Query(default=50, ge=1, le=200),
offset: int = Query(default=0, ge=0),
service: RunService = Depends(get_run_service),
) -> RunList:
"""List run history for an automation, newest first."""
items, total = await service.list(
automation_id=automation_id, limit=limit, offset=offset
)
return RunList(
items=[RunSummary.model_validate(r) for r in items],
total=total,
)
@router.get(
"/automations/{automation_id}/runs/{run_id}",
response_model=RunDetail,
)
async def get_run(
automation_id: int,
run_id: int,
service: RunService = Depends(get_run_service),
) -> RunDetail:
"""Get the full record of a single run, including step results and artifacts."""
run = await service.get(automation_id=automation_id, run_id=run_id)
return RunDetail.model_validate(run)


@ -0,0 +1,55 @@
"""HTTP routes for triggers attached to an automation."""
from __future__ import annotations
from fastapi import APIRouter, Depends, status
from app.automations.schemas.api import TriggerCreate, TriggerDetail, TriggerUpdate
from app.automations.services import TriggerService, get_trigger_service
router = APIRouter()
@router.post(
"/automations/{automation_id}/triggers",
response_model=TriggerDetail,
status_code=status.HTTP_201_CREATED,
)
async def add_trigger(
automation_id: int,
payload: TriggerCreate,
service: TriggerService = Depends(get_trigger_service),
) -> TriggerDetail:
"""Attach a new trigger to an automation."""
trigger = await service.add(automation_id=automation_id, payload=payload)
return TriggerDetail.model_validate(trigger)
@router.patch(
"/automations/{automation_id}/triggers/{trigger_id}",
response_model=TriggerDetail,
)
async def update_trigger(
automation_id: int,
trigger_id: int,
patch: TriggerUpdate,
service: TriggerService = Depends(get_trigger_service),
) -> TriggerDetail:
"""Toggle ``enabled`` or replace ``params``. Trigger type is immutable."""
trigger = await service.update(
automation_id=automation_id, trigger_id=trigger_id, patch=patch
)
return TriggerDetail.model_validate(trigger)
@router.delete(
"/automations/{automation_id}/triggers/{trigger_id}",
status_code=status.HTTP_204_NO_CONTENT,
)
async def remove_trigger(
automation_id: int,
trigger_id: int,
service: TriggerService = Depends(get_trigger_service),
) -> None:
"""Detach a trigger from an automation."""
await service.remove(automation_id=automation_id, trigger_id=trigger_id)


@ -22,10 +22,14 @@ async def dispatch_run(
session: AsyncSession, session: AsyncSession,
automation: Automation, automation: Automation,
trigger: AutomationTrigger, trigger: AutomationTrigger,
payload: dict[str, Any] | None, runtime_inputs: dict[str, Any] | None = None,
) -> AutomationRun: ) -> AutomationRun:
"""Validate, snapshot the definition, persist an ``AutomationRun``, enqueue execution. """Validate, snapshot the definition, persist an ``AutomationRun``, enqueue execution.
Final inputs = ``trigger.static_inputs`` merged with ``runtime_inputs``,
static winning on key collision. The merged dict is validated against
``automation.definition.inputs.schema_`` and stored on the run.
Callers (trigger-specific adapters) are responsible for resolving Callers (trigger-specific adapters) are responsible for resolving
``automation`` and ``trigger`` and for the trigger-side ``ACTIVE`` / ``automation`` and ``trigger`` and for the trigger-side ``ACTIVE`` /
``enabled`` guards. This function only handles what's identical across ``enabled`` guards. This function only handles what's identical across
@ -36,7 +40,8 @@ async def dispatch_run(
except Exception as exc: except Exception as exc:
raise DispatchError(f"invalid automation definition: {exc}") from exc raise DispatchError(f"invalid automation definition: {exc}") from exc
resolved_inputs = _validate_inputs(definition, payload or {}) merged_inputs = {**(runtime_inputs or {}), **(trigger.static_inputs or {})}
validated_inputs = _validate_inputs(definition, merged_inputs)
snapshot = definition.model_dump(mode="json", by_alias=True) snapshot = definition.model_dump(mode="json", by_alias=True)
run = AutomationRun( run = AutomationRun(
@ -44,8 +49,7 @@ async def dispatch_run(
trigger_id=trigger.id, trigger_id=trigger.id,
status=RunStatus.PENDING, status=RunStatus.PENDING,
definition_snapshot=snapshot, definition_snapshot=snapshot,
trigger_payload=payload, inputs=validated_inputs,
resolved_inputs=resolved_inputs,
step_results=[], step_results=[],
artifacts=[], artifacts=[],
) )
@ -61,12 +65,12 @@ async def dispatch_run(
def _validate_inputs( def _validate_inputs(
definition: AutomationDefinition, payload: dict[str, Any] definition: AutomationDefinition, inputs: dict[str, Any]
) -> dict[str, Any]: ) -> dict[str, Any]:
if definition.inputs is None or not definition.inputs.schema_: if definition.inputs is None or not definition.inputs.schema_:
return {} return {}
try: try:
jsonschema.validate(instance=payload, schema=definition.inputs.schema_) jsonschema.validate(instance=inputs, schema=definition.inputs.schema_)
except jsonschema.ValidationError as exc: except jsonschema.ValidationError as exc:
raise DispatchError(f"inputs: {exc.message}") from exc raise DispatchError(f"inputs: {exc.message}") from exc
return payload return inputs
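
The merge rule used by `dispatch_run` can be isolated as a small function. This is a sketch under the semantics stated in the docstring above (static wins on key collision); `merge_inputs` is a hypothetical helper name, not a function in this commit.

```python
from __future__ import annotations

from typing import Any


def merge_inputs(
    static_inputs: dict[str, Any] | None,
    runtime_inputs: dict[str, Any] | None,
) -> dict[str, Any]:
    """Merge trigger-level static inputs with caller-supplied runtime inputs.

    Static values are unpacked last, so they overwrite runtime values
    that share the same key.
    """
    return {**(runtime_inputs or {}), **(static_inputs or {})}


merged = merge_inputs(
    static_inputs={"channel": "ops"},
    runtime_inputs={"channel": "dev", "topic": "weekly"},
)
```

Here `merged["channel"]` is `"ops"`: the value pinned on the trigger cannot be overridden by whoever fires the run, while `topic` passes through from the runtime payload.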


@ -45,8 +45,9 @@ class AutomationRun(BaseModel, TimestampMixin):
# locked at fire time so historical runs always show the exact code path # locked at fire time so historical runs always show the exact code path
definition_snapshot = Column(JSONB, nullable=False) definition_snapshot = Column(JSONB, nullable=False)
trigger_payload = Column(JSONB, nullable=True) # merged & validated inputs the run was dispatched with
resolved_inputs = Column(JSONB, nullable=False, server_default="{}") # (trigger.static_inputs merged with producer runtime data, static wins on collision)
inputs = Column(JSONB, nullable=False, server_default="{}")
# one entry per executed step; agent_task entries carry their own # one entry per executed step; agent_task entries carry their own
# `agent_session_id` inside their entry # `agent_session_id` inside their entry
step_results = Column(JSONB, nullable=False, server_default="[]") step_results = Column(JSONB, nullable=False, server_default="[]")


@ -36,6 +36,10 @@ class AutomationTrigger(BaseModel, TimestampMixin):
params = Column(JSONB, nullable=False) params = Column(JSONB, nullable=False)
# Per-attachment domain values merged into every dispatched run's inputs.
# Static wins over runtime data on key collision.
static_inputs = Column(JSONB, nullable=False, server_default="{}")
enabled = Column( enabled = Column(
Boolean, Boolean,
nullable=False, nullable=False,


@ -106,7 +106,7 @@ def _build_template_ctx(run: AutomationRun, step_outputs: dict[str, Any]) -> dic
trigger_type=trigger.type.value if trigger else None, trigger_type=trigger.type.value if trigger else None,
started_at=run.started_at, started_at=run.started_at,
attempt=1, attempt=1,
resolved_inputs=run.resolved_inputs or {}, inputs=run.inputs or {},
step_outputs=step_outputs, step_outputs=step_outputs,
) )


@ -28,8 +28,7 @@ class RunDetail(RunSummary):
"""Full run view including snapshot, results and artifacts.""" """Full run view including snapshot, results and artifacts."""
definition_snapshot: dict[str, Any] definition_snapshot: dict[str, Any]
trigger_payload: dict[str, Any] | None = None inputs: dict[str, Any]
resolved_inputs: dict[str, Any]
step_results: list[dict[str, Any]] step_results: list[dict[str, Any]]
output: dict[str, Any] | None = None output: dict[str, Any] | None = None
artifacts: list[dict[str, Any]] artifacts: list[dict[str, Any]]


@ -17,6 +17,7 @@ class TriggerCreate(BaseModel):
type: TriggerType type: TriggerType
params: dict[str, Any] = Field(default_factory=dict) params: dict[str, Any] = Field(default_factory=dict)
static_inputs: dict[str, Any] = Field(default_factory=dict)
enabled: bool = True enabled: bool = True
@ -27,6 +28,7 @@ class TriggerUpdate(BaseModel):
enabled: bool | None = None enabled: bool | None = None
params: dict[str, Any] | None = None params: dict[str, Any] | None = None
static_inputs: dict[str, Any] | None = None
class TriggerDetail(BaseModel): class TriggerDetail(BaseModel):
@ -37,6 +39,7 @@ class TriggerDetail(BaseModel):
id: int id: int
type: TriggerType type: TriggerType
params: dict[str, Any] params: dict[str, Any]
static_inputs: dict[str, Any]
enabled: bool enabled: bool
last_fired_at: datetime | None = None last_fired_at: datetime | None = None
next_fire_at: datetime | None = None next_fire_at: datetime | None = None

@ -1,7 +1,16 @@
"""Service layer for the automations feature.""" """Services for the automations HTTP layer (one service per resource)."""
from __future__ import annotations from __future__ import annotations
from .automation import AutomationService, get_automation_service from .automation import AutomationService, get_automation_service
from .run import RunService, get_run_service
from .trigger import TriggerService, get_trigger_service
__all__ = ["AutomationService", "get_automation_service"] __all__ = [
"AutomationService",
"RunService",
"TriggerService",
"get_automation_service",
"get_run_service",
"get_trigger_service",
]

@ -2,54 +2,111 @@
from __future__ import annotations from __future__ import annotations
from typing import Any from datetime import UTC, datetime
from fastapi import Depends, HTTPException from fastapi import Depends, HTTPException
from pydantic import ValidationError
from sqlalchemy import func, select
from sqlalchemy.ext.asyncio import AsyncSession from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy.orm import selectinload
from app.automations.dispatch import DispatchError from app.automations.schemas.api import (
AutomationCreate,
AutomationUpdate,
TriggerCreate,
)
from app.automations.persistence.enums.trigger_type import TriggerType
from app.automations.persistence.models.automation import Automation from app.automations.persistence.models.automation import Automation
from app.automations.persistence.models.run import AutomationRun from app.automations.persistence.models.trigger import AutomationTrigger
from app.automations.triggers.manual import dispatch_manual_run from app.automations.triggers import get_trigger
from app.automations.triggers.schedule import compute_next_fire_at
from app.db import Permission, User, get_async_session from app.db import Permission, User, get_async_session
from app.users import current_active_user from app.users import current_active_user
from app.utils.rbac import check_permission from app.utils.rbac import check_permission
class AutomationService: class AutomationService:
"""Service for the ``Automation`` resource.""" """Lifecycle of the ``Automation`` resource."""
def __init__(self, *, session: AsyncSession, user: User) -> None: def __init__(self, *, session: AsyncSession, user: User) -> None:
self.session = session self.session = session
self.user = user self.user = user
async def run_now( async def create(self, payload: AutomationCreate) -> Automation:
"""Create an automation and its initial triggers in one transaction."""
await self._authorize(payload.search_space_id, Permission.AUTOMATIONS_CREATE.value)
automation = Automation(
search_space_id=payload.search_space_id,
created_by_user_id=self.user.id,
name=payload.name,
description=payload.description,
definition=payload.definition.model_dump(mode="json", by_alias=True),
version=1,
)
for spec in payload.triggers:
automation.triggers.append(_build_trigger(spec))
self.session.add(automation)
await self.session.commit()
return await self._get_with_triggers_or_raise(automation.id)
async def list(
self, self,
*, *,
automation_id: int, search_space_id: int,
payload: dict[str, Any] | None, limit: int,
) -> AutomationRun: offset: int,
"""Fire a manual run for ``automation_id``.""" ) -> tuple[list[Automation], int]:
automation = await self._get_automation_or_raise(automation_id) """Return a page of automations and the total count."""
await check_permission( await self._authorize(search_space_id, Permission.AUTOMATIONS_READ.value)
self.session,
self.user, base = select(Automation).where(Automation.search_space_id == search_space_id)
automation.search_space_id, total = await self.session.scalar(
Permission.AUTOMATIONS_EXECUTE.value, select(func.count()).select_from(base.subquery())
"You don't have permission to execute automations in this search space",
) )
try: rows = (
return await dispatch_manual_run( await self.session.execute(
session=self.session, base.order_by(Automation.created_at.desc()).limit(limit).offset(offset)
automation_id=automation_id,
payload=payload,
) )
except DispatchError as exc: ).scalars().all()
raise HTTPException(status_code=422, detail=str(exc)) from exc return list(rows), int(total or 0)
async def _get_automation_or_raise(self, automation_id: int) -> Automation: async def get(self, automation_id: int) -> Automation:
"""Get the automation by id; 404 if missing.""" """Get an automation with its triggers loaded."""
automation = await self._get_with_triggers_or_raise(automation_id)
await self._authorize(automation.search_space_id, Permission.AUTOMATIONS_READ.value)
return automation
async def update(self, automation_id: int, patch: AutomationUpdate) -> Automation:
"""Patch fields. Bumps ``version`` when ``definition`` changes."""
automation = await self._get_with_triggers_or_raise(automation_id)
await self._authorize(automation.search_space_id, Permission.AUTOMATIONS_UPDATE.value)
data = patch.model_dump(exclude_unset=True)
if "name" in data:
automation.name = data["name"]
if "description" in data:
automation.description = data["description"]
if "status" in data:
automation.status = data["status"]
if "definition" in data:
automation.definition = patch.definition.model_dump(mode="json", by_alias=True)
automation.version += 1
await self.session.commit()
return await self._get_with_triggers_or_raise(automation_id)
async def delete(self, automation_id: int) -> None:
"""Delete an automation; FK cascades remove triggers and runs."""
automation = await self._get_or_raise(automation_id)
await self._authorize(automation.search_space_id, Permission.AUTOMATIONS_DELETE.value)
await self.session.delete(automation)
await self.session.commit()
async def _get_or_raise(self, automation_id: int) -> Automation:
automation = await self.session.get(Automation, automation_id) automation = await self.session.get(Automation, automation_id)
if automation is None: if automation is None:
raise HTTPException( raise HTTPException(
@ -57,6 +114,56 @@ class AutomationService:
) )
return automation return automation
async def _get_with_triggers_or_raise(self, automation_id: int) -> Automation:
stmt = (
select(Automation)
.where(Automation.id == automation_id)
.options(selectinload(Automation.triggers))
)
automation = (await self.session.execute(stmt)).scalar_one_or_none()
if automation is None:
raise HTTPException(
status_code=404, detail=f"automation {automation_id} not found"
)
return automation
async def _authorize(self, search_space_id: int, permission: str) -> None:
await check_permission(
self.session,
self.user,
search_space_id,
permission,
f"You don't have permission to {permission.split(':')[1]} automations in this search space",
)
def _build_trigger(spec: TriggerCreate) -> AutomationTrigger:
"""Validate trigger params via its registered Pydantic model and build the ORM row."""
definition = get_trigger(spec.type.value)
if definition is None:
raise HTTPException(status_code=422, detail=f"unknown trigger type {spec.type.value!r}")
try:
validated = definition.params_model.model_validate(spec.params)
except ValidationError as exc:
raise HTTPException(status_code=422, detail=str(exc)) from exc
params = validated.model_dump(mode="json")
next_fire_at = None
if spec.type == TriggerType.SCHEDULE and spec.enabled:
next_fire_at = compute_next_fire_at(
params["cron"], params["timezone"], after=datetime.now(UTC)
)
return AutomationTrigger(
type=spec.type,
params=params,
static_inputs=spec.static_inputs,
enabled=spec.enabled,
next_fire_at=next_fire_at,
)
def get_automation_service( def get_automation_service(
session: AsyncSession = Depends(get_async_session), session: AsyncSession = Depends(get_async_session),
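
The patch semantics of `AutomationService.update` above can be sketched on plain dicts. This is an illustration only, assuming the patch dict plays the role of `patch.model_dump(exclude_unset=True)`; `apply_patch` is a hypothetical stand-in and no ORM or Pydantic is involved.

```python
from __future__ import annotations

from typing import Any


def apply_patch(automation: dict[str, Any], patch: dict[str, Any]) -> dict[str, Any]:
    """Hypothetical stand-in for the update logic, operating on plain dicts."""
    updated = dict(automation)
    for field in ("name", "description", "status"):
        # A key that is present was sent by the client, even if its value is None.
        if field in patch:
            updated[field] = patch[field]
    if "definition" in patch:
        updated["definition"] = patch["definition"]
        # The bump is unconditional: it happens whenever the key is present.
        updated["version"] = automation["version"] + 1
    return updated


before = {"name": "digest", "description": "daily", "definition": {"steps": []}, "version": 1}
print(apply_patch(before, {"name": "weekly digest"}))
print(apply_patch(before, {"definition": {"steps": []}}))
```

Note that, as in the service method, the version increases whenever `definition` appears in the patch, including when the submitted definition is identical to the stored one.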

@ -0,0 +1,93 @@
"""``RunService`` — dispatch and history of automation runs."""
from __future__ import annotations
from typing import Any
from fastapi import Depends, HTTPException
from sqlalchemy import func, select
from sqlalchemy.ext.asyncio import AsyncSession
from app.automations.dispatch import DispatchError
from app.automations.persistence.models.automation import Automation
from app.automations.persistence.models.run import AutomationRun
from app.automations.triggers.manual import dispatch_manual_run
from app.db import Permission, User, get_async_session
from app.users import current_active_user
from app.utils.rbac import check_permission
class RunService:
"""Lifecycle of the ``AutomationRun`` resource."""
def __init__(self, *, session: AsyncSession, user: User) -> None:
self.session = session
self.user = user
async def dispatch_manual(
self,
*,
automation_id: int,
runtime_inputs: dict[str, Any] | None,
) -> AutomationRun:
"""Fire a manual run via the registered manual trigger."""
await self._authorize(automation_id, Permission.AUTOMATIONS_EXECUTE.value)
try:
return await dispatch_manual_run(
session=self.session,
automation_id=automation_id,
runtime_inputs=runtime_inputs,
)
except DispatchError as exc:
raise HTTPException(status_code=422, detail=str(exc)) from exc
async def list(
self,
*,
automation_id: int,
limit: int,
offset: int,
) -> tuple[list[AutomationRun], int]:
"""Return a page of runs for an automation, newest first."""
await self._authorize(automation_id, Permission.AUTOMATIONS_READ.value)
base = select(AutomationRun).where(AutomationRun.automation_id == automation_id)
total = await self.session.scalar(
select(func.count()).select_from(base.subquery())
)
rows = (
await self.session.execute(
base.order_by(AutomationRun.created_at.desc()).limit(limit).offset(offset)
)
).scalars().all()
return list(rows), int(total or 0)
async def get(self, *, automation_id: int, run_id: int) -> AutomationRun:
await self._authorize(automation_id, Permission.AUTOMATIONS_READ.value)
run = await self.session.get(AutomationRun, run_id)
if run is None or run.automation_id != automation_id:
raise HTTPException(status_code=404, detail=f"run {run_id} not found")
return run
async def _authorize(self, automation_id: int, permission: str) -> Automation:
automation = await self.session.get(Automation, automation_id)
if automation is None:
raise HTTPException(
status_code=404, detail=f"automation {automation_id} not found"
)
await check_permission(
self.session,
self.user,
automation.search_space_id,
permission,
f"You don't have permission to {permission.split(':')[1]} automations in this search space",
)
return automation
def get_run_service(
session: AsyncSession = Depends(get_async_session),
user: User = Depends(current_active_user),
) -> RunService:
return RunService(session=session, user=user)

@ -0,0 +1,143 @@
"""``TriggerService`` — lifecycle of triggers attached to an automation."""
from __future__ import annotations
from datetime import UTC, datetime
from fastapi import Depends, HTTPException
from pydantic import ValidationError
from sqlalchemy.ext.asyncio import AsyncSession
from app.automations.schemas.api import TriggerCreate, TriggerUpdate
from app.automations.persistence.enums.trigger_type import TriggerType
from app.automations.persistence.models.automation import Automation
from app.automations.persistence.models.trigger import AutomationTrigger
from app.automations.triggers import get_trigger
from app.automations.triggers.schedule import compute_next_fire_at
from app.db import Permission, User, get_async_session
from app.users import current_active_user
from app.utils.rbac import check_permission
class TriggerService:
"""Lifecycle of the ``AutomationTrigger`` sub-resource."""
def __init__(self, *, session: AsyncSession, user: User) -> None:
self.session = session
self.user = user
async def add(
self, *, automation_id: int, payload: TriggerCreate
) -> AutomationTrigger:
automation = await self._authorize_automation(
automation_id, Permission.AUTOMATIONS_UPDATE.value
)
validated_params = _validate_params(payload.type, payload.params)
trigger = AutomationTrigger(
automation_id=automation.id,
type=payload.type,
params=validated_params,
static_inputs=payload.static_inputs,
enabled=payload.enabled,
next_fire_at=_initial_next_fire(payload.type, validated_params, payload.enabled),
)
self.session.add(trigger)
await self.session.commit()
await self.session.refresh(trigger)
return trigger
async def update(
self,
*,
automation_id: int,
trigger_id: int,
patch: TriggerUpdate,
) -> AutomationTrigger:
await self._authorize_automation(automation_id, Permission.AUTOMATIONS_UPDATE.value)
trigger = await self._get_trigger_or_raise(automation_id, trigger_id)
data = patch.model_dump(exclude_unset=True)
if "params" in data:
trigger.params = _validate_params(trigger.type, data["params"])
if "static_inputs" in data:
trigger.static_inputs = data["static_inputs"]
if "enabled" in data:
trigger.enabled = data["enabled"]
# Recompute next_fire_at on every update to a schedule trigger: this covers
# changed timing and re-enabling, and clears the value when the trigger is
# disabled. Manual triggers always have NULL next_fire_at.
if trigger.type == TriggerType.SCHEDULE:
trigger.next_fire_at = _initial_next_fire(
trigger.type, trigger.params, trigger.enabled
)
await self.session.commit()
await self.session.refresh(trigger)
return trigger
async def remove(self, *, automation_id: int, trigger_id: int) -> None:
await self._authorize_automation(automation_id, Permission.AUTOMATIONS_UPDATE.value)
trigger = await self._get_trigger_or_raise(automation_id, trigger_id)
await self.session.delete(trigger)
await self.session.commit()
async def _authorize_automation(
self, automation_id: int, permission: str
) -> Automation:
automation = await self.session.get(Automation, automation_id)
if automation is None:
raise HTTPException(
status_code=404, detail=f"automation {automation_id} not found"
)
await check_permission(
self.session,
self.user,
automation.search_space_id,
permission,
f"You don't have permission to {permission.split(':')[1]} automations in this search space",
)
return automation
async def _get_trigger_or_raise(
self, automation_id: int, trigger_id: int
) -> AutomationTrigger:
trigger = await self.session.get(AutomationTrigger, trigger_id)
if trigger is None or trigger.automation_id != automation_id:
raise HTTPException(
status_code=404, detail=f"trigger {trigger_id} not found"
)
return trigger
def _validate_params(trigger_type: TriggerType, raw: dict) -> dict:
definition = get_trigger(trigger_type.value)
if definition is None:
raise HTTPException(
status_code=422, detail=f"unknown trigger type {trigger_type.value!r}"
)
try:
validated = definition.params_model.model_validate(raw)
except ValidationError as exc:
raise HTTPException(status_code=422, detail=str(exc)) from exc
return validated.model_dump(mode="json")
def _initial_next_fire(
trigger_type: TriggerType, params: dict, enabled: bool
) -> datetime | None:
if trigger_type != TriggerType.SCHEDULE or not enabled:
return None
return compute_next_fire_at(
params["cron"], params["timezone"], after=datetime.now(UTC)
)
def get_trigger_service(
session: AsyncSession = Depends(get_async_session),
user: User = Depends(current_active_user),
) -> TriggerService:
return TriggerService(session=session, user=user)

@ -15,6 +15,7 @@ Runs every minute. Each tick performs two passes:
from __future__ import annotations from __future__ import annotations
import logging import logging
from dataclasses import dataclass
from datetime import UTC, datetime from datetime import UTC, datetime
from sqlalchemy import select from sqlalchemy import select
@ -39,6 +40,15 @@ TASK_NAME = "automation_schedule_tick"
_TICK_BATCH = 200 _TICK_BATCH = 200
@dataclass(frozen=True, slots=True)
class _Claim:
"""Per-trigger fire context captured before row state is mutated."""
trigger_id: int
scheduled_for: datetime
previous_last_fired_at: datetime | None
@celery_app.task(name=TASK_NAME) @celery_app.task(name=TASK_NAME)
def automation_schedule_tick() -> None: def automation_schedule_tick() -> None:
"""Tick once: self-heal NULL next_fire_at, claim due rows, fire each.""" """Tick once: self-heal NULL next_fire_at, claim due rows, fire each."""
@ -52,12 +62,12 @@ async def _tick() -> None:
await _self_heal_null_next_fire(session, now=now) await _self_heal_null_next_fire(session, now=now)
claimed_ids = await _claim_due_triggers(session, now=now) claims = await _claim_due_triggers(session, now=now)
if not claimed_ids: if not claims:
return return
for trigger_id in claimed_ids: for claim in claims:
await _fire_one(session, trigger_id=trigger_id) await _fire_one(session, claim=claim, fired_at=now)
async def _self_heal_null_next_fire(session: AsyncSession, *, now: datetime) -> None: async def _self_heal_null_next_fire(session: AsyncSession, *, now: datetime) -> None:
@ -95,8 +105,8 @@ async def _self_heal_null_next_fire(session: AsyncSession, *, now: datetime) ->
async def _claim_due_triggers( async def _claim_due_triggers(
session: AsyncSession, *, now: datetime session: AsyncSession, *, now: datetime
) -> list[int]: ) -> list[_Claim]:
"""Lock and advance due rows; return claimed trigger ids.""" """Lock and advance due rows; return per-trigger fire context."""
stmt = ( stmt = (
select(AutomationTrigger) select(AutomationTrigger)
.where( .where(
@ -113,8 +123,12 @@ async def _claim_due_triggers(
if not triggers: if not triggers:
return [] return []
claimed: list[int] = [] claims: list[_Claim] = []
for trigger in triggers: for trigger in triggers:
# Snapshot fire-context BEFORE we advance the row.
scheduled_for = trigger.next_fire_at
previous_last_fired_at = trigger.last_fired_at
try: try:
trigger.next_fire_at = compute_next_fire_at( trigger.next_fire_at = compute_next_fire_at(
trigger.params["cron"], trigger.params["cron"],
@ -131,29 +145,43 @@ async def _claim_due_triggers(
continue continue
trigger.last_fired_at = now trigger.last_fired_at = now
claimed.append(trigger.id) claims.append(
_Claim(
trigger_id=trigger.id,
scheduled_for=scheduled_for,
previous_last_fired_at=previous_last_fired_at,
)
)
await session.commit() await session.commit()
return claimed return claims
async def _fire_one(session: AsyncSession, *, trigger_id: int) -> None: async def _fire_one(
session: AsyncSession, *, claim: _Claim, fired_at: datetime
) -> None:
"""Reload the trigger post-commit and dispatch a run for it.""" """Reload the trigger post-commit and dispatch a run for it."""
trigger = await session.get(AutomationTrigger, trigger_id) trigger = await session.get(AutomationTrigger, claim.trigger_id)
if trigger is None: if trigger is None:
return return
try: try:
run = await dispatch_schedule_run(session=session, trigger=trigger) run = await dispatch_schedule_run(
session=session,
trigger=trigger,
fired_at=fired_at,
scheduled_for=claim.scheduled_for,
previous_last_fired_at=claim.previous_last_fired_at,
)
logger.info( logger.info(
"scheduled fire: trigger=%d automation=%d run=%d", "scheduled fire: trigger=%d automation=%d run=%d",
trigger_id, claim.trigger_id,
trigger.automation_id, trigger.automation_id,
run.id, run.id,
) )
except Exception: except Exception:
logger.exception( logger.exception(
"scheduled fire failed for trigger %d (next attempt at next match)", "scheduled fire failed for trigger %d (next attempt at next match)",
trigger_id, claim.trigger_id,
) )
await session.rollback() await session.rollback()

@ -19,7 +19,7 @@ def build_run_context(
trigger_type: str | None, trigger_type: str | None,
started_at: datetime | None, started_at: datetime | None,
attempt: int, attempt: int,
resolved_inputs: Mapping[str, Any], inputs: Mapping[str, Any],
step_outputs: Mapping[str, Any], step_outputs: Mapping[str, Any],
) -> dict[str, Any]: ) -> dict[str, Any]:
"""Build the ``{run, inputs, steps}`` namespace exposed to every template.""" """Build the ``{run, inputs, steps}`` namespace exposed to every template."""
@ -36,6 +36,6 @@ def build_run_context(
"started_at": started_at, "started_at": started_at,
"attempt": attempt, "attempt": attempt,
}, },
"inputs": dict(resolved_inputs), "inputs": dict(inputs),
"steps": dict(step_outputs), "steps": dict(step_outputs),
} }

@ -9,8 +9,7 @@ from .params import ManualTriggerParams
MANUAL_TRIGGER = TriggerDefinition( MANUAL_TRIGGER = TriggerDefinition(
type="manual", type="manual",
description="Fire on a user-initiated 'Run now' invocation.", description="Fire on a user-initiated 'Run now' invocation.",
params_schema=ManualTriggerParams.model_json_schema(), params_model=ManualTriggerParams,
payload_schema={"type": "object"},
) )
register_trigger(MANUAL_TRIGGER) register_trigger(MANUAL_TRIGGER)

@ -19,9 +19,14 @@ async def dispatch_manual_run(
*, *,
session: AsyncSession, session: AsyncSession,
automation_id: int, automation_id: int,
payload: dict[str, Any] | None, runtime_inputs: dict[str, Any] | None,
) -> AutomationRun: ) -> AutomationRun:
"""Find the automation + its enabled manual trigger, then run the generic dispatch.""" """Find the automation + its enabled manual trigger, then run the generic dispatch.
``runtime_inputs`` is the caller-supplied payload (e.g. an HTTP body for a
"Run now" API call); it is merged with the trigger's ``static_inputs`` by
the generic dispatcher, with static winning on key collision.
"""
automation = await _load_automation(session, automation_id) automation = await _load_automation(session, automation_id)
if automation is None: if automation is None:
raise DispatchError(f"automation {automation_id} not found") raise DispatchError(f"automation {automation_id} not found")
@ -41,7 +46,7 @@ async def dispatch_manual_run(
session=session, session=session,
automation=automation, automation=automation,
trigger=trigger, trigger=trigger,
payload=payload, runtime_inputs=runtime_inputs,
) )

@ -9,12 +9,7 @@ from .params import ScheduleTriggerParams
SCHEDULE_TRIGGER = TriggerDefinition( SCHEDULE_TRIGGER = TriggerDefinition(
type="schedule", type="schedule",
description="Fire on a cron schedule in a given timezone.", description="Fire on a cron schedule in a given timezone.",
params_schema=ScheduleTriggerParams.model_json_schema(), params_model=ScheduleTriggerParams,
payload_schema={
"type": "object",
"additionalProperties": False,
"properties": {},
},
) )
register_trigger(SCHEDULE_TRIGGER) register_trigger(SCHEDULE_TRIGGER)

@ -2,6 +2,8 @@
from __future__ import annotations from __future__ import annotations
from datetime import datetime
from sqlalchemy import select from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession from sqlalchemy.ext.asyncio import AsyncSession
@ -16,9 +18,18 @@ async def dispatch_schedule_run(
*, *,
session: AsyncSession, session: AsyncSession,
trigger: AutomationTrigger, trigger: AutomationTrigger,
fired_at: datetime,
scheduled_for: datetime,
previous_last_fired_at: datetime | None,
) -> AutomationRun: ) -> AutomationRun:
"""Fire one scheduled run for ``trigger``. """Fire one scheduled run for ``trigger``.
Emits calendar context as runtime inputs:
- ``fired_at`` actual fire time
- ``scheduled_for`` cron-derived target time for this fire
- ``last_fired_at`` fire time of the previous run, or null on first fire
The caller (the schedule tick) is responsible for selecting due triggers The caller (the schedule tick) is responsible for selecting due triggers
and advancing ``next_fire_at`` / ``last_fired_at`` before invoking this. and advancing ``next_fire_at`` / ``last_fired_at`` before invoking this.
""" """
@ -33,11 +44,19 @@ async def dispatch_schedule_run(
f"automation {trigger.automation_id} is {automation.status.value}, not active" f"automation {trigger.automation_id} is {automation.status.value}, not active"
) )
runtime_inputs = {
"fired_at": fired_at.isoformat(),
"scheduled_for": scheduled_for.isoformat(),
"last_fired_at": (
previous_last_fired_at.isoformat() if previous_last_fired_at else None
),
}
return await dispatch_run( return await dispatch_run(
session=session, session=session,
automation=automation, automation=automation,
trigger=trigger, trigger=trigger,
payload=None, runtime_inputs=runtime_inputs,
) )
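
The calendar context built in `dispatch_schedule_run` can be reproduced in isolation to see what a scheduled run receives as runtime inputs. The sketch below mirrors the dict from the hunk above; `build_schedule_inputs` is a hypothetical name, and the assumption that inputs must be JSON-serializable (hence ISO 8601 strings rather than `datetime` objects) is inferred from the JSONB columns elsewhere in this diff, not stated by it.

```python
from __future__ import annotations

import json
from datetime import datetime, timedelta, timezone
from typing import Any


def build_schedule_inputs(
    fired_at: datetime,
    scheduled_for: datetime,
    previous_last_fired_at: datetime | None,
) -> dict[str, Any]:
    """Hypothetical helper mirroring the runtime_inputs dict in the diff."""
    return {
        "fired_at": fired_at.isoformat(),
        "scheduled_for": scheduled_for.isoformat(),
        # None on the first fire; serialized as JSON null.
        "last_fired_at": (
            previous_last_fired_at.isoformat() if previous_last_fired_at else None
        ),
    }


slot = datetime(2026, 5, 27, 19, 0, tzinfo=timezone.utc)
inputs = build_schedule_inputs(slot + timedelta(seconds=5), slot, None)
print(json.dumps(inputs, indent=2))
```

The gap between `fired_at` and `scheduled_for` gives a template a way to detect a late tick, and `last_fired_at` lets a step bound a "since the last run" query.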

@ -5,10 +5,16 @@ from __future__ import annotations
from dataclasses import dataclass from dataclasses import dataclass
from typing import Any from typing import Any
from pydantic import BaseModel
@dataclass(frozen=True, slots=True) @dataclass(frozen=True, slots=True)
class TriggerDefinition: class TriggerDefinition:
type: str type: str
description: str description: str
params_schema: dict[str, Any] params_model: type[BaseModel]
payload_schema: dict[str, Any]
@property
def params_schema(self) -> dict[str, Any]:
"""JSON Schema (draft 2020-12) derived from ``params_model``."""
return self.params_model.model_json_schema()