From 27ab367a13492426f315fc7e40c8d987d6f28b74 Mon Sep 17 00:00:00 2001 From: CREDO23 Date: Wed, 27 May 2026 21:21:43 +0200 Subject: [PATCH] feat(automations): static_inputs on triggers + vertical-slice api/services --- automation-design-plan.md | 489 ++++++++---------- .../versions/144_add_automation_tables.py | 4 +- .../actions/agent_task/definition.py | 2 +- .../app/automations/actions/types.py | 8 +- .../app/automations/api/__init__.py | 4 + .../app/automations/api/automation.py | 83 ++- surfsense_backend/app/automations/api/run.py | 71 +++ .../app/automations/api/trigger.py | 55 ++ .../app/automations/dispatch/run.py | 18 +- .../app/automations/persistence/models/run.py | 5 +- .../automations/persistence/models/trigger.py | 4 + .../app/automations/runtime/executor.py | 2 +- .../{api/schemas => schemas/api}/__init__.py | 0 .../schemas => schemas/api}/automation.py | 0 .../{api/schemas => schemas/api}/run.py | 3 +- .../{api/schemas => schemas/api}/trigger.py | 3 + .../app/automations/services/__init__.py | 13 +- .../app/automations/services/automation.py | 159 +++++- .../app/automations/services/run.py | 93 ++++ .../app/automations/services/trigger.py | 143 +++++ .../app/automations/tasks/schedule_tick.py | 56 +- .../app/automations/templating/context.py | 4 +- .../automations/triggers/manual/definition.py | 3 +- .../automations/triggers/manual/dispatch.py | 11 +- .../triggers/schedule/definition.py | 7 +- .../automations/triggers/schedule/dispatch.py | 21 +- .../app/automations/triggers/types.py | 10 +- 27 files changed, 915 insertions(+), 356 deletions(-) create mode 100644 surfsense_backend/app/automations/api/run.py create mode 100644 surfsense_backend/app/automations/api/trigger.py rename surfsense_backend/app/automations/{api/schemas => schemas/api}/__init__.py (100%) rename surfsense_backend/app/automations/{api/schemas => schemas/api}/automation.py (100%) rename surfsense_backend/app/automations/{api/schemas => schemas/api}/run.py (92%) rename surfsense_backend/app/automations/{api/schemas => schemas/api}/trigger.py (87%) create mode 100644 surfsense_backend/app/automations/services/run.py create mode 100644 surfsense_backend/app/automations/services/trigger.py diff --git a/automation-design-plan.md b/automation-design-plan.md index f57385e31..db5f7a23c 100644 --- a/automation-design-plan.md +++ b/automation-design-plan.md @@ -34,24 +34,27 @@ system will survive feature growth: --- -## 2. The four-layer contract +## 2. The three-layer contract -The system is structured as four layers. Layers 1, 2, and 4 are defined by -SurfSense developers (at registration time). Layer 3 is what users write -(or the NL generator produces). The runtime reads all four to do its job. +The system is structured as three layers. Layers 1 and 3 are defined by +SurfSense developers (at registration time). Layer 2 is what users write +(or the NL generator produces). The runtime reads all three to do its job. | Layer | What it is | Defined by | | ----- | ---------- | ---------- | -| **1. Capability registry** | What this SurfSense instance can do | Developers, at startup | -| **2. Action contract** | Per-action input/output schema | Developers, at startup | -| **3. Automation definition** | One concrete saved automation | Users (or NL generator) | -| **4. Trigger contract** | Per-trigger config and payload schemas | Developers, at startup | +| **1. Action contract** | Per-action params and output schema | Developers, at startup | +| **2. Automation definition** | One concrete saved automation | Users (or NL generator) | +| **3. Trigger contract** | Per-trigger params and payload schemas | Developers, at startup | -Each layer constrains the one above. The runtime reads all four but doesn't -know what's in them ahead of time. That's how a new capability or trigger +Each layer constrains the next. The runtime reads all three but doesn't +know what's in them ahead of time. That's how a new action or trigger type becomes available across the engine without code changes outside its registration. +A unification layer below Layer 1 — one catalog of "things this SurfSense +instance can do," shared by automations, agents, and future surfaces — was +considered and deferred (§3). v1 actions are stand-alone. + ### Schema language Every shape in every layer is described in **JSON Schema (draft 2020-12).** @@ -66,167 +69,126 @@ extensions on top: --- -## 3. Capability registry (Layer 1) +## 3. Capability unification layer — deferred to post-v1 -A `Capability` is one discrete thing the SurfSense backend exposes — -"post a Slack message," "query the Search Space," "generate a podcast." It -is the atomic unit of "things automations can do." +Earlier drafts introduced a `Capability` registry as Layer 1: one catalog +of "things this SurfSense instance can do," shared by the automation +engine (as actions), the agent (as tools), and any future HTTP surface. +The motivation is real — one source of truth beats N parallel registries — +but v1 has a single action (`agent_task`) and a single consumer (the +automation engine). The five-field shape sketched earlier (`id`, +`description`, `input_schema`, `output_schema`, `handler`) cannot safely +host any non-trivial capability: it carries no caller identity, no +search-space scoping, and no authorization gate on tool delegation. +Building the abstraction with one consumer would lock in a shape that +doesn't survive the second consumer. -```python -@dataclass -class Capability: - id: str # "slack.post_message" - description: str # for the NL generator + UI label - input_schema: dict # JSON Schema - output_schema: dict # JSON Schema - handler: AsyncHandler -``` +The unification layer returns when the second consumer lands (Phase 2 +tight actions or Phase 4 MCP), redesigned from the start with: -### v1-minimum: five fields, nothing else +- A `CallContext` carrying caller user id, search space id, and run id, + passed to every handler invocation. +- Explicit scope declarations per capability (e.g. `reads:documents`, + `writes:slack`, `destructive`) for the authorization layer to read. +- A per-user, per-search-space filter consulted at both definition save + time (validating `agent_task.tools`) and run time (scoping the agent's + tool list to what the automation creator can delegate). -The Capability is **deliberately five fields in v1**. Every additional field -that earlier drafts considered (`name`, `required_credentials`, -`side_effects`, `expected_duration_seconds`, `cost_estimate`) has been -removed until a concrete consumer feature demands it. Authoring stays cheap -and the registry stays trivial to introspect: +Until then: -- `name` → folded into `description`. The UI can render a short label from - the first line of `description` or fall back to `id`. No separate field - needed in v1. -- `required_credentials` → returns when external-credential capabilities - ship (Phase 2). v1 capabilities run server-side with app config; nothing - to declare. -- `side_effects` → returns when RBAC inside automations or - `READ_ONLY`-only agent tool gating arrives. v1 capabilities are - hand-picked and all trusted code. -- `expected_duration_seconds` → returns when multi-queue routing ships. - Single Celery queue in v1. -- `cost_estimate` → never returns as a declared field; cost is measured - per run from a ledger, aggregated per Capability, and surfaced as a - historical average. Pre-flight checks are deferred. - -The runtime invariant: a Capability is **a typed, named, callable thing -the system can do.** Every consumer (executor, agent tool layer, future -HTTP API) sees the same five-field shape and uses it the same way. - -### Where capabilities live (v1) - -In v1, the capability registry is a single in-memory dict, populated at -process startup from native registrations in -`automations/registries/capabilities/`. Identical across all workers. -No database persistence, no closures rebuilt per worker. - -### MCP integration — deferred to Phase 4 - -The earlier two-tier registry (native + MCP-derived), the -`mcp_connections` / `mcp_tools` tables, the harvester, and the lazy -per-worker closure cache are **deferred to Phase 4** along with the -rest of the integration-tooling surface. They are removed from v1 -because: - -- v1 has no external connector capabilities (no Slack, Notion, Drive, - etc.). The only capabilities that will ship are server-side helpers - (search-space query / fetch) plus the loose `agent_task` action. -- Without external connectors, the lifecycle mismatch that motivates - the two-tier design (connect Monday, run Friday, workers restarted - in between) doesn't arise. A startup-time dict is sufficient. -- Phase 4 reintroduces this design as-is — the registry interface in - v1 is the same callable surface a Phase-4 MCP harvester will register - into. The deferral is additive, not a different design. - -See archived design at `docs/automation/archived/mcp-registry.md` once -v1 ships; for now the only consumer of the registry is the in-memory -native path. +- v1 actions are stand-alone units (Layer 1 below); the automation engine + reads its own action registry, nothing else. +- `agent_task.params.tools` is a forward-looking allowlist field with no + v1 semantics beyond "list of string identifiers." The handler's tool + resolution is opaque to the automation contract. ### Credentials — deferred to Phase 2 -The earlier per-call credential resolution pattern (`ctx.resolve_mcp_client`, -`ctx.resolve_http_client`, `ctx.resolve_llm`) is **deferred to Phase 2**. -v1 capabilities run server-side using app-level configuration; none of -the seven v1 capabilities needs per-user or per-connection auth. +External-credential handlers (Slack, email, etc.) require per-user or +per-connection auth. v1 actions run server-side with app-level +configuration. When tight actions ship in Phase 2, the credential design +lands as part of the unification redesign: connection IDs in the +definition (never tokens); credentials loaded per-call by the handler +context (never pre-loaded into worker memory); credentials never enter +LLM context. -When Phase 2 ships external-credential capabilities (Slack, email, etc.), -the three guarantees the original design promised are reintroduced -unchanged: +### MCP — deferred to Phase 4 -- Credentials never appear in the automation definition (connection IDs - only). -- Credentials never appear in the LLM's context (the host holds them - and uses them on the LLM's behalf when executing tool calls). -- Credentials are loaded per-call, not pre-loaded into worker memory. - -The Phase-2 design returns as-is; only the v1 surface is simplified. +External tool servers feeding tools into a shared registry land with the +rest of the integration tooling in Phase 4, after the unification layer +is in place. The two-tier registry, `mcp_connections` and `mcp_tools` +tables, and the harvester arrive as a single coherent step then. --- -## 4. Action contract (Layer 2) +## 4. Action contract -An `Action` is what a user references in a plan step. Most actions are -thin wrappers around one capability (e.g., `slack_post` wraps -`slack.post_message`). Some compose: `agent_task` is one action whose -handler invokes the LangGraph runtime, which in turn can call many -capabilities. +An `Action` is what a user references in a plan step. Some actions are +deterministic single-purpose handlers (`slack_post`, `send_email`); one +action (`agent_task`) hosts an LLM and a tool allowlist for cases where +judgment is needed. The contract is the same in both cases — only the +handler differs. ```python -@dataclass +@dataclass(frozen=True, slots=True) class ActionDefinition: - type: str # "agent_task", "slack_post" - name: str # for the UI - description: str # for the NL generator - config_schema: dict # JSON Schema for action.config - output_contract: dict | DynamicOutput # what it produces - uses_capabilities: list[str] # IDs from the registry - produces_artifacts: list[ArtifactSpec] # see §8 - handler: AsyncHandler + type: str # "agent_task", "slack_post" + name: str # short UI label + description: str # for the NL generator and the UI + params_schema: dict # JSON Schema for step.params + handler: ActionHandler ``` +This is the v1 shape: five fields, no handler context, no output +contract, no artifact declaration. The deferrals are intentional: + +- **`output_contract`** — Phase 2. Deterministic handlers will return + a fixed shape; v1's only action (`agent_task`) takes an + `output_schema` inside `params` and validates against that instead. +- **`produces_artifacts`** — Phase 5. Artifact lifecycle (storage, + signed URLs, retention) is its own design step; v1 handlers + persist their own outputs. +- **Handler context** — paired with the unification redesign (§3). + v1 handlers receive `(args)` only; per-user / per-search-space + behavior is not yet a v1 concern. + ### Tight vs loose actions Two patterns coexist by design: -- **Tight actions** (`slack_post`, `linear_create_issue`, `send_email`): - config_schema is fully specified, output_contract is fixed, handler is a - thin wrapper. ~20 LOC each. Used when the user knows exactly what they - want done — no LLM tokens spent on trivial work. +- **Tight actions** (`slack_post`, `linear_create_issue`, + `send_email`) — deterministic single-purpose handlers. ~20 LOC + each. **Phase 2.** +- **Loose actions** (`agent_task`) — params_schema accepts a `prompt`, + a `tools` allowlist, and an optional `output_schema` declaring what + the agent must return; the handler validates the agent's output + against it. **v1.** -- **Loose actions** (`agent_task`): config_schema accepts a `prompt` and a - `tools` allowlist; output_contract is *dynamic* — the user declares the - output shape they want via `output_schema` in the step config; the - handler asks the LLM to return that shape and validates. Used when - judgment is needed. - -The agent's tool list is **the same capabilities** that tight actions call -directly. One registry, two invocation modes. Adding a new MCP server gives -both modes access to its tools automatically. +The agent's `tools` allowlist resolves opaquely in v1; the redesigned +unification layer (§3) will give both invocation modes access to the +same vocabulary, with per-user authorization gating both. ### How names in the definition become function calls -The definition contains strings like `"action": "slack_post"`. The string is -just a name — it does not point to a function. At runtime, the executor -performs a **name-based lookup** against the action registry: +The definition contains strings like `"action": "agent_task"`. The +string is just a name — it does not point to a function. At runtime, +the executor performs a **name-based lookup** against the action +registry: ```python -# step.action is a string from the JSON definition, e.g. "slack_post" -action_def = _ACTION_REGISTRY[step.action] # dict lookup -handler = action_def.handler # Python callable -result = await handler(ctx, resolved_config) # invocation +action_def = action_registry.get(step.action) # dict lookup +handler = action_def.handler # Python callable +result = await handler(resolved_params) # invocation ``` -The registry is a Python dict (or a thin wrapper around one) populated at -process startup. Each entry in `automations/actions/*.py` calls a -`register_action(...)` function at module import time, putting its -`ActionDefinition` (including the handler function reference) into the -registry. +The registry is a Python dict populated at process startup. Each entry +in `automations/registries/actions/*.py` calls `register_action(...)` +at module import time, putting its `ActionDefinition` (including the +handler function reference) into the registry. -The same pattern applies to capabilities. The definition references -capabilities by ID (`"slack.post_message"`); the capability registry maps -the ID to a `Capability` object holding the handler. Definitions never -reference Python code directly — they reference names that the registry -resolves to code. - -This separation is what makes the contract portable. The definition is -pure data. The registry is the engine's runtime vocabulary. They meet at -name-based lookup; nothing else crosses the boundary. +The definition is pure data. The registry is the engine's runtime +vocabulary. They meet at name-based lookup; nothing else crosses the +boundary. ### The full expressive spectrum @@ -238,7 +200,7 @@ fully agentic. Six practical shapes worth recognizing: | **1. Direct call** | `slack_post` with literal channel and template | No LLM. ~200ms. Fractions of a cent. | | **2. Direct call with computed inputs** | `linear_create_issue` using `{{summary.title}}` from a prior step | No LLM for this step. Cheap. | | **3. Single-domain agent task** | `agent_task` with `tools: ["slack.*"]` only | One LLM, bounded toolset. | -| **4. Multi-domain agent task, narrow** | `agent_task` with `tools: ["github.list_pull_requests", "linear.create_issue"]` | One LLM, named capabilities. | +| **4. Multi-domain agent task, narrow** | `agent_task` with `tools: ["github.list_pull_requests", "linear.create_issue"]` | One LLM, named tools. | | **5. Multi-domain agent task, broad** | `agent_task` with `tools: ["slack.*", "github.*", "linear.*"]` | One LLM, large toolset, most agentic. | | **6. Composed plan** | `agent_task` (narrow) for thinking → `slack_post` + `linear_create_issue` for acting | Best cost-to-power ratio. | @@ -258,7 +220,7 @@ user's. --- -## 5. Automation definition (Layer 3) +## 5. Automation definition This is the JSON the user writes (or the NL generator produces). Stored in `automations.definition` as JSONB. @@ -287,7 +249,7 @@ This is the JSON the user writes (or the NL generator produces). Stored in "triggers": [ { "type": "schedule", - "config": { "cron": "0 9 * * 1-5", "timezone": "Africa/Kigali" } + "params": { "cron": "0 9 * * 1-5", "timezone": "Africa/Kigali" } } ], @@ -295,7 +257,7 @@ This is the JSON the user writes (or the NL generator produces). Stored in { "step_id": "research", "action": "agent_task", - "config": { + "params": { "prompt": "Find documents tagged {{inputs.tags}} indexed since {{inputs.since}}. Return JSON with bullets and source_doc_ids.", "tools": ["search_space.query", "search_space.fetch_document"], "model": "anthropic/claude-sonnet-4-7", @@ -313,7 +275,7 @@ This is the JSON the user writes (or the NL generator produces). Stored in { "step_id": "deliver", "action": "slack_post", - "config": { + "params": { "channel_id": "C0123", "message_template": "*Competitor digest*\n\n{% for b in summary.bullets %}• {{b}}\n{% endfor %}" } @@ -325,11 +287,10 @@ This is the JSON the user writes (or the NL generator produces). Stored in "max_retries": 2, "retry_backoff": "exponential", "concurrency": "drop_if_running", - "budget_cap_usd": 1.50, "on_failure": [ /* steps to run if main plan fails after retries */ ] }, - "metadata": { "tags": ["digest"], "created_from_nl": true } + "metadata": { "tags": ["digest"] } } ``` @@ -340,7 +301,7 @@ This is the JSON the user writes (or the NL generator produces). Stored in "step_id": "...", // unique within plan "action": "...", // references an ActionDefinition.type "when": "{{ ... }}", // optional Jinja expr → bool; false = skip - "config": { ... }, // validated against action's config_schema + "params": { ... }, // validated against action's params_schema "output_as": "...", // binds output to this name for later steps "max_retries": 0, // optional, overrides automation default "timeout_seconds": 1200 // optional, overrides automation default @@ -354,7 +315,7 @@ about it, or they compose automations through events (§7.5). --- -## 6. Trigger contract (Layer 4) +## 6. Trigger contract Three trigger types. That's the entire taxonomy. @@ -363,23 +324,12 @@ Three trigger types. That's the entire taxonomy. ```python TriggerDefinition( type="schedule", - config_schema={ - "type": "object", - "required": ["cron", "timezone"], - "properties": { - "cron": { "type": "string" }, - "timezone": { "type": "string", "format": "iana-timezone" } - } - }, - payload_schema={ - "type": "object", - "properties": { - "fired_at": { "type": "string", "format": "date-time" }, - "scheduled_for": { "type": "string", "format": "date-time" }, - "last_fired_at": { "type": "string", "format": "date-time" } - } - } + params_model=ScheduleTriggerParams, # cron + timezone ) +# At fire time the schedule producer emits runtime inputs +# (fired_at, scheduled_for, last_fired_at) which are merged with the +# trigger row's static_inputs (static wins) and validated against +# automation.definition.inputs.schema_. ``` Implementation: extends `app/utils/periodic_scheduler.py`, which already @@ -395,7 +345,7 @@ want an event trigger instead. ```python TriggerDefinition( type="webhook", - config_schema={ + params_schema={ "type": "object", "properties": { "input_mapping": { @@ -422,7 +372,7 @@ Dedups against runs in the last 24 hours. ```python TriggerDefinition( type="event", - config_schema={ + params_schema={ "type": "object", "required": ["event_type"], "properties": { @@ -485,11 +435,13 @@ Common path (after a trigger has fired): 4. **Snapshot the resolved definition** into the run row (immutable history) 5. Enqueue executor task on the single `automations_default` Celery queue -The cost-estimate pre-check (originally step 3) is **deferred**. -v1 capabilities do not declare `cost_estimate`; pre-flight budgeting -returns when a historical-cost ledger exists. The mid-flight budget -cap (§7.2) still kills the run if accumulated cost crosses -`budget_cap_usd`. +The cost-estimate pre-check (originally step 3) is **deferred**. v1 +actions do not declare cost estimates, the run row has no `cost_usd` +column, and no handler reports tokens used — so neither pre-flight +prediction nor mid-flight accumulation can be enforced. `Execution` +therefore does not expose `budget_cap_usd` in v1; it returns as a single +field addition the day the cost ledger ships (per-action cost reporting ++ `automation_runs.cost_usd` column + executor accumulation). Queue routing by `expected_duration_seconds` is **deferred** until load patterns justify a second queue. v1 uses a single queue. @@ -510,15 +462,15 @@ async def execute_run(run_id: int) -> None: if step.when and not evaluate_predicate(step.when, context | step_outputs): record_step_skipped(run, step); continue - resolved_config = render_config(step.config, context | step_outputs) + resolved_params = render_params(step.params, context | step_outputs) action = action_registry.get(step.action) - validate(resolved_config, action.config_schema) + validate(resolved_params, action.params_schema) try: result = await with_retries( action.handler, ctx=build_action_context(run, action), - args=resolved_config, + args=resolved_params, policy=step.retry_policy or run.execution.retry_policy, ) validate(result, step.output_schema) @@ -541,14 +493,20 @@ validated dict come back; it doesn't know that step was "smart." ### 7.3 Action handlers -One handler per `ActionDefinition.type`. Receives `(ctx, args)`, returns -a dict matching `output_contract` (or matching the user-declared -`output_schema` for dynamic-output actions like `agent_task`). +One handler per `ActionDefinition.type`. Receives the validated `args` +dict and returns whatever the step's output validates against (a fixed +shape declared by tight actions, or a dynamic shape declared via +`output_schema` in the step params for `agent_task`). -Handlers handle their own credential resolution via `ctx.resolve_credentials`. -They do not know about retries, timeouts, or budget caps — those are the +Handlers do not know about retries or timeouts — those are the executor's concern. +In v1, handlers take `(args)` only. The `CallContext` parameter sketched +in §7.2's pseudo-code (caller user id, search space id, run id, +credential resolver) arrives with the unification layer redesign (§3); +v1's single action (`agent_task`) reads what it needs from app-level +configuration. + ### 7.4 Template engine #### Why it exists @@ -747,7 +705,7 @@ Three fields, per-automation defaults with optional per-step overrides: - `timeout_seconds`: integer Retries on: -- Capability handler exceptions +- Action handler exceptions - Output schema validation failures (for dynamic-output actions, the validation error is fed back to the LLM in the retry) @@ -755,12 +713,21 @@ Not retries: - `when:` evaluation failures (these are user errors, surface immediately) - Input validation failures (caught at dispatch, never reach the executor) -### Budget enforcement +### Budget enforcement *(deferred — not in v1)* -`budget_cap_usd` is per-run. The dispatcher refuses to enqueue if estimated -cost exceeds it. The executor kills the run if accumulated cost crosses it -mid-flight (the LLM ops handler reports tokens consumed back to the -executor between calls). +Future shape: `budget_cap_usd` on `Execution`, dispatcher refuses to +enqueue if estimated cost exceeds it, executor kills the run if +accumulated cost crosses it mid-flight (the LLM ops handler reports +tokens consumed back to the executor between calls). + +Prerequisites before this can land: +- Each action declares cost reporting (tokens × model price, API call + charges) — `ActionDefinition` has no such field today. +- `automation_runs.cost_usd` column + executor accumulates per step. +- A historical-cost ledger so pre-flight estimation can return useful + numbers (otherwise the dispatcher gate is guessing). + +Until all three exist, v1 has no surface for budget enforcement. ### On-failure handlers @@ -787,14 +754,13 @@ nightly Celery Beat task deletes expired artifacts). ### Duration classes and queue routing — deferred The original design routed runs to multiple Celery queues based on each -capability's declared `expected_duration_seconds`. v1 ships with **one -queue** (`automations_default`) and capabilities do not declare a -duration. Multi-queue routing returns when burst load on a single queue -actually justifies the operational complexity of independent worker -pools. +action's declared `expected_duration_seconds`. v1 ships with **one +queue** (`automations_default`) and actions do not declare a duration. +Multi-queue routing returns when burst load on a single queue actually +justifies the operational complexity of independent worker pools. Adding the second queue is a config change plus reintroducing -`expected_duration_seconds` on the `Capability` dataclass — both +`expected_duration_seconds` on the `ActionDefinition` dataclass — both mechanical, additive, and free of design rewrite. --- @@ -832,14 +798,16 @@ and an immutable run history. ### `automation_triggers` -| field | type | notes | -| --------------- | ----------------------------------------------------------------------------- | ------------------------------------------- | -| `id` | int PK | | -| `automation_id` | FK | | -| `type` | enum: `schedule`, `manual` (Phase 2/3 add `webhook`, `event`) | | -| `config` | jsonb | validated against trigger's `config_schema` | -| `enabled` | bool | | -| `last_fired_at` | timestamp | | +| field | type | notes | +| --------------- | ----------------------------------------------------------------------------- | ----------------------------------------------------------- | +| `id` | int PK | | +| `automation_id` | FK | | +| `type` | enum: `schedule`, `manual` (Phase 2/3 add `webhook`, `event`) | | +| `params` | jsonb | trigger-type config, validated against trigger's `params_schema` | +| `static_inputs` | jsonb | per-attachment domain values merged into every run (static wins on collision) | +| `enabled` | bool | | +| `last_fired_at` | timestamp | | +| `next_fire_at` | timestamp / null | precomputed next fire moment for schedule triggers | `secret_hash` (for webhook bearer tokens) is **deferred to Phase 2** with the webhook trigger. @@ -853,8 +821,7 @@ the webhook trigger. | `trigger_id` | FK / null | null = manual via UI | | `status` | enum | `pending`, `running`, `succeeded`, `failed`, `cancelled`, `timed_out` | | `definition_snapshot` | jsonb | the definition as it was when this run fired | -| `trigger_payload` | jsonb | | -| `resolved_inputs` | jsonb | | +| `inputs` | jsonb | merged & validated inputs (trigger.static_inputs ∪ producer runtime data, static wins) | | `step_results` | jsonb | array of per-step results with timing | | `output` | jsonb / null | | | `artifacts` | jsonb | references to created artifacts | @@ -863,7 +830,7 @@ the webhook trigger. | `agent_session_id`| str / null | link to LangGraph trace if agent_task was used | `cost_usd` (per-run accumulated cost) is **deferred** until at least one -v1 capability records token-level cost. When reintroduced it lands as a +action records token-level cost. When reintroduced it lands as a column-only migration. ### Deferred tables @@ -897,8 +864,8 @@ not "trusted authors only." User provides natural-language input. The Generator LLM is given: - The full schema set (input schema for definition, registry of action - types with their config_schemas, registry of trigger types, available - capabilities for this SearchSpace, list of allowed Jinja filters) + types with their params_schemas, registry of trigger types, list of + allowed Jinja filters) - A tool to list available connectors, channels, and other SearchSpace resources, so it doesn't invent names that don't exist - A few-shot set of examples @@ -918,13 +885,13 @@ Output: a structured proposal matching the automation definition schema. Server-side, before the proposal reaches the user: - Validate against JSON Schema (shape correctness) -- Verify every capability referenced exists in the registry (resource existence) +- Verify every action and trigger type referenced exists in the registry - Verify every connector/channel/resource referenced exists in this SearchSpace - Validate every template against the sandbox's allowlist (no underscore attributes, no unregistered filter names, length under cap) Failures here are deterministic errors, not warnings. A proposal that -references a non-existent capability or includes a template using +references a non-existent action or includes a template using `{{x.__class__}}` is rejected before the user sees it; the Generator is re-prompted with the validation error and asked to fix the proposal. @@ -947,7 +914,7 @@ produces two outputs for the user: - Action sequences that touch external systems without obvious benefit to the user - Cost estimates that seem high relative to the goal - - References to capabilities the user hasn't used before + - References to actions the user hasn't used before - Schedules tighter than 15 minutes (likely should be event triggers) The Review LLM is a **UX layer** that makes review actually useful. It is @@ -1009,33 +976,18 @@ always. surfsense_backend/app/ ├── automations/ # NEW: the engine │ ├── __init__.py -│ ├── models.py # SQLAlchemy models for 6 tables -│ ├── schemas.py # Pydantic schemas (definition envelope, etc.) +│ ├── persistence/ # SQLAlchemy models + enums for 3 tables +│ ├── schemas/ # Pydantic schemas (definition envelope, etc.) │ ├── routes.py # FastAPI router (/api/v1/automations) │ ├── service.py # CRUD + business logic -│ ├── dispatcher.py # trigger matching, cost check, run creation +│ ├── dispatcher.py # trigger matching, run creation │ ├── executor.py # the Celery task that runs a plan │ ├── templating.py # Jinja sandbox + filters │ ├── events.py # publish/subscribe for domain_events │ ├── filters.py # JSON filter grammar evaluator -│ ├── actions/ -│ │ ├── registry.py -│ │ ├── agent_task.py -│ │ ├── transform_data.py -│ │ ├── slack_post.py -│ │ ├── send_email.py -│ │ ├── notification.py -│ │ └── (more in Phase 5: podcast_generation, report_generation, ...) -│ ├── triggers/ -│ │ ├── registry.py -│ │ ├── schedule.py # Celery Beat hookup -│ │ ├── webhook.py # /fire endpoint -│ │ └── event.py # subscribes to domain_events -│ ├── capabilities/ -│ │ ├── registry.py -│ │ ├── native.py # native capability registrations -│ │ ├── mcp_harvester.py # registers MCP tools as capabilities (Phase 4) -│ │ └── (LLM ops registered alongside) +│ ├── registries/ # action and trigger registries +│ │ ├── actions/ # ActionDefinition + handler registration +│ │ └── triggers/ # TriggerDefinition │ └── nl/ # Phase 1 — primary user path │ ├── generator.py # Generator LLM │ ├── reviewer.py # Review LLM (summary + flagged items) @@ -1070,23 +1022,22 @@ automations in natural language. **Step 1 (current scope, this batch of commits):** - 3 tables (`automations`, `automation_triggers`, `automation_runs`) + Alembic migration -- Empty Capability, Action, Trigger registries (concrete entries land in - later steps when the consuming feature lands) +- Empty action and trigger registries under + `app/automations/registries/` (concrete entries land in later steps) - Pydantic schemas for the automation definition envelope, the two v1 - trigger configs (`schedule`, `manual`), and the one v1 action config - (`agent_task`) -- Module structure under `app/automations/` (data/, schemas/, + trigger params shapes (`schedule`, `manual`), and the one v1 action + params shape (`agent_task`) +- Module structure under `app/automations/` (persistence/, schemas/, registries/), fully isolated from the existing codebase **Step 2:** -- Register the `agent_task` action and the `schedule` / `manual` - triggers in the registries -- Capability registry populated with native deliverable-producing - capabilities (chosen when this step starts) +- The `agent_task` action handler and the `schedule` / `manual` triggers + registered in `app/automations/registries/`. Tool resolution for + `agent_task.params.tools` is opaque to the contract — the handler + decides what string identifiers it accepts and how they resolve. **Step 3:** -- Executor (single-queue Celery task) with retries, timeouts, budget - caps measured against `cost_usd` ledger on the run +- Executor (single-queue Celery task) with retries and timeouts - Template engine (Jinja sandbox + the v1 filter allowlist + runtime limits) - Manual "Run now" endpoint @@ -1122,19 +1073,23 @@ somewhere humans see, complex pipelines have proper error handling. **After Phase 3**: NL authoring is the polished primary surface; edit flows are conversational rather than form-only. -### Phase 4 — Event triggers +### Phase 4 — Event triggers + integration tooling - `domain_events` table and `events.py` module - Indexing pipeline publishes `connector.*` events (smallest change — just add publish calls to the existing flow) - Automations publish `automation.run.*` events on completion - `event` trigger with filter grammar -- MCP capability harvester (so MCP-backed events and tools both work) +- The unification layer redesign (see §3) — `CallContext`, scope + declarations, per-user authorization gating +- MCP integration on top of the unification layer (external tool servers + harvested into the shared catalog) **After Phase 4**: "do X when Y happens" automations work, including -automation-chaining through events. +automation-chaining through events; external MCP tools and SurfSense +actions share one vocabulary. ### Phase 5 — Wrapping existing features and sharing -- Wrap existing SurfSense capabilities as actions: `podcast_generation`, +- Wrap existing SurfSense features as actions: `podcast_generation`, `report_generation`, `indexing_sweep` - Artifact lifecycle implementation - `expected_duration_seconds` based queue routing (split `automations_long` @@ -1144,7 +1099,7 @@ automation-chaining through events. shift documented in §7.4's pre-Phase-5 gate - Cross-automation composition examples in the docs -**After Phase 5**: every existing SurfSense capability is automatable +**After Phase 5**: every existing SurfSense feature is automatable without any per-feature code, and automations can be shared between SearchSpaces and users. @@ -1156,13 +1111,12 @@ For reference — every decision made through the design process, in one place. ### Foundations -1. ✅ JSON Schema 2020-12 is the single schema language for everything +1. ✅ JSON Schema (draft 2020-12) is the single schema language for everything 2. ✅ Definition is the program; infrastructure is the interpreter 3. ✅ List of steps (not single action) in the plan, with `output_as` chaining -4. ✅ One capability registry serving native + MCP + LLM operations through the same interface -5. ✅ Capability IDs do not leak handler kind (`slack.post_message`, not `mcp.slack.post_message`) -6. ✅ Name-based resolution: definitions reference actions and capabilities by string ID. The registry is the runtime's vocabulary; lookup is a dict access. No code references in definitions. -7. ✅ The expressive spectrum runs from pure direct calls to broad agent_task; the NL generator proposes the cheapest shape that meets intent (Shape 6 from §4 by default) +4. ⏸ Capability unification layer (one catalog shared by automations, agents, and future surfaces) — **deferred to post-v1** (see §3). v1 ships actions only. +5. ✅ Name-based resolution: definitions reference action and trigger types by string ID. The registry is the runtime's vocabulary; lookup is a dict access. No code references in definitions. +6. ✅ The expressive spectrum runs from pure direct calls to broad agent_task; the NL generator proposes the cheapest shape that meets intent (Shape 6 from §4 by default) ### Trigger taxonomy 8. ✅ Three trigger types: `schedule`, `webhook`, `event` @@ -1183,7 +1137,7 @@ place. 19. ✅ No DAGs, no parallelism, no loops — composition via agent_task or events 20. ✅ `on_failure` part of execution policy from v1 21. ✅ Step-level retry and timeout overrides -22. ✅ Budget cap enforced pre-enqueue and mid-flight +22. ⏸ Budget cap enforced pre-enqueue and mid-flight — **deferred** until the cost ledger ships (see §8 Budget enforcement) ### Components 23. ✅ Dispatcher / executor / handlers / registry — distinct, each replaceable @@ -1197,25 +1151,22 @@ place. 29. ✅ Automations publish run events for composability 30. ✅ Publish/subscribe behind interface — no direct table access elsewhere -### Capability storage -31. ✅ Native capabilities registered in-memory at startup from the codebase. Identical across all workers. -32. ⏸ MCP capability metadata persisted in `mcp_connections` and `mcp_tools` tables — **deferred to Phase 4** -33. ⏸ MCP handler closures built lazily per worker from database state — **deferred to Phase 4** -34. ⏸ MCP server tool list re-harvested on a schedule — **deferred to Phase 4** -35. ⏸ MCP tools harvested into the capability registry at connection time — **deferred to Phase 4** -36. ⏸ Side effects inferred from MCP hints + naming + admin overrides — **deferred to Phase 4** -37. ⏸ MCP tools callable directly (no agent required) when caller knows args — **deferred to Phase 4** +### Capability unification — all deferred to post-v1 +31. ⏸ One shared catalog of "things this SurfSense instance can do" — **deferred**, see §3 +32. ⏸ Handler `CallContext` (caller user id, search space id, run id) — **deferred** with unification +33. ⏸ Per-capability scope declarations driving authorization — **deferred** with unification +34. ⏸ MCP integration on top of the unification layer (`mcp_connections`, `mcp_tools`, harvester) — **deferred to Phase 4** ### Credentials — all deferred to Phase 2 -38. ⏸ Credentials never appear in the automation definition — only connection IDs do — **Phase 2** -39. ⏸ Credentials never appear in the LLM's context — the host holds them — **Phase 2** -40. ⏸ Credentials resolved per-call by `ActionContext`, not pre-loaded into worker environment — **Phase 2** -41. ⏸ Tokens encrypted at rest; refresh handled automatically by `ActionContext.resolve_*_client` — **Phase 2** +35. ⏸ Credentials never appear in the automation definition — only connection IDs do — **Phase 2** +36. ⏸ Credentials never appear in the LLM's context — the host holds them — **Phase 2** +37. ⏸ Credentials resolved per-call by the handler context, not pre-loaded into worker environment — **Phase 2** +38. ⏸ Tokens encrypted at rest; refresh handled automatically by the handler context — **Phase 2** -### v1-minimum (new lock) -v1. ✅ `Capability` is exactly five fields: `id`, `description`, `input_schema`, `output_schema`, `handler`. Additional fields are added only when a concrete consumer feature requires them. -v2. ✅ Cost is **measured** from a per-run ledger, not declared. Pre-flight cost checks return when the ledger has enough history. -v3. ✅ Single `automations_default` Celery queue in v1. Multi-queue routing returns when load justifies it. +### v1-minimum +39. ✅ v1 ships actions only — no separate capability layer. `ActionDefinition` is five fields: `type`, `name`, `description`, `params_schema`, `handler`. Additional fields are added only when a concrete consumer feature requires them. +40. ✅ Cost is **measured** from a per-run ledger, not declared. Pre-flight cost checks return when the ledger has enough history. +41. ✅ Single `automations_default` Celery queue in v1. Multi-queue routing returns when load justifies it. ### NL authoring 42. ✅ LLM-authored templates is the primary path from day one — not a Phase 3 addition. Hand-authoring JSON is supported but secondary @@ -1227,7 +1178,7 @@ v3. ✅ Single `automations_default` Celery queue in v1. Multi-queue routing ret 48. ✅ NL drafts are transient storage, not a core table ### Data model -49. ✅ Six tables total — four for engine state, two for MCP persistence +49. ✅ v1 ships three tables (`automations`, `automation_triggers`, `automation_runs`). `domain_events` lands in Phase 3; `mcp_connections` and `mcp_tools` in Phase 4. 50. ✅ Run rows snapshot the definition (immutable history) 51. ✅ All entities scoped by `search_space_id` for RBAC 52. ✅ Editing an automation bumps `version`; existing runs unaffected @@ -1283,7 +1234,7 @@ Schemas spelled out concretely. Those follow mechanically from this plan. agent (cron and skills subsystems); n8n documentation on node types and workflow data model; the SurfSense repository and DeepWiki architecture notes (FastAPI + Celery Beat + Electric SQL + LangGraph Deep Agents + -Search Space RBAC); Model Context Protocol specification for capability -harvesting; AWS EventBridge for filter grammar; workflow-pattern +Search Space RBAC); Model Context Protocol specification for external +tool harvesting; AWS EventBridge for filter grammar; workflow-pattern literature (van der Aalst et al.) for the trigger / action / concurrency vocabulary.* diff --git a/surfsense_backend/alembic/versions/144_add_automation_tables.py b/surfsense_backend/alembic/versions/144_add_automation_tables.py index 6daf4075f..8d836095d 100644 --- a/surfsense_backend/alembic/versions/144_add_automation_tables.py +++ b/surfsense_backend/alembic/versions/144_add_automation_tables.py @@ -87,6 +87,7 @@ def upgrade() -> None: REFERENCES automations(id) ON DELETE CASCADE, type automation_trigger_type NOT NULL, params JSONB NOT NULL, + static_inputs JSONB NOT NULL DEFAULT '{}'::jsonb, enabled BOOLEAN NOT NULL DEFAULT true, last_fired_at TIMESTAMP WITH TIME ZONE, next_fire_at TIMESTAMP WITH TIME ZONE, @@ -129,8 +130,7 @@ def upgrade() -> None: REFERENCES automation_triggers(id) ON DELETE SET NULL, status automation_run_status NOT NULL DEFAULT 'pending', definition_snapshot JSONB NOT NULL, - trigger_payload JSONB, - resolved_inputs JSONB NOT NULL DEFAULT '{}'::jsonb, + inputs JSONB NOT NULL DEFAULT '{}'::jsonb, step_results JSONB NOT NULL DEFAULT '[]'::jsonb, output JSONB, artifacts JSONB NOT NULL DEFAULT '[]'::jsonb, diff --git a/surfsense_backend/app/automations/actions/agent_task/definition.py b/surfsense_backend/app/automations/actions/agent_task/definition.py index d7db5cfcd..7d14dc49e 100644 --- a/surfsense_backend/app/automations/actions/agent_task/definition.py +++ b/surfsense_backend/app/automations/actions/agent_task/definition.py @@ -11,7 +11,7 @@ AGENT_TASK_ACTION = ActionDefinition( type="agent_task", name="Agent task", description="Run a multi_agent_chat turn from an automation step.", - params_schema=AgentTaskActionParams.model_json_schema(), + params_model=AgentTaskActionParams, build_handler=build_handler, ) diff --git a/surfsense_backend/app/automations/actions/types.py b/surfsense_backend/app/automations/actions/types.py index 433c60841..2c4ffad8d 100644 --- a/surfsense_backend/app/automations/actions/types.py +++ b/surfsense_backend/app/automations/actions/types.py @@ -7,6 +7,7 @@ from dataclasses import dataclass from typing import Any from uuid import UUID +from pydantic import BaseModel from sqlalchemy.ext.asyncio import AsyncSession @@ -30,5 +31,10 @@ class ActionDefinition: type: str name: str description: str - params_schema: dict[str, Any] + params_model: type[BaseModel] build_handler: ActionHandlerFactory + + @property + def params_schema(self) -> dict[str, Any]: + """JSON Schema (draft 2020-12) derived from ``params_model``.""" + return self.params_model.model_json_schema() diff --git a/surfsense_backend/app/automations/api/__init__.py b/surfsense_backend/app/automations/api/__init__.py index 459c6c1b4..a18e91a95 100644 --- a/surfsense_backend/app/automations/api/__init__.py +++ b/surfsense_backend/app/automations/api/__init__.py @@ -5,8 +5,12 @@ from __future__ import annotations from fastapi import APIRouter from .automation import router as automation_router +from .run import router as run_router +from .trigger import router as trigger_router router = APIRouter() router.include_router(automation_router) +router.include_router(trigger_router) +router.include_router(run_router) __all__ = ["router"] diff --git a/surfsense_backend/app/automations/api/automation.py b/surfsense_backend/app/automations/api/automation.py index 4d0ce7209..b67f0af09 100644 --- a/surfsense_backend/app/automations/api/automation.py +++ b/surfsense_backend/app/automations/api/automation.py @@ -1,23 +1,80 @@ -"""Routes for the ``Automation`` resource.""" +"""HTTP routes for the ``Automation`` resource.""" from __future__ import annotations -from typing import Any +from fastapi import APIRouter, Depends, Query, status -from fastapi import APIRouter, Body, Depends - -from app.automations.api.schemas import RunDispatched +from app.automations.schemas.api import ( + AutomationCreate, + AutomationDetail, + AutomationList, + AutomationSummary, + AutomationUpdate, +) from app.automations.services import AutomationService, get_automation_service router = APIRouter() -@router.post("/automations/{automation_id}/run", response_model=RunDispatched) -async def run_automation_now( - automation_id: int, - payload: dict[str, Any] | None = Body(default=None), +@router.post( + "/automations", + response_model=AutomationDetail, + status_code=status.HTTP_201_CREATED, +) +async def create_automation( + payload: AutomationCreate, service: AutomationService = Depends(get_automation_service), -) -> RunDispatched: - """Fire a manual run.""" - run = await service.run_now(automation_id=automation_id, payload=payload) - return RunDispatched(run_id=run.id, status=run.status) +) -> AutomationDetail: + """Create an automation, optionally with initial triggers (atomic).""" + automation = await service.create(payload) + return AutomationDetail.model_validate(automation) + + +@router.get("/automations", response_model=AutomationList) +async def list_automations( + search_space_id: int = Query(...), + limit: int = Query(default=50, ge=1, le=200), + offset: int = Query(default=0, ge=0), + service: AutomationService = Depends(get_automation_service), +) -> AutomationList: + """List automations in a search space.""" + items, total = await service.list( + search_space_id=search_space_id, limit=limit, offset=offset + ) + return AutomationList( + items=[AutomationSummary.model_validate(a) for a in items], + total=total, + ) + + +@router.get("/automations/{automation_id}", response_model=AutomationDetail) +async def get_automation( + automation_id: int, + service: AutomationService = Depends(get_automation_service), +) -> AutomationDetail: + """Get one automation with its definition and triggers.""" + automation = await service.get(automation_id) + return AutomationDetail.model_validate(automation) + + +@router.patch("/automations/{automation_id}", response_model=AutomationDetail) +async def update_automation( + automation_id: int, + patch: AutomationUpdate, + service: AutomationService = Depends(get_automation_service), +) -> AutomationDetail: + """Partially update an automation. Triggers are managed separately.""" + automation = await service.update(automation_id, patch) + return AutomationDetail.model_validate(automation) + + +@router.delete( + "/automations/{automation_id}", + status_code=status.HTTP_204_NO_CONTENT, +) +async def delete_automation( + automation_id: int, + service: AutomationService = Depends(get_automation_service), +) -> None: + """Delete an automation; triggers and runs are removed by FK cascade.""" + await service.delete(automation_id) diff --git a/surfsense_backend/app/automations/api/run.py b/surfsense_backend/app/automations/api/run.py new file mode 100644 index 000000000..d0d4bbfb7 --- /dev/null +++ b/surfsense_backend/app/automations/api/run.py @@ -0,0 +1,71 @@ +"""HTTP routes for automation runs (dispatch + history).""" + +from __future__ import annotations + +from typing import Any + +from fastapi import APIRouter, Body, Depends, Query, status + +from app.automations.schemas.api import ( + RunDetail, + RunDispatched, + RunList, + RunSummary, +) +from app.automations.services import RunService, get_run_service + +router = APIRouter() + + +@router.post( + "/automations/{automation_id}/run", + response_model=RunDispatched, + status_code=status.HTTP_202_ACCEPTED, +) +async def run_automation_now( + automation_id: int, + inputs: dict[str, Any] | None = Body(default=None), + service: RunService = Depends(get_run_service), +) -> RunDispatched: + """Fire a manual run. + + ``inputs`` is the runtime payload supplied by the caller; it is merged with + the manual trigger's ``static_inputs`` (static wins) and validated against + the automation's input schema. + """ + run = await service.dispatch_manual(automation_id=automation_id, runtime_inputs=inputs) + return RunDispatched(run_id=run.id, status=run.status) + + +@router.get( + "/automations/{automation_id}/runs", + response_model=RunList, +) +async def list_runs( + automation_id: int, + limit: int = Query(default=50, ge=1, le=200), + offset: int = Query(default=0, ge=0), + service: RunService = Depends(get_run_service), +) -> RunList: + """List run history for an automation, newest first.""" + items, total = await service.list( + automation_id=automation_id, limit=limit, offset=offset + ) + return RunList( + items=[RunSummary.model_validate(r) for r in items], + total=total, + ) + + +@router.get( + "/automations/{automation_id}/runs/{run_id}", + response_model=RunDetail, +) +async def get_run( + automation_id: int, + run_id: int, + service: RunService = Depends(get_run_service), +) -> RunDetail: + """Get the full record of a single run, including step results and artifacts.""" + run = await service.get(automation_id=automation_id, run_id=run_id) + return RunDetail.model_validate(run) diff --git a/surfsense_backend/app/automations/api/trigger.py b/surfsense_backend/app/automations/api/trigger.py new file mode 100644 index 000000000..40e47a86b --- /dev/null +++ b/surfsense_backend/app/automations/api/trigger.py @@ -0,0 +1,55 @@ +"""HTTP routes for triggers attached to an automation.""" + +from __future__ import annotations + +from fastapi import APIRouter, Depends, status + +from app.automations.schemas.api import TriggerCreate, TriggerDetail, TriggerUpdate +from app.automations.services import TriggerService, get_trigger_service + +router = APIRouter() + + +@router.post( + "/automations/{automation_id}/triggers", + response_model=TriggerDetail, + status_code=status.HTTP_201_CREATED, +) +async def add_trigger( + automation_id: int, + payload: TriggerCreate, + service: TriggerService = Depends(get_trigger_service), +) -> TriggerDetail: + """Attach a new trigger to an automation.""" + trigger = await service.add(automation_id=automation_id, payload=payload) + return TriggerDetail.model_validate(trigger) + + +@router.patch( + "/automations/{automation_id}/triggers/{trigger_id}", + response_model=TriggerDetail, +) +async def update_trigger( + automation_id: int, + trigger_id: int, + patch: TriggerUpdate, + service: TriggerService = Depends(get_trigger_service), +) -> TriggerDetail: + """Toggle ``enabled`` or replace ``params``. Trigger type is immutable.""" + trigger = await service.update( + automation_id=automation_id, trigger_id=trigger_id, patch=patch + ) + return TriggerDetail.model_validate(trigger) + + +@router.delete( + "/automations/{automation_id}/triggers/{trigger_id}", + status_code=status.HTTP_204_NO_CONTENT, +) +async def remove_trigger( + automation_id: int, + trigger_id: int, + service: TriggerService = Depends(get_trigger_service), +) -> None: + """Detach a trigger from an automation.""" + await service.remove(automation_id=automation_id, trigger_id=trigger_id) diff --git a/surfsense_backend/app/automations/dispatch/run.py b/surfsense_backend/app/automations/dispatch/run.py index fd5107a18..e317a13b9 100644 --- a/surfsense_backend/app/automations/dispatch/run.py +++ b/surfsense_backend/app/automations/dispatch/run.py @@ -22,10 +22,14 @@ async def dispatch_run( session: AsyncSession, automation: Automation, trigger: AutomationTrigger, - payload: dict[str, Any] | None, + runtime_inputs: dict[str, Any] | None = None, ) -> AutomationRun: """Validate, snapshot the definition, persist an ``AutomationRun``, enqueue execution. + Final inputs = ``trigger.static_inputs`` merged with ``runtime_inputs``, + static winning on key collision. The merged dict is validated against + ``automation.definition.inputs.schema_`` and stored on the run. + Callers (trigger-specific adapters) are responsible for resolving ``automation`` and ``trigger`` and for the trigger-side ``ACTIVE`` / ``enabled`` guards. This function only handles what's identical across @@ -36,7 +40,8 @@ async def dispatch_run( except Exception as exc: raise DispatchError(f"invalid automation definition: {exc}") from exc - resolved_inputs = _validate_inputs(definition, payload or {}) + merged_inputs = {**(runtime_inputs or {}), **(trigger.static_inputs or {})} + validated_inputs = _validate_inputs(definition, merged_inputs) snapshot = definition.model_dump(mode="json", by_alias=True) run = AutomationRun( @@ -44,8 +49,7 @@ async def dispatch_run( trigger_id=trigger.id, status=RunStatus.PENDING, definition_snapshot=snapshot, - trigger_payload=payload, - resolved_inputs=resolved_inputs, + inputs=validated_inputs, step_results=[], artifacts=[], ) @@ -61,12 +65,12 @@ async def dispatch_run( def _validate_inputs( - definition: AutomationDefinition, payload: dict[str, Any] + definition: AutomationDefinition, inputs: dict[str, Any] ) -> dict[str, Any]: if definition.inputs is None or not definition.inputs.schema_: return {} try: - jsonschema.validate(instance=payload, schema=definition.inputs.schema_) + jsonschema.validate(instance=inputs, schema=definition.inputs.schema_) except jsonschema.ValidationError as exc: raise DispatchError(f"inputs: {exc.message}") from exc - return payload + return inputs diff --git a/surfsense_backend/app/automations/persistence/models/run.py b/surfsense_backend/app/automations/persistence/models/run.py index fdc355e8f..81b33c37c 100644 --- a/surfsense_backend/app/automations/persistence/models/run.py +++ b/surfsense_backend/app/automations/persistence/models/run.py @@ -45,8 +45,9 @@ class AutomationRun(BaseModel, TimestampMixin): # locked at fire time so historical runs always show the exact code path definition_snapshot = Column(JSONB, nullable=False) - trigger_payload = Column(JSONB, nullable=True) - resolved_inputs = Column(JSONB, nullable=False, server_default="{}") + # merged & validated inputs the run was dispatched with + # (trigger.static_inputs ∪ producer runtime data, static wins on collision) + inputs = Column(JSONB, nullable=False, server_default="{}") # one entry per executed step; agent_task entries carry their own # `agent_session_id` inside their entry step_results = Column(JSONB, nullable=False, server_default="[]") diff --git a/surfsense_backend/app/automations/persistence/models/trigger.py b/surfsense_backend/app/automations/persistence/models/trigger.py index b09bc3419..72d1d8d07 100644 --- a/surfsense_backend/app/automations/persistence/models/trigger.py +++ b/surfsense_backend/app/automations/persistence/models/trigger.py @@ -36,6 +36,10 @@ class AutomationTrigger(BaseModel, TimestampMixin): params = Column(JSONB, nullable=False) + # Per-attachment domain values merged into every dispatched run's inputs. + # Static wins over runtime data on key collision. + static_inputs = Column(JSONB, nullable=False, server_default="{}") + enabled = Column( Boolean, nullable=False, diff --git a/surfsense_backend/app/automations/runtime/executor.py b/surfsense_backend/app/automations/runtime/executor.py index ced44fb9b..b8a377e5b 100644 --- a/surfsense_backend/app/automations/runtime/executor.py +++ b/surfsense_backend/app/automations/runtime/executor.py @@ -106,7 +106,7 @@ def _build_template_ctx(run: AutomationRun, step_outputs: dict[str, Any]) -> dic trigger_type=trigger.type.value if trigger else None, started_at=run.started_at, attempt=1, - resolved_inputs=run.resolved_inputs or {}, + inputs=run.inputs or {}, step_outputs=step_outputs, ) diff --git a/surfsense_backend/app/automations/api/schemas/__init__.py b/surfsense_backend/app/automations/schemas/api/__init__.py similarity index 100% rename from surfsense_backend/app/automations/api/schemas/__init__.py rename to surfsense_backend/app/automations/schemas/api/__init__.py diff --git a/surfsense_backend/app/automations/api/schemas/automation.py b/surfsense_backend/app/automations/schemas/api/automation.py similarity index 100% rename from surfsense_backend/app/automations/api/schemas/automation.py rename to surfsense_backend/app/automations/schemas/api/automation.py diff --git a/surfsense_backend/app/automations/api/schemas/run.py b/surfsense_backend/app/automations/schemas/api/run.py similarity index 92% rename from surfsense_backend/app/automations/api/schemas/run.py rename to surfsense_backend/app/automations/schemas/api/run.py index 789b6f674..42ea7ac14 100644 --- a/surfsense_backend/app/automations/api/schemas/run.py +++ b/surfsense_backend/app/automations/schemas/api/run.py @@ -28,8 +28,7 @@ class RunDetail(RunSummary): """Full run view including snapshot, results and artifacts.""" definition_snapshot: dict[str, Any] - trigger_payload: dict[str, Any] | None = None - resolved_inputs: dict[str, Any] + inputs: dict[str, Any] step_results: list[dict[str, Any]] output: dict[str, Any] | None = None artifacts: list[dict[str, Any]] diff --git a/surfsense_backend/app/automations/api/schemas/trigger.py b/surfsense_backend/app/automations/schemas/api/trigger.py similarity index 87% rename from surfsense_backend/app/automations/api/schemas/trigger.py rename to surfsense_backend/app/automations/schemas/api/trigger.py index 32afe7c60..35176fb9f 100644 --- a/surfsense_backend/app/automations/api/schemas/trigger.py +++ b/surfsense_backend/app/automations/schemas/api/trigger.py @@ -17,6 +17,7 @@ class TriggerCreate(BaseModel): type: TriggerType params: dict[str, Any] = Field(default_factory=dict) + static_inputs: dict[str, Any] = Field(default_factory=dict) enabled: bool = True @@ -27,6 +28,7 @@ class TriggerUpdate(BaseModel): enabled: bool | None = None params: dict[str, Any] | None = None + static_inputs: dict[str, Any] | None = None class TriggerDetail(BaseModel): @@ -37,6 +39,7 @@ class TriggerDetail(BaseModel): id: int type: TriggerType params: dict[str, Any] + static_inputs: dict[str, Any] enabled: bool last_fired_at: datetime | None = None next_fire_at: datetime | None = None diff --git a/surfsense_backend/app/automations/services/__init__.py b/surfsense_backend/app/automations/services/__init__.py index f0a97d216..597aca98a 100644 --- a/surfsense_backend/app/automations/services/__init__.py +++ b/surfsense_backend/app/automations/services/__init__.py @@ -1,7 +1,16 @@ -"""Service layer for the automations feature.""" +"""Services for the automations HTTP layer (one service per resource).""" from __future__ import annotations from .automation import AutomationService, get_automation_service +from .run import RunService, get_run_service +from .trigger import TriggerService, get_trigger_service -__all__ = ["AutomationService", "get_automation_service"] +__all__ = [ + "AutomationService", + "RunService", + "TriggerService", + "get_automation_service", + "get_run_service", + "get_trigger_service", +] diff --git a/surfsense_backend/app/automations/services/automation.py b/surfsense_backend/app/automations/services/automation.py index 2a921e331..9140da3b5 100644 --- a/surfsense_backend/app/automations/services/automation.py +++ b/surfsense_backend/app/automations/services/automation.py @@ -2,54 +2,111 @@ from __future__ import annotations -from typing import Any +from datetime import UTC, datetime from fastapi import Depends, HTTPException +from pydantic import ValidationError +from sqlalchemy import func, select from sqlalchemy.ext.asyncio import AsyncSession +from sqlalchemy.orm import selectinload -from app.automations.dispatch import DispatchError +from app.automations.schemas.api import ( + AutomationCreate, + AutomationUpdate, + TriggerCreate, +) +from app.automations.persistence.enums.trigger_type import TriggerType from app.automations.persistence.models.automation import Automation -from app.automations.persistence.models.run import AutomationRun -from app.automations.triggers.manual import dispatch_manual_run +from app.automations.persistence.models.trigger import AutomationTrigger +from app.automations.triggers import get_trigger +from app.automations.triggers.schedule import compute_next_fire_at from app.db import Permission, User, get_async_session from app.users import current_active_user from app.utils.rbac import check_permission class AutomationService: - """Service for the ``Automation`` resource.""" + """Lifecycle of the ``Automation`` resource.""" def __init__(self, *, session: AsyncSession, user: User) -> None: self.session = session self.user = user - async def run_now( + async def create(self, payload: AutomationCreate) -> Automation: + """Create an automation and its initial triggers in one transaction.""" + await self._authorize(payload.search_space_id, Permission.AUTOMATIONS_CREATE.value) + + automation = Automation( + search_space_id=payload.search_space_id, + created_by_user_id=self.user.id, + name=payload.name, + description=payload.description, + definition=payload.definition.model_dump(mode="json", by_alias=True), + version=1, + ) + for spec in payload.triggers: + automation.triggers.append(_build_trigger(spec)) + + self.session.add(automation) + await self.session.commit() + return await self._get_with_triggers_or_raise(automation.id) + + async def list( self, *, - automation_id: int, - payload: dict[str, Any] | None, - ) -> AutomationRun: - """Fire a manual run for ``automation_id``.""" - automation = await self._get_automation_or_raise(automation_id) - await check_permission( - self.session, - self.user, - automation.search_space_id, - Permission.AUTOMATIONS_EXECUTE.value, - "You don't have permission to execute automations in this search space", + search_space_id: int, + limit: int, + offset: int, + ) -> tuple[list[Automation], int]: + """Return a page of automations and the total count.""" + await self._authorize(search_space_id, Permission.AUTOMATIONS_READ.value) + + base = select(Automation).where(Automation.search_space_id == search_space_id) + total = await self.session.scalar( + select(func.count()).select_from(base.subquery()) ) - try: - return await dispatch_manual_run( - session=self.session, - automation_id=automation_id, - payload=payload, + rows = ( + await self.session.execute( + base.order_by(Automation.created_at.desc()).limit(limit).offset(offset) ) - except DispatchError as exc: - raise HTTPException(status_code=422, detail=str(exc)) from exc + ).scalars().all() + return list(rows), int(total or 0) - async def _get_automation_or_raise(self, automation_id: int) -> Automation: - """Get the automation by id; 404 if missing.""" + async def get(self, automation_id: int) -> Automation: + """Get an automation with its triggers loaded.""" + automation = await self._get_with_triggers_or_raise(automation_id) + await self._authorize(automation.search_space_id, Permission.AUTOMATIONS_READ.value) + return automation + + async def update(self, automation_id: int, patch: AutomationUpdate) -> Automation: + """Patch fields. Bumps ``version`` when ``definition`` changes.""" + automation = await self._get_with_triggers_or_raise(automation_id) + await self._authorize(automation.search_space_id, Permission.AUTOMATIONS_UPDATE.value) + + data = patch.model_dump(exclude_unset=True) + + if "name" in data: + automation.name = data["name"] + if "description" in data: + automation.description = data["description"] + if "status" in data: + automation.status = data["status"] + if "definition" in data: + automation.definition = patch.definition.model_dump(mode="json", by_alias=True) + automation.version += 1 + + await self.session.commit() + return await self._get_with_triggers_or_raise(automation_id) + + async def delete(self, automation_id: int) -> None: + """Delete an automation; FK cascades remove triggers and runs.""" + automation = await self._get_or_raise(automation_id) + await self._authorize(automation.search_space_id, Permission.AUTOMATIONS_DELETE.value) + await self.session.delete(automation) + await self.session.commit() + + async def _get_or_raise(self, automation_id: int) -> Automation: automation = await self.session.get(Automation, automation_id) if automation is None: raise HTTPException( @@ -57,6 +114,56 @@ class AutomationService: ) return automation + async def _get_with_triggers_or_raise(self, automation_id: int) -> Automation: + stmt = ( + select(Automation) + .where(Automation.id == automation_id) + .options(selectinload(Automation.triggers)) + ) + automation = (await self.session.execute(stmt)).scalar_one_or_none() + if automation is None: + raise HTTPException( + status_code=404, detail=f"automation {automation_id} not found" + ) + return automation + + async def _authorize(self, search_space_id: int, permission: str) -> None: + await check_permission( + self.session, + self.user, + search_space_id, + permission, + f"You don't have permission to {permission.split(':')[1]} automations in this search space", + ) + + +def _build_trigger(spec: TriggerCreate) -> AutomationTrigger: + """Validate trigger params via its registered Pydantic model and build the ORM row.""" + definition = get_trigger(spec.type.value) + if definition is None: + raise HTTPException(status_code=422, detail=f"unknown trigger type {spec.type.value!r}") + + try: + validated = definition.params_model.model_validate(spec.params) + except ValidationError as exc: + raise HTTPException(status_code=422, detail=str(exc)) from exc + + params = validated.model_dump(mode="json") + + next_fire_at = None + if spec.type == TriggerType.SCHEDULE and spec.enabled: + next_fire_at = compute_next_fire_at( + params["cron"], params["timezone"], after=datetime.now(UTC) + ) + + return AutomationTrigger( + type=spec.type, + params=params, + static_inputs=spec.static_inputs, + enabled=spec.enabled, + next_fire_at=next_fire_at, + ) + def get_automation_service( session: AsyncSession = Depends(get_async_session), diff --git a/surfsense_backend/app/automations/services/run.py b/surfsense_backend/app/automations/services/run.py new file mode 100644 index 000000000..92d79e9bc --- /dev/null +++ b/surfsense_backend/app/automations/services/run.py @@ -0,0 +1,93 @@ +"""``RunService`` — dispatch and history of automation runs.""" + +from __future__ import annotations + +from typing import Any + +from fastapi import Depends, HTTPException +from sqlalchemy import func, select +from sqlalchemy.ext.asyncio import AsyncSession + +from app.automations.dispatch import DispatchError +from app.automations.persistence.models.automation import Automation +from app.automations.persistence.models.run import AutomationRun +from app.automations.triggers.manual import dispatch_manual_run +from app.db import Permission, User, get_async_session +from app.users import current_active_user +from app.utils.rbac import check_permission + + +class RunService: + """Lifecycle of the ``AutomationRun`` resource.""" + + def __init__(self, *, session: AsyncSession, user: User) -> None: + self.session = session + self.user = user + + async def dispatch_manual( + self, + *, + automation_id: int, + runtime_inputs: dict[str, Any] | None, + ) -> AutomationRun: + """Fire a manual run via the registered manual trigger.""" + await self._authorize(automation_id, Permission.AUTOMATIONS_EXECUTE.value) + try: + return await dispatch_manual_run( + session=self.session, + automation_id=automation_id, + runtime_inputs=runtime_inputs, + ) + except DispatchError as exc: + raise HTTPException(status_code=422, detail=str(exc)) from exc + + async def list( + self, + *, + automation_id: int, + limit: int, + offset: int, + ) -> tuple[list[AutomationRun], int]: + """Return a page of runs for an automation, newest first.""" + await self._authorize(automation_id, Permission.AUTOMATIONS_READ.value) + + base = select(AutomationRun).where(AutomationRun.automation_id == automation_id) + total = await self.session.scalar( + select(func.count()).select_from(base.subquery()) + ) + + rows = ( + await self.session.execute( + base.order_by(AutomationRun.created_at.desc()).limit(limit).offset(offset) + ) + ).scalars().all() + return list(rows), int(total or 0) + + async def get(self, *, automation_id: int, run_id: int) -> AutomationRun: + await self._authorize(automation_id, Permission.AUTOMATIONS_READ.value) + run = await self.session.get(AutomationRun, run_id) + if run is None or run.automation_id != automation_id: + raise HTTPException(status_code=404, detail=f"run {run_id} not found") + return run + + async def _authorize(self, automation_id: int, permission: str) -> Automation: + automation = await self.session.get(Automation, automation_id) + if automation is None: + raise HTTPException( + status_code=404, detail=f"automation {automation_id} not found" + ) + await check_permission( + self.session, + self.user, + automation.search_space_id, + permission, + f"You don't have permission to {permission.split(':')[1]} automations in this search space", + ) + return automation + + +def get_run_service( + session: AsyncSession = Depends(get_async_session), + user: User = Depends(current_active_user), +) -> RunService: + return RunService(session=session, user=user) diff --git a/surfsense_backend/app/automations/services/trigger.py b/surfsense_backend/app/automations/services/trigger.py new file mode 100644 index 000000000..33e9c1386 --- /dev/null +++ b/surfsense_backend/app/automations/services/trigger.py @@ -0,0 +1,143 @@ +"""``TriggerService`` — lifecycle of triggers attached to an automation.""" + +from __future__ import annotations + +from datetime import UTC, datetime + +from fastapi import Depends, HTTPException +from pydantic import ValidationError +from sqlalchemy.ext.asyncio import AsyncSession + +from app.automations.schemas.api import TriggerCreate, TriggerUpdate +from app.automations.persistence.enums.trigger_type import TriggerType +from app.automations.persistence.models.automation import Automation +from app.automations.persistence.models.trigger import AutomationTrigger +from app.automations.triggers import get_trigger +from app.automations.triggers.schedule import compute_next_fire_at +from app.db import Permission, User, get_async_session +from app.users import current_active_user +from app.utils.rbac import check_permission + + +class TriggerService: + """Lifecycle of the ``AutomationTrigger`` sub-resource.""" + + def __init__(self, *, session: AsyncSession, user: User) -> None: + self.session = session + self.user = user + + async def add( + self, *, automation_id: int, payload: TriggerCreate + ) -> AutomationTrigger: + automation = await self._authorize_automation( + automation_id, Permission.AUTOMATIONS_UPDATE.value + ) + + validated_params = _validate_params(payload.type, payload.params) + trigger = AutomationTrigger( + automation_id=automation.id, + type=payload.type, + params=validated_params, + static_inputs=payload.static_inputs, + enabled=payload.enabled, + next_fire_at=_initial_next_fire(payload.type, validated_params, payload.enabled), + ) + self.session.add(trigger) + await self.session.commit() + await self.session.refresh(trigger) + return trigger + + async def update( + self, + *, + automation_id: int, + trigger_id: int, + patch: TriggerUpdate, + ) -> AutomationTrigger: + await self._authorize_automation(automation_id, Permission.AUTOMATIONS_UPDATE.value) + trigger = await self._get_trigger_or_raise(automation_id, trigger_id) + + data = patch.model_dump(exclude_unset=True) + + if "params" in data: + trigger.params = _validate_params(trigger.type, data["params"]) + + if "static_inputs" in data: + trigger.static_inputs = data["static_inputs"] + + if "enabled" in data: + trigger.enabled = data["enabled"] + + # Recompute next_fire_at when schedule timing changed or the trigger was + # toggled back on. Manual triggers always have NULL next_fire_at. + if trigger.type == TriggerType.SCHEDULE: + trigger.next_fire_at = _initial_next_fire( + trigger.type, trigger.params, trigger.enabled + ) + + await self.session.commit() + await self.session.refresh(trigger) + return trigger + + async def remove(self, *, automation_id: int, trigger_id: int) -> None: + await self._authorize_automation(automation_id, Permission.AUTOMATIONS_UPDATE.value) + trigger = await self._get_trigger_or_raise(automation_id, trigger_id) + await self.session.delete(trigger) + await self.session.commit() + + async def _authorize_automation( + self, automation_id: int, permission: str + ) -> Automation: + automation = await self.session.get(Automation, automation_id) + if automation is None: + raise HTTPException( + status_code=404, detail=f"automation {automation_id} not found" + ) + await check_permission( + self.session, + self.user, + automation.search_space_id, + permission, + f"You don't have permission to {permission.split(':')[1]} automations in this search space", + ) + return automation + + async def _get_trigger_or_raise( + self, automation_id: int, trigger_id: int + ) -> AutomationTrigger: + trigger = await self.session.get(AutomationTrigger, trigger_id) + if trigger is None or trigger.automation_id != automation_id: + raise HTTPException( + status_code=404, detail=f"trigger {trigger_id} not found" + ) + return trigger + + +def _validate_params(trigger_type: TriggerType, raw: dict) -> dict: + definition = get_trigger(trigger_type.value) + if definition is None: + raise HTTPException( + status_code=422, detail=f"unknown trigger type {trigger_type.value!r}" + ) + try: + validated = definition.params_model.model_validate(raw) + except ValidationError as exc: + raise HTTPException(status_code=422, detail=str(exc)) from exc + return validated.model_dump(mode="json") + + +def _initial_next_fire( + trigger_type: TriggerType, params: dict, enabled: bool +) -> datetime | None: + if trigger_type != TriggerType.SCHEDULE or not enabled: + return None + return compute_next_fire_at( + params["cron"], params["timezone"], after=datetime.now(UTC) + ) + + +def get_trigger_service( + session: AsyncSession = Depends(get_async_session), + user: User = Depends(current_active_user), +) -> TriggerService: + return TriggerService(session=session, user=user) diff --git a/surfsense_backend/app/automations/tasks/schedule_tick.py b/surfsense_backend/app/automations/tasks/schedule_tick.py index cade621c7..385bd7242 100644 --- a/surfsense_backend/app/automations/tasks/schedule_tick.py +++ b/surfsense_backend/app/automations/tasks/schedule_tick.py @@ -15,6 +15,7 @@ Runs every minute. Each tick performs two passes: from __future__ import annotations import logging +from dataclasses import dataclass from datetime import UTC, datetime from sqlalchemy import select @@ -39,6 +40,15 @@ TASK_NAME = "automation_schedule_tick" _TICK_BATCH = 200 +@dataclass(frozen=True, slots=True) +class _Claim: + """Per-trigger fire context captured before row state is mutated.""" + + trigger_id: int + scheduled_for: datetime + previous_last_fired_at: datetime | None + + @celery_app.task(name=TASK_NAME) def automation_schedule_tick() -> None: """Tick once: self-heal NULL next_fire_at, claim due rows, fire each.""" @@ -52,12 +62,12 @@ async def _tick() -> None: await _self_heal_null_next_fire(session, now=now) - claimed_ids = await _claim_due_triggers(session, now=now) - if not claimed_ids: + claims = await _claim_due_triggers(session, now=now) + if not claims: return - for trigger_id in claimed_ids: - await _fire_one(session, trigger_id=trigger_id) + for claim in claims: + await _fire_one(session, claim=claim, fired_at=now) async def _self_heal_null_next_fire(session: AsyncSession, *, now: datetime) -> None: @@ -95,8 +105,8 @@ async def _self_heal_null_next_fire(session: AsyncSession, *, now: datetime) -> async def _claim_due_triggers( session: AsyncSession, *, now: datetime -) -> list[int]: - """Lock and advance due rows; return claimed trigger ids.""" +) -> list[_Claim]: + """Lock and advance due rows; return per-trigger fire context.""" stmt = ( select(AutomationTrigger) .where( @@ -113,8 +123,12 @@ async def _claim_due_triggers( if not triggers: return [] - claimed: list[int] = [] + claims: list[_Claim] = [] for trigger in triggers: + # Snapshot fire-context BEFORE we advance the row. + scheduled_for = trigger.next_fire_at + previous_last_fired_at = trigger.last_fired_at + try: trigger.next_fire_at = compute_next_fire_at( trigger.params["cron"], @@ -131,29 +145,43 @@ async def _claim_due_triggers( continue trigger.last_fired_at = now - claimed.append(trigger.id) + claims.append( + _Claim( + trigger_id=trigger.id, + scheduled_for=scheduled_for, + previous_last_fired_at=previous_last_fired_at, + ) + ) await session.commit() - return claimed + return claims -async def _fire_one(session: AsyncSession, *, trigger_id: int) -> None: +async def _fire_one( + session: AsyncSession, *, claim: _Claim, fired_at: datetime +) -> None: """Reload the trigger post-commit and dispatch a run for it.""" - trigger = await session.get(AutomationTrigger, trigger_id) + trigger = await session.get(AutomationTrigger, claim.trigger_id) if trigger is None: return try: - run = await dispatch_schedule_run(session=session, trigger=trigger) + run = await dispatch_schedule_run( + session=session, + trigger=trigger, + fired_at=fired_at, + scheduled_for=claim.scheduled_for, + previous_last_fired_at=claim.previous_last_fired_at, + ) logger.info( "scheduled fire: trigger=%d automation=%d run=%d", - trigger_id, + claim.trigger_id, trigger.automation_id, run.id, ) except Exception: logger.exception( "scheduled fire failed for trigger %d (next attempt at next match)", - trigger_id, + claim.trigger_id, ) await session.rollback() diff --git a/surfsense_backend/app/automations/templating/context.py b/surfsense_backend/app/automations/templating/context.py index 3ca87694c..96fdb02e9 100644 --- a/surfsense_backend/app/automations/templating/context.py +++ b/surfsense_backend/app/automations/templating/context.py @@ -19,7 +19,7 @@ def build_run_context( trigger_type: str | None, started_at: datetime | None, attempt: int, - resolved_inputs: Mapping[str, Any], + inputs: Mapping[str, Any], step_outputs: Mapping[str, Any], ) -> dict[str, Any]: """Build the ``{run, inputs, steps}`` namespace exposed to every template.""" @@ -36,6 +36,6 @@ def build_run_context( "started_at": started_at, "attempt": attempt, }, - "inputs": dict(resolved_inputs), + "inputs": dict(inputs), "steps": dict(step_outputs), } diff --git a/surfsense_backend/app/automations/triggers/manual/definition.py b/surfsense_backend/app/automations/triggers/manual/definition.py index 9eb0282af..5a3529116 100644 --- a/surfsense_backend/app/automations/triggers/manual/definition.py +++ b/surfsense_backend/app/automations/triggers/manual/definition.py @@ -9,8 +9,7 @@ from .params import ManualTriggerParams MANUAL_TRIGGER = TriggerDefinition( type="manual", description="Fire on a user-initiated 'Run now' invocation.", - params_schema=ManualTriggerParams.model_json_schema(), - payload_schema={"type": "object"}, + params_model=ManualTriggerParams, ) register_trigger(MANUAL_TRIGGER) diff --git a/surfsense_backend/app/automations/triggers/manual/dispatch.py b/surfsense_backend/app/automations/triggers/manual/dispatch.py index 750c99937..6c92317d0 100644 --- a/surfsense_backend/app/automations/triggers/manual/dispatch.py +++ b/surfsense_backend/app/automations/triggers/manual/dispatch.py @@ -19,9 +19,14 @@ async def dispatch_manual_run( *, session: AsyncSession, automation_id: int, - payload: dict[str, Any] | None, + runtime_inputs: dict[str, Any] | None, ) -> AutomationRun: - """Find the automation + its enabled manual trigger, then run the generic dispatch.""" + """Find the automation + its enabled manual trigger, then run the generic dispatch. + + ``runtime_inputs`` is the caller-supplied payload (e.g. an HTTP body for a + "Run now" API call); it is merged with the trigger's ``static_inputs`` by + the generic dispatcher, with static winning on key collision. + """ automation = await _load_automation(session, automation_id) if automation is None: raise DispatchError(f"automation {automation_id} not found") @@ -41,7 +46,7 @@ async def dispatch_manual_run( session=session, automation=automation, trigger=trigger, - payload=payload, + runtime_inputs=runtime_inputs, ) diff --git a/surfsense_backend/app/automations/triggers/schedule/definition.py b/surfsense_backend/app/automations/triggers/schedule/definition.py index 3f86d767c..605765307 100644 --- a/surfsense_backend/app/automations/triggers/schedule/definition.py +++ b/surfsense_backend/app/automations/triggers/schedule/definition.py @@ -9,12 +9,7 @@ from .params import ScheduleTriggerParams SCHEDULE_TRIGGER = TriggerDefinition( type="schedule", description="Fire on a cron schedule in a given timezone.", - params_schema=ScheduleTriggerParams.model_json_schema(), - payload_schema={ - "type": "object", - "additionalProperties": False, - "properties": {}, - }, + params_model=ScheduleTriggerParams, ) register_trigger(SCHEDULE_TRIGGER) diff --git a/surfsense_backend/app/automations/triggers/schedule/dispatch.py b/surfsense_backend/app/automations/triggers/schedule/dispatch.py index fb4fcf686..6d3d5fcb9 100644 --- a/surfsense_backend/app/automations/triggers/schedule/dispatch.py +++ b/surfsense_backend/app/automations/triggers/schedule/dispatch.py @@ -2,6 +2,8 @@ from __future__ import annotations +from datetime import datetime + from sqlalchemy import select from sqlalchemy.ext.asyncio import AsyncSession @@ -16,9 +18,18 @@ async def dispatch_schedule_run( *, session: AsyncSession, trigger: AutomationTrigger, + fired_at: datetime, + scheduled_for: datetime, + previous_last_fired_at: datetime | None, ) -> AutomationRun: """Fire one scheduled run for ``trigger``. + Emits calendar context as runtime inputs: + + - ``fired_at`` — actual fire time + - ``scheduled_for`` — cron-derived target time for this fire + - ``last_fired_at`` — fire time of the previous run, or null on first fire + The caller (the schedule tick) is responsible for selecting due triggers and advancing ``next_fire_at`` / ``last_fired_at`` before invoking this. """ @@ -33,11 +44,19 @@ async def dispatch_schedule_run( f"automation {trigger.automation_id} is {automation.status.value}, not active" ) + runtime_inputs = { + "fired_at": fired_at.isoformat(), + "scheduled_for": scheduled_for.isoformat(), + "last_fired_at": ( + previous_last_fired_at.isoformat() if previous_last_fired_at else None + ), + } + return await dispatch_run( session=session, automation=automation, trigger=trigger, - payload=None, + runtime_inputs=runtime_inputs, ) diff --git a/surfsense_backend/app/automations/triggers/types.py b/surfsense_backend/app/automations/triggers/types.py index 783bd7842..aa2808e4d 100644 --- a/surfsense_backend/app/automations/triggers/types.py +++ b/surfsense_backend/app/automations/triggers/types.py @@ -5,10 +5,16 @@ from __future__ import annotations from dataclasses import dataclass from typing import Any +from pydantic import BaseModel + @dataclass(frozen=True, slots=True) class TriggerDefinition: type: str description: str - params_schema: dict[str, Any] - payload_schema: dict[str, Any] + params_model: type[BaseModel] + + @property + def params_schema(self) -> dict[str, Any]: + """JSON Schema (draft 2020-12) derived from ``params_model``.""" + return self.params_model.model_json_schema()