From 30fff9e52fb8bfc5fc4941e4a7647d37fa48b732 Mon Sep 17 00:00:00 2001 From: CREDO23 Date: Fri, 29 May 2026 18:13:09 +0200 Subject: [PATCH] refactor(automations): move agent_task to builtin and restructure dispatch --- .../app/automations/actions/__init__.py | 2 +- .../automations/actions/builtin/__init__.py | 5 ++ .../{ => builtin}/agent_task/__init__.py | 0 .../{ => builtin}/agent_task/auto_decide.py | 0 .../{ => builtin}/agent_task/definition.py | 4 +- .../{ => builtin}/agent_task/dependencies.py | 0 .../{ => builtin}/agent_task/factory.py | 2 +- .../{ => builtin}/agent_task/finalize.py | 0 .../{ => builtin}/agent_task/invoke.py | 2 +- .../{ => builtin}/agent_task/params.py | 0 .../app/automations/dispatch/__init__.py | 5 +- .../app/automations/dispatch/inputs.py | 43 ++++++++++ .../app/automations/dispatch/launch.py | 60 ++++++++++++++ .../dispatch/{start.py => resolve.py} | 32 ++----- .../app/automations/dispatch/run.py | 83 ------------------- .../triggers/builtin/event/selector.py | 4 +- .../triggers/builtin/schedule/selector.py | 4 +- .../{agent_task => builtin}/__init__.py | 0 .../actions/builtin/agent_task/__init__.py | 0 .../agent_task/test_auto_decide.py | 2 +- .../{ => builtin}/agent_task/test_finalize.py | 4 +- ...test_validate_inputs.py => test_inputs.py} | 23 +++-- 22 files changed, 142 insertions(+), 133 deletions(-) create mode 100644 surfsense_backend/app/automations/actions/builtin/__init__.py rename surfsense_backend/app/automations/actions/{ => builtin}/agent_task/__init__.py (100%) rename surfsense_backend/app/automations/actions/{ => builtin}/agent_task/auto_decide.py (100%) rename surfsense_backend/app/automations/actions/{ => builtin}/agent_task/definition.py (85%) rename surfsense_backend/app/automations/actions/{ => builtin}/agent_task/dependencies.py (100%) rename surfsense_backend/app/automations/actions/{ => builtin}/agent_task/factory.py (95%) rename surfsense_backend/app/automations/actions/{ => builtin}/agent_task/finalize.py (100%) rename surfsense_backend/app/automations/actions/{ => builtin}/agent_task/invoke.py (99%) rename surfsense_backend/app/automations/actions/{ => builtin}/agent_task/params.py (100%) create mode 100644 surfsense_backend/app/automations/dispatch/inputs.py create mode 100644 surfsense_backend/app/automations/dispatch/launch.py rename surfsense_backend/app/automations/dispatch/{start.py => resolve.py} (56%) delete mode 100644 surfsense_backend/app/automations/dispatch/run.py rename surfsense_backend/tests/unit/automations/actions/{agent_task => builtin}/__init__.py (100%) create mode 100644 surfsense_backend/tests/unit/automations/actions/builtin/agent_task/__init__.py rename surfsense_backend/tests/unit/automations/actions/{ => builtin}/agent_task/test_auto_decide.py (97%) rename surfsense_backend/tests/unit/automations/actions/{ => builtin}/agent_task/test_finalize.py (96%) rename surfsense_backend/tests/unit/automations/dispatch/{test_validate_inputs.py => test_inputs.py} (74%) diff --git a/surfsense_backend/app/automations/actions/__init__.py b/surfsense_backend/app/automations/actions/__init__.py index 72669532f..ac5a07ac4 100644 --- a/surfsense_backend/app/automations/actions/__init__.py +++ b/surfsense_backend/app/automations/actions/__init__.py @@ -21,4 +21,4 @@ __all__ = [ ] # Built-in actions self-register at import time. -from . import agent_task # noqa: F401 +from . import builtin # noqa: F401 diff --git a/surfsense_backend/app/automations/actions/builtin/__init__.py b/surfsense_backend/app/automations/actions/builtin/__init__.py new file mode 100644 index 000000000..f3d21a044 --- /dev/null +++ b/surfsense_backend/app/automations/actions/builtin/__init__.py @@ -0,0 +1,5 @@ +"""Built-in action types — each in its own subpackage, self-registering at import.""" + +from __future__ import annotations + +from . import agent_task # noqa: F401 diff --git a/surfsense_backend/app/automations/actions/agent_task/__init__.py b/surfsense_backend/app/automations/actions/builtin/agent_task/__init__.py similarity index 100% rename from surfsense_backend/app/automations/actions/agent_task/__init__.py rename to surfsense_backend/app/automations/actions/builtin/agent_task/__init__.py diff --git a/surfsense_backend/app/automations/actions/agent_task/auto_decide.py b/surfsense_backend/app/automations/actions/builtin/agent_task/auto_decide.py similarity index 100% rename from surfsense_backend/app/automations/actions/agent_task/auto_decide.py rename to surfsense_backend/app/automations/actions/builtin/agent_task/auto_decide.py diff --git a/surfsense_backend/app/automations/actions/agent_task/definition.py b/surfsense_backend/app/automations/actions/builtin/agent_task/definition.py similarity index 85% rename from surfsense_backend/app/automations/actions/agent_task/definition.py rename to surfsense_backend/app/automations/actions/builtin/agent_task/definition.py index 7d14dc49e..cc3fd563a 100644 --- a/surfsense_backend/app/automations/actions/agent_task/definition.py +++ b/surfsense_backend/app/automations/actions/builtin/agent_task/definition.py @@ -2,8 +2,8 @@ from __future__ import annotations -from ..store import register_action -from ..types import ActionDefinition +from ...store import register_action +from ...types import ActionDefinition from .factory import build_handler from .params import AgentTaskActionParams diff --git a/surfsense_backend/app/automations/actions/agent_task/dependencies.py b/surfsense_backend/app/automations/actions/builtin/agent_task/dependencies.py similarity index 100% rename from surfsense_backend/app/automations/actions/agent_task/dependencies.py rename to surfsense_backend/app/automations/actions/builtin/agent_task/dependencies.py diff --git a/surfsense_backend/app/automations/actions/agent_task/factory.py b/surfsense_backend/app/automations/actions/builtin/agent_task/factory.py similarity index 95% rename from surfsense_backend/app/automations/actions/agent_task/factory.py rename to surfsense_backend/app/automations/actions/builtin/agent_task/factory.py index dec75dce8..f4f5d7d37 100644 --- a/surfsense_backend/app/automations/actions/agent_task/factory.py +++ b/surfsense_backend/app/automations/actions/builtin/agent_task/factory.py @@ -4,7 +4,7 @@ from __future__ import annotations from typing import Any -from ..types import ActionContext, ActionHandler +from ...types import ActionContext, ActionHandler from .invoke import run_agent_task from .params import AgentTaskActionParams diff --git a/surfsense_backend/app/automations/actions/agent_task/finalize.py b/surfsense_backend/app/automations/actions/builtin/agent_task/finalize.py similarity index 100% rename from surfsense_backend/app/automations/actions/agent_task/finalize.py rename to surfsense_backend/app/automations/actions/builtin/agent_task/finalize.py diff --git a/surfsense_backend/app/automations/actions/agent_task/invoke.py b/surfsense_backend/app/automations/actions/builtin/agent_task/invoke.py similarity index 99% rename from surfsense_backend/app/automations/actions/agent_task/invoke.py rename to surfsense_backend/app/automations/actions/builtin/agent_task/invoke.py index fa02d263f..088140374 100644 --- a/surfsense_backend/app/automations/actions/agent_task/invoke.py +++ b/surfsense_backend/app/automations/actions/builtin/agent_task/invoke.py @@ -16,7 +16,7 @@ from app.agents.new_chat.mention_resolver import resolve_mentions, substitute_in from app.db import ChatVisibility, async_session_maker from app.schemas.new_chat import MentionedDocumentInfo -from ..types import ActionContext +from ...types import ActionContext from .auto_decide import build_auto_decisions from .dependencies import build_dependencies from .finalize import extract_final_assistant_message diff --git a/surfsense_backend/app/automations/actions/agent_task/params.py b/surfsense_backend/app/automations/actions/builtin/agent_task/params.py similarity index 100% rename from surfsense_backend/app/automations/actions/agent_task/params.py rename to surfsense_backend/app/automations/actions/builtin/agent_task/params.py diff --git a/surfsense_backend/app/automations/dispatch/__init__.py b/surfsense_backend/app/automations/dispatch/__init__.py index 9b8f3dc0b..bab1d122e 100644 --- a/surfsense_backend/app/automations/dispatch/__init__.py +++ b/surfsense_backend/app/automations/dispatch/__init__.py @@ -3,7 +3,6 @@ from __future__ import annotations from .errors import DispatchError -from .run import dispatch_run -from .start import start_run +from .launch import launch_run -__all__ = ["DispatchError", "dispatch_run", "start_run"] +__all__ = ["DispatchError", "launch_run"] diff --git a/surfsense_backend/app/automations/dispatch/inputs.py b/surfsense_backend/app/automations/dispatch/inputs.py new file mode 100644 index 000000000..61546b993 --- /dev/null +++ b/surfsense_backend/app/automations/dispatch/inputs.py @@ -0,0 +1,43 @@ +"""Merge and validate the inputs a run starts with.""" + +from __future__ import annotations + +from typing import Any + +import jsonschema + +from app.automations.persistence.models.trigger import AutomationTrigger +from app.automations.schemas.definition.envelope import AutomationDefinition + +from .errors import DispatchError + + +def prepare_inputs( + definition: AutomationDefinition, + trigger: AutomationTrigger, + runtime_inputs: dict[str, Any] | None, +) -> dict[str, Any]: + """Merge ``trigger.static_inputs`` over ``runtime_inputs``, then validate. + + Static inputs win on key collision. + """ + merged = {**(runtime_inputs or {}), **(trigger.static_inputs or {})} + return validate_inputs(definition, merged) + + +def validate_inputs( + definition: AutomationDefinition, inputs: dict[str, Any] +) -> dict[str, Any]: + """Validate ``inputs`` against the definition's optional declared schema. + + No declared schema → pass through unchanged so runtime keys (``fired_at``, + ``last_fired_at``, ...) still reach the template context. A declared schema + that the inputs violate is surfaced as ``DispatchError``. + """ + if definition.inputs is None or not definition.inputs.schema_: + return inputs + try: + jsonschema.validate(instance=inputs, schema=definition.inputs.schema_) + except jsonschema.ValidationError as exc: + raise DispatchError(f"inputs: {exc.message}") from exc + return inputs diff --git a/surfsense_backend/app/automations/dispatch/launch.py b/surfsense_backend/app/automations/dispatch/launch.py new file mode 100644 index 000000000..cf7fb53d8 --- /dev/null +++ b/surfsense_backend/app/automations/dispatch/launch.py @@ -0,0 +1,60 @@ +"""Launch a run for a trigger that fired: resolve, validate, persist, enqueue. + +The trigger-facing entry every selector calls. A selector builds the runtime +inputs and hands one trigger row here; this resolves and guards its automation, +snapshots the definition onto a PENDING run, and enqueues execution. The +snapshot makes the run immune to later edits of the automation. +""" + +from __future__ import annotations + +from typing import Any + +from sqlalchemy.ext.asyncio import AsyncSession + +from app.automations.persistence.enums.run_status import RunStatus +from app.automations.persistence.models.run import AutomationRun +from app.automations.persistence.models.trigger import AutomationTrigger +from app.automations.schemas.definition.envelope import AutomationDefinition +from app.automations.tasks.execute_run import automation_run_execute + +from .errors import DispatchError +from .inputs import prepare_inputs +from .resolve import resolve_active_automation + + +async def launch_run( + *, + session: AsyncSession, + trigger: AutomationTrigger, + runtime_inputs: dict[str, Any] | None = None, +) -> AutomationRun: + """Resolve ``trigger``'s active automation and enqueue a PENDING run for it.""" + automation = await resolve_active_automation(session, trigger) + + try: + definition = AutomationDefinition.model_validate(automation.definition) + except Exception as exc: + raise DispatchError(f"invalid automation definition: {exc}") from exc + + inputs = prepare_inputs(definition, trigger, runtime_inputs) + snapshot = definition.model_dump(mode="json", by_alias=True) + + run = AutomationRun( + automation_id=automation.id, + trigger_id=trigger.id, + status=RunStatus.PENDING, + definition_snapshot=snapshot, + inputs=inputs, + step_results=[], + artifacts=[], + ) + session.add(run) + await session.commit() + await session.refresh(run) + + automation_run_execute.apply_async( + args=[run.id], + time_limit=definition.execution.timeout_seconds, + ) + return run diff --git a/surfsense_backend/app/automations/dispatch/start.py b/surfsense_backend/app/automations/dispatch/resolve.py similarity index 56% rename from surfsense_backend/app/automations/dispatch/start.py rename to surfsense_backend/app/automations/dispatch/resolve.py index 96c74c691..13efd15ee 100644 --- a/surfsense_backend/app/automations/dispatch/start.py +++ b/surfsense_backend/app/automations/dispatch/resolve.py @@ -1,33 +1,24 @@ -"""Start one run for a trigger: resolve its automation, guard ``ACTIVE``, dispatch. - -Shared by every trigger type. A type's selector builds the runtime inputs and -hands one trigger row here; this resolves and guards the automation, then calls -the generic ``dispatch_run``. -""" +"""Resolve the automation behind a trigger and guard that it may run.""" from __future__ import annotations -from typing import Any - from sqlalchemy import select from sqlalchemy.ext.asyncio import AsyncSession from app.automations.persistence.enums.automation_status import AutomationStatus from app.automations.persistence.models.automation import Automation -from app.automations.persistence.models.run import AutomationRun from app.automations.persistence.models.trigger import AutomationTrigger from .errors import DispatchError -from .run import dispatch_run -async def start_run( - *, - session: AsyncSession, - trigger: AutomationTrigger, - runtime_inputs: dict[str, Any] | None = None, -) -> AutomationRun: - """Resolve ``trigger``'s automation, require it ``ACTIVE``, dispatch a run.""" +async def resolve_active_automation( + session: AsyncSession, trigger: AutomationTrigger +) -> Automation: + """Load ``trigger``'s automation and require it ``ACTIVE``. + + Raises ``DispatchError`` if the automation is missing or not active. + """ automation = await _load_automation(session, trigger.automation_id) if automation is None: raise DispatchError( @@ -39,12 +30,7 @@ async def start_run( f"automation {trigger.automation_id} is {automation.status.value}, not active" ) - return await dispatch_run( - session=session, - automation=automation, - trigger=trigger, - runtime_inputs=runtime_inputs, - ) + return automation async def _load_automation( diff --git a/surfsense_backend/app/automations/dispatch/run.py b/surfsense_backend/app/automations/dispatch/run.py deleted file mode 100644 index 02d0b0356..000000000 --- a/surfsense_backend/app/automations/dispatch/run.py +++ /dev/null @@ -1,83 +0,0 @@ -"""Generic run dispatch: validate, snapshot, persist, enqueue. Shared by every trigger.""" - -from __future__ import annotations - -from typing import Any - -import jsonschema -from sqlalchemy.ext.asyncio import AsyncSession - -from app.automations.persistence.enums.run_status import RunStatus -from app.automations.persistence.models.automation import Automation -from app.automations.persistence.models.run import AutomationRun -from app.automations.persistence.models.trigger import AutomationTrigger -from app.automations.schemas.definition.envelope import AutomationDefinition -from app.automations.tasks.execute_run import automation_run_execute - -from .errors import DispatchError - - -async def dispatch_run( - *, - session: AsyncSession, - automation: Automation, - trigger: AutomationTrigger, - runtime_inputs: dict[str, Any] | None = None, -) -> AutomationRun: - """Validate, snapshot the definition, persist an ``AutomationRun``, enqueue execution. - - Final inputs = ``trigger.static_inputs`` merged with ``runtime_inputs``, - static winning on key collision. The merged dict is validated against - ``automation.definition.inputs.schema_`` and stored on the run. - - Callers (trigger-specific adapters) are responsible for resolving - ``automation`` and ``trigger`` and for the trigger-side ``ACTIVE`` / - ``enabled`` guards. This function only handles what's identical across - every trigger type. - """ - try: - definition = AutomationDefinition.model_validate(automation.definition) - except Exception as exc: - raise DispatchError(f"invalid automation definition: {exc}") from exc - - merged_inputs = {**(runtime_inputs or {}), **(trigger.static_inputs or {})} - validated_inputs = _validate_inputs(definition, merged_inputs) - snapshot = definition.model_dump(mode="json", by_alias=True) - - run = AutomationRun( - automation_id=automation.id, - trigger_id=trigger.id, - status=RunStatus.PENDING, - definition_snapshot=snapshot, - inputs=validated_inputs, - step_results=[], - artifacts=[], - ) - session.add(run) - await session.commit() - await session.refresh(run) - - automation_run_execute.apply_async( - args=[run.id], - time_limit=definition.execution.timeout_seconds, - ) - return run - - -def _validate_inputs( - definition: AutomationDefinition, inputs: dict[str, Any] -) -> dict[str, Any]: - """Validate merged inputs against the optional declared schema. - - No declared schema → pass through (runtime inputs like ``fired_at`` / - ``last_fired_at`` and trigger ``static_inputs`` must still reach the - template context). Returning ``{}`` here strips them and makes Jinja - blow up on any ``{{ inputs.* }}`` reference. - """ - if definition.inputs is None or not definition.inputs.schema_: - return inputs - try: - jsonschema.validate(instance=inputs, schema=definition.inputs.schema_) - except jsonschema.ValidationError as exc: - raise DispatchError(f"inputs: {exc.message}") from exc - return inputs diff --git a/surfsense_backend/app/automations/triggers/builtin/event/selector.py b/surfsense_backend/app/automations/triggers/builtin/event/selector.py index 11d9ae7f5..9c000e716 100644 --- a/surfsense_backend/app/automations/triggers/builtin/event/selector.py +++ b/surfsense_backend/app/automations/triggers/builtin/event/selector.py @@ -13,7 +13,7 @@ from typing import Any from sqlalchemy import select from sqlalchemy.ext.asyncio import AsyncSession -from app.automations.dispatch import start_run +from app.automations.dispatch import launch_run from app.automations.persistence.enums.trigger_type import TriggerType from app.automations.persistence.models.trigger import AutomationTrigger from app.celery_app import celery_app @@ -58,7 +58,7 @@ async def _start_one( session: AsyncSession, *, trigger: AutomationTrigger, event: Event ) -> None: try: - run = await start_run( + run = await launch_run( session=session, trigger=trigger, runtime_inputs=event_runtime_inputs(event), diff --git a/surfsense_backend/app/automations/triggers/builtin/schedule/selector.py b/surfsense_backend/app/automations/triggers/builtin/schedule/selector.py index 9f52bc9f0..be592fe12 100644 --- a/surfsense_backend/app/automations/triggers/builtin/schedule/selector.py +++ b/surfsense_backend/app/automations/triggers/builtin/schedule/selector.py @@ -18,7 +18,7 @@ from datetime import UTC, datetime from sqlalchemy import select from sqlalchemy.ext.asyncio import AsyncSession -from app.automations.dispatch import start_run +from app.automations.dispatch import launch_run from app.automations.persistence.enums.trigger_type import TriggerType from app.automations.persistence.models.trigger import AutomationTrigger from app.celery_app import celery_app @@ -159,7 +159,7 @@ async def _start_one( return try: - run = await start_run( + run = await launch_run( session=session, trigger=trigger, runtime_inputs=schedule_runtime_inputs( diff --git a/surfsense_backend/tests/unit/automations/actions/agent_task/__init__.py b/surfsense_backend/tests/unit/automations/actions/builtin/__init__.py similarity index 100% rename from surfsense_backend/tests/unit/automations/actions/agent_task/__init__.py rename to surfsense_backend/tests/unit/automations/actions/builtin/__init__.py diff --git a/surfsense_backend/tests/unit/automations/actions/builtin/agent_task/__init__.py b/surfsense_backend/tests/unit/automations/actions/builtin/agent_task/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/surfsense_backend/tests/unit/automations/actions/agent_task/test_auto_decide.py b/surfsense_backend/tests/unit/automations/actions/builtin/agent_task/test_auto_decide.py similarity index 97% rename from surfsense_backend/tests/unit/automations/actions/agent_task/test_auto_decide.py rename to surfsense_backend/tests/unit/automations/actions/builtin/agent_task/test_auto_decide.py index 439f32e41..d8f45eadf 100644 --- a/surfsense_backend/tests/unit/automations/actions/agent_task/test_auto_decide.py +++ b/surfsense_backend/tests/unit/automations/actions/builtin/agent_task/test_auto_decide.py @@ -13,7 +13,7 @@ from typing import Any import pytest -from app.automations.actions.agent_task.auto_decide import build_auto_decisions +from app.automations.actions.builtin.agent_task.auto_decide import build_auto_decisions pytestmark = pytest.mark.unit diff --git a/surfsense_backend/tests/unit/automations/actions/agent_task/test_finalize.py b/surfsense_backend/tests/unit/automations/actions/builtin/agent_task/test_finalize.py similarity index 96% rename from surfsense_backend/tests/unit/automations/actions/agent_task/test_finalize.py rename to surfsense_backend/tests/unit/automations/actions/builtin/agent_task/test_finalize.py index aa6c74549..9e2143438 100644 --- a/surfsense_backend/tests/unit/automations/actions/agent_task/test_finalize.py +++ b/surfsense_backend/tests/unit/automations/actions/builtin/agent_task/test_finalize.py @@ -10,7 +10,9 @@ from __future__ import annotations import pytest from langchain_core.messages import AIMessage, HumanMessage, ToolMessage -from app.automations.actions.agent_task.finalize import extract_final_assistant_message +from app.automations.actions.builtin.agent_task.finalize import ( + extract_final_assistant_message, +) pytestmark = pytest.mark.unit diff --git a/surfsense_backend/tests/unit/automations/dispatch/test_validate_inputs.py b/surfsense_backend/tests/unit/automations/dispatch/test_inputs.py similarity index 74% rename from surfsense_backend/tests/unit/automations/dispatch/test_validate_inputs.py rename to surfsense_backend/tests/unit/automations/dispatch/test_inputs.py index ec99e51c2..2744982a0 100644 --- a/surfsense_backend/tests/unit/automations/dispatch/test_validate_inputs.py +++ b/surfsense_backend/tests/unit/automations/dispatch/test_inputs.py @@ -1,10 +1,8 @@ -"""Lock the input-validation contract used by ``dispatch_run``. +"""Lock the input-validation contract enforced before a run is enqueued. -``_validate_inputs`` is module-internal by convention (underscore), but it -encodes a real behavior contract the rest of the system depends on, and the -public alternative (``dispatch_run``) requires a real DB session. Tests -target the pure function directly; the contract — not the symbol — is what's -locked. +``validate_inputs`` is the pure schema check that ``enqueue_run`` runs against +merged inputs. ``enqueue_run`` itself needs a real DB session, so tests target +this pure function directly; the contract — not the symbol — is what's locked. """ from __future__ import annotations @@ -12,7 +10,7 @@ from __future__ import annotations import pytest from app.automations.dispatch.errors import DispatchError -from app.automations.dispatch.run import _validate_inputs +from app.automations.dispatch.inputs import validate_inputs from app.automations.schemas.definition.envelope import AutomationDefinition from app.automations.schemas.definition.inputs import Inputs from app.automations.schemas.definition.plan_step import PlanStep @@ -42,7 +40,7 @@ def test_validate_inputs_passes_through_when_no_schema_is_declared() -> None: "static_key": "value", } - assert _validate_inputs(definition, runtime_inputs) == runtime_inputs + assert validate_inputs(definition, runtime_inputs) == runtime_inputs def test_validate_inputs_returns_inputs_when_they_match_declared_schema() -> None: @@ -58,14 +56,13 @@ def test_validate_inputs_returns_inputs_when_they_match_declared_schema() -> Non inputs = {"topic": "weekly report"} - assert _validate_inputs(definition, inputs) == inputs + assert validate_inputs(definition, inputs) == inputs def test_validate_inputs_raises_dispatch_error_when_inputs_violate_schema() -> None: """Inputs that don't match the declared schema must surface as - ``DispatchError`` (not the raw ``jsonschema.ValidationError``), so the - schedule tick and any other caller can handle one dispatch-domain - exception type uniformly.""" + ``DispatchError`` (not the raw ``jsonschema.ValidationError``), so every + caller can handle one dispatch-domain exception type uniformly.""" schema = { "type": "object", "properties": {"topic": {"type": "string"}}, @@ -74,4 +71,4 @@ def test_validate_inputs_raises_dispatch_error_when_inputs_violate_schema() -> N definition = _minimal_definition(inputs=Inputs(schema=schema)) with pytest.raises(DispatchError): - _validate_inputs(definition, {"topic": 42}) # type violates string + validate_inputs(definition, {"topic": 42}) # type violates string