diff --git a/AGENTS.md b/AGENTS.md index 5236f89..b74bf7a 100644 --- a/AGENTS.md +++ b/AGENTS.md @@ -43,5 +43,16 @@ On Windows (PowerShell): ## Environment Configuration -- `api/.env` - Backend environment variables +- `api/.env` - Backend environment variables. Source this when running diagnostic scripts or one-off services against the dev DB (e.g. `python -m api.services.admin_utils.local_exec`). +- `api/.env.test` - Test-only environment variables. Source this when running pytest so tests hit the test DB and never the dev/prod credentials in `api/.env`. - `ui/.env` - Frontend environment variables + +Typical invocation: + +```bash +# Tests +source venv/bin/activate && set -a && source api/.env.test && set +a && python -m pytest api/tests/... + +# Diagnostics / scripts +source venv/bin/activate && set -a && source api/.env && set +a && python -m api.services.admin_utils.local_exec +``` diff --git a/api/mcp_server/tools/create_workflow.py b/api/mcp_server/tools/create_workflow.py index 9934b3e..38e4037 100644 --- a/api/mcp_server/tools/create_workflow.py +++ b/api/mcp_server/tools/create_workflow.py @@ -34,7 +34,7 @@ from api.mcp_server.ts_bridge import TsBridgeError, parse_code from api.services.posthog_client import capture_event from api.services.workflow.dto import ReactFlowDTO from api.services.workflow.layout import reconcile_positions -from api.services.workflow.workflow import WorkflowGraph +from api.services.workflow.workflow_graph import WorkflowGraph def _error_result(code: str, message: str, **extra: Any) -> dict[str, Any]: diff --git a/api/mcp_server/tools/save_workflow.py b/api/mcp_server/tools/save_workflow.py index 13b60cf..41130d7 100644 --- a/api/mcp_server/tools/save_workflow.py +++ b/api/mcp_server/tools/save_workflow.py @@ -36,7 +36,7 @@ from api.mcp_server.tracing import traced_tool from api.mcp_server.ts_bridge import TsBridgeError, parse_code from api.services.workflow.dto import ReactFlowDTO from api.services.workflow.layout import reconcile_positions -from api.services.workflow.workflow import WorkflowGraph +from api.services.workflow.workflow_graph import WorkflowGraph async def _previous_workflow_json(workflow: Any) -> dict[str, Any] | None: diff --git a/api/routes/campaign.py b/api/routes/campaign.py index b5ce233..407cef7 100644 --- a/api/routes/campaign.py +++ b/api/routes/campaign.py @@ -369,7 +369,7 @@ async def create_campaign( ) if workflow: from api.services.workflow.dto import ReactFlowDTO - from api.services.workflow.workflow import WorkflowGraph + from api.services.workflow.workflow_graph import WorkflowGraph workflow_def = workflow.released_definition.workflow_json if workflow_def: diff --git a/api/routes/workflow.py b/api/routes/workflow.py index eef41ce..4b2bb6d 100644 --- a/api/routes/workflow.py +++ b/api/routes/workflow.py @@ -32,7 +32,7 @@ from api.services.storage import storage_fs from api.services.workflow.dto import ReactFlowDTO, sanitize_workflow_definition from api.services.workflow.duplicate import duplicate_workflow from api.services.workflow.errors import ItemKind, WorkflowError -from api.services.workflow.workflow import WorkflowGraph +from api.services.workflow.workflow_graph import WorkflowGraph def extract_trigger_paths(workflow_definition: dict) -> List[str]: diff --git a/api/services/looptalk/core/pipeline_builder.py b/api/services/looptalk/core/pipeline_builder.py index 77e634f..49adc72 100644 --- a/api/services/looptalk/core/pipeline_builder.py +++ b/api/services/looptalk/core/pipeline_builder.py @@ -26,7 +26,7 @@ from api.services.pipecat.service_factory import ( ) from api.services.workflow.dto import ReactFlowDTO from api.services.workflow.pipecat_engine import PipecatEngine -from api.services.workflow.workflow import WorkflowGraph +from api.services.workflow.workflow_graph import WorkflowGraph class LoopTalkPipelineBuilder: diff --git a/api/services/pipecat/run_pipeline.py b/api/services/pipecat/run_pipeline.py index 74cd23f..de9cb18 100644 --- a/api/services/pipecat/run_pipeline.py +++ b/api/services/pipecat/run_pipeline.py @@ -48,7 +48,7 @@ from api.services.pipecat.ws_sender_registry import get_ws_sender from api.services.telephony import registry as telephony_registry from api.services.workflow.dto import ReactFlowDTO from api.services.workflow.pipecat_engine import PipecatEngine -from api.services.workflow.workflow import WorkflowGraph +from api.services.workflow.workflow_graph import WorkflowGraph from pipecat.audio.turn.smart_turn.base_smart_turn import SmartTurnParams from pipecat.audio.turn.smart_turn.local_smart_turn_v3 import LocalSmartTurnAnalyzerV3 from pipecat.audio.vad.silero import SileroVADAnalyzer diff --git a/api/services/workflow/node_specs/qa.py b/api/services/workflow/node_specs/qa.py index ebb95ab..f140297 100644 --- a/api/services/workflow/node_specs/qa.py +++ b/api/services/workflow/node_specs/qa.py @@ -3,6 +3,7 @@ transcript after completion.""" from api.services.workflow.node_specs._base import ( DisplayOptions, + GraphConstraints, NodeCategory, NodeExample, NodeSpec, @@ -193,4 +194,10 @@ SPEC = NodeSpec( }, ), ], + # QA runs post-call against the saved transcript (run_integrations + # scans by type), never as a graph step. Reject any edge into or out + # of a QA node. + graph_constraints=GraphConstraints( + min_incoming=0, max_incoming=0, min_outgoing=0, max_outgoing=0 + ), ) diff --git a/api/services/workflow/node_specs/start_call.py b/api/services/workflow/node_specs/start_call.py index b0f3075..07cd870 100644 --- a/api/services/workflow/node_specs/start_call.py +++ b/api/services/workflow/node_specs/start_call.py @@ -240,9 +240,11 @@ SPEC = NodeSpec( }, ), ], + # `min_outgoing` is intentionally unset: a startCall is allowed to + # sit on the canvas without an outgoing edge (e.g. a workflow with + # just a greeting). Only constraint: nothing flows INTO the start. graph_constraints=GraphConstraints( min_incoming=0, max_incoming=0, - min_outgoing=1, ), ) diff --git a/api/services/workflow/node_specs/webhook.py b/api/services/workflow/node_specs/webhook.py index ec71d12..2844878 100644 --- a/api/services/workflow/node_specs/webhook.py +++ b/api/services/workflow/node_specs/webhook.py @@ -2,6 +2,7 @@ after the workflow completes.""" from api.services.workflow.node_specs._base import ( + GraphConstraints, NodeCategory, NodeExample, NodeSpec, @@ -132,4 +133,10 @@ SPEC = NodeSpec( }, ), ], + # Webhooks fire post-call (run_integrations scans nodes by type), + # never as a graph step. Reject any edge into or out of a webhook so + # the editor can't wire one into the conversation flow. + graph_constraints=GraphConstraints( + min_incoming=0, max_incoming=0, min_outgoing=0, max_outgoing=0 + ), ) diff --git a/api/services/workflow/pipecat_engine.py b/api/services/workflow/pipecat_engine.py index b4f00cb..d73d9be 100644 --- a/api/services/workflow/pipecat_engine.py +++ b/api/services/workflow/pipecat_engine.py @@ -18,7 +18,7 @@ from pipecat.utils.enums import EndTaskReason from api.db import db_client from api.services.pipecat.audio_playback import play_audio from api.services.workflow.disposition_mapper import apply_disposition_mapping -from api.services.workflow.workflow import Node, WorkflowGraph +from api.services.workflow.workflow_graph import Node, WorkflowGraph if TYPE_CHECKING: from pipecat.frames.frames import Frame diff --git a/api/services/workflow/pipecat_engine_context_composer.py b/api/services/workflow/pipecat_engine_context_composer.py index ffdfd77..03c253a 100644 --- a/api/services/workflow/pipecat_engine_context_composer.py +++ b/api/services/workflow/pipecat_engine_context_composer.py @@ -8,7 +8,7 @@ from typing import TYPE_CHECKING, Callable, Optional if TYPE_CHECKING: from api.services.workflow.pipecat_engine_custom_tools import CustomToolManager - from api.services.workflow.workflow import Node, WorkflowGraph + from api.services.workflow.workflow_graph import Node, WorkflowGraph from api.services.workflow.pipecat_engine_custom_tools import get_function_schema from api.services.workflow.tools.knowledge_base import get_knowledge_base_tool diff --git a/api/services/workflow/workflow.py b/api/services/workflow/workflow_graph.py similarity index 81% rename from api/services/workflow/workflow.py rename to api/services/workflow/workflow_graph.py index d15e95a..0770bc3 100644 --- a/api/services/workflow/workflow.py +++ b/api/services/workflow/workflow_graph.py @@ -4,6 +4,7 @@ from typing import Dict, List, Set from api.services.workflow.dto import EdgeDataDTO, NodeType, ReactFlowDTO from api.services.workflow.errors import ItemKind, WorkflowError +from api.services.workflow.node_specs import REGISTRY # Regex for matching {{ variable }} template placeholders. # Captures: group(1) = variable path, group(2) = filter name, group(3) = filter value. @@ -259,41 +260,67 @@ class WorkflowGraph: return errors def _assert_connection_counts(self): + """Enforce per-type incoming/outgoing edge constraints. + + Driven by `NodeSpec.graph_constraints` so a single source of truth + in the spec dictates what's legal. Types without a `graph_constraints` + block are unconstrained (e.g. agentNode on the outgoing side). + """ errors: list[WorkflowError] = [] out_deg = Counter() in_deg = Counter() - for n in self.nodes.values(): # init counters + for n in self.nodes.values(): out_deg[n.id] = in_deg[n.id] = 0 - for src, n in self.nodes.items(): # compute degrees + for src, n in self.nodes.items(): for m in n.out.values(): out_deg[src] += 1 in_deg[m.id] += 1 for n in self.nodes.values(): + spec = REGISTRY.get(n.node_type.value) + if spec is None or spec.graph_constraints is None: + continue + gc = spec.graph_constraints in_d, out_d = in_deg[n.id], out_deg[n.id] + label = spec.display_name - match n.node_type: - case NodeType.endNode: - if in_d < 1 or out_d != 0: - errors.append( - WorkflowError( - kind=ItemKind.node, - id=n.id, - field=None, - message=f"EndNode must have at least 1 incoming edge", - ) - ) - case NodeType.agentNode: - if in_d < 1: - errors.append( - WorkflowError( - kind=ItemKind.node, - id=n.id, - field=None, - message=f"Worker must have at least 1 incoming edge", - ) - ) + if gc.max_incoming is not None and in_d > gc.max_incoming: + msg = ( + f"{label} cannot have incoming edges" + if gc.max_incoming == 0 + else f"{label} can have at most {gc.max_incoming} incoming edge(s)" + ) + errors.append( + WorkflowError(kind=ItemKind.node, id=n.id, field=None, message=msg) + ) + if gc.min_incoming is not None and in_d < gc.min_incoming: + errors.append( + WorkflowError( + kind=ItemKind.node, + id=n.id, + field=None, + message=f"{label} must have at least {gc.min_incoming} incoming edge(s)", + ) + ) + if gc.max_outgoing is not None and out_d > gc.max_outgoing: + msg = ( + f"{label} cannot have outgoing edges" + if gc.max_outgoing == 0 + else f"{label} can have at most {gc.max_outgoing} outgoing edge(s)" + ) + errors.append( + WorkflowError(kind=ItemKind.node, id=n.id, field=None, message=msg) + ) + if gc.min_outgoing is not None and out_d < gc.min_outgoing: + errors.append( + WorkflowError( + kind=ItemKind.node, + id=n.id, + field=None, + message=f"{label} must have at least {gc.min_outgoing} outgoing edge(s)", + ) + ) return errors diff --git a/api/tests/conftest.py b/api/tests/conftest.py index 6cf047d..2c03283 100644 --- a/api/tests/conftest.py +++ b/api/tests/conftest.py @@ -27,7 +27,7 @@ from api.services.workflow.dto import ( StartCallRFNode, VariableType, ) -from api.services.workflow.workflow import WorkflowGraph +from api.services.workflow.workflow_graph import WorkflowGraph START_CALL_SYSTEM_PROMPT = "Start Call System Prompt" AGENT_SYSTEM_PROMPT = "Agent Node System Prompt" diff --git a/api/tests/dto_fixtures/bad_edge_into_start.json b/api/tests/dto_fixtures/bad_edge_into_start.json new file mode 100644 index 0000000..19fa172 --- /dev/null +++ b/api/tests/dto_fixtures/bad_edge_into_start.json @@ -0,0 +1,24 @@ +{ + "nodes": [ + { + "id": "s", + "type": "startCall", + "position": {"x": 0, "y": 0}, + "data": {"name": "Start", "prompt": "Greet the caller."} + }, + { + "id": "a", + "type": "agentNode", + "position": {"x": 200, "y": 0}, + "data": {"name": "Agent", "prompt": "Continue the conversation."} + } + ], + "edges": [ + { + "id": "a-s", + "source": "a", + "target": "s", + "data": {"label": "loop back to start", "condition": "always"} + } + ] +} diff --git a/api/tests/dto_fixtures/bad_edge_into_webhook.json b/api/tests/dto_fixtures/bad_edge_into_webhook.json new file mode 100644 index 0000000..97f8aff --- /dev/null +++ b/api/tests/dto_fixtures/bad_edge_into_webhook.json @@ -0,0 +1,30 @@ +{ + "nodes": [ + { + "id": "s", + "type": "startCall", + "position": {"x": -200, "y": 0}, + "data": {"name": "Start", "prompt": "Greet the caller."} + }, + { + "id": "a", + "type": "agentNode", + "position": {"x": 0, "y": 0}, + "data": {"name": "Agent", "prompt": "Talk to the caller."} + }, + { + "id": "w", + "type": "webhook", + "position": {"x": 200, "y": 0}, + "data": {"name": "Notify CRM"} + } + ], + "edges": [ + { + "id": "a-w", + "source": "a", + "target": "w", + "data": {"label": "post to webhook", "condition": "always"} + } + ] +} diff --git a/api/tests/dto_fixtures/bad_edge_out_of_globalnode.json b/api/tests/dto_fixtures/bad_edge_out_of_globalnode.json new file mode 100644 index 0000000..0951dfc --- /dev/null +++ b/api/tests/dto_fixtures/bad_edge_out_of_globalnode.json @@ -0,0 +1,30 @@ +{ + "nodes": [ + { + "id": "s", + "type": "startCall", + "position": {"x": -200, "y": 0}, + "data": {"name": "Start", "prompt": "Greet the caller."} + }, + { + "id": "g", + "type": "globalNode", + "position": {"x": 0, "y": 0}, + "data": {"name": "Global", "prompt": "Shared system prompt."} + }, + { + "id": "a", + "type": "agentNode", + "position": {"x": 200, "y": 0}, + "data": {"name": "Agent", "prompt": "Talk to the caller."} + } + ], + "edges": [ + { + "id": "g-a", + "source": "g", + "target": "a", + "data": {"label": "global to agent", "condition": "always"} + } + ] +} diff --git a/api/tests/dto_fixtures/bad_edge_out_of_webhook.json b/api/tests/dto_fixtures/bad_edge_out_of_webhook.json new file mode 100644 index 0000000..24f5918 --- /dev/null +++ b/api/tests/dto_fixtures/bad_edge_out_of_webhook.json @@ -0,0 +1,30 @@ +{ + "nodes": [ + { + "id": "s", + "type": "startCall", + "position": {"x": -200, "y": 0}, + "data": {"name": "Start", "prompt": "Greet the caller."} + }, + { + "id": "w", + "type": "webhook", + "position": {"x": 0, "y": 0}, + "data": {"name": "Notify CRM"} + }, + { + "id": "e", + "type": "endCall", + "position": {"x": 200, "y": 0}, + "data": {"name": "End", "prompt": "Say goodbye."} + } + ], + "edges": [ + { + "id": "w-e", + "source": "w", + "target": "e", + "data": {"label": "after webhook", "condition": "always"} + } + ] +} diff --git a/api/tests/dto_fixtures/bad_edge_source_missing.json b/api/tests/dto_fixtures/bad_edge_source_missing.json new file mode 100644 index 0000000..c665132 --- /dev/null +++ b/api/tests/dto_fixtures/bad_edge_source_missing.json @@ -0,0 +1,24 @@ +{ + "nodes": [ + { + "id": "s", + "type": "startCall", + "position": {"x": -200, "y": 0}, + "data": {"name": "Start", "prompt": "Greet the caller."} + }, + { + "id": "e", + "type": "endCall", + "position": {"x": 0, "y": 0}, + "data": {"name": "End", "prompt": "Say goodbye."} + } + ], + "edges": [ + { + "id": "ghost-e", + "source": "ghost-node", + "target": "e", + "data": {"label": "from nowhere", "condition": "always"} + } + ] +} diff --git a/api/tests/dto_fixtures/bad_edge_target_missing.json b/api/tests/dto_fixtures/bad_edge_target_missing.json new file mode 100644 index 0000000..b233dae --- /dev/null +++ b/api/tests/dto_fixtures/bad_edge_target_missing.json @@ -0,0 +1,18 @@ +{ + "nodes": [ + { + "id": "s", + "type": "startCall", + "position": {"x": 0, "y": 0}, + "data": {"name": "Start", "prompt": "Greet the caller."} + } + ], + "edges": [ + { + "id": "s-ghost", + "source": "s", + "target": "ghost-node", + "data": {"label": "to nowhere", "condition": "always"} + } + ] +} diff --git a/api/tests/dto_fixtures/clean.json b/api/tests/dto_fixtures/clean.json new file mode 100644 index 0000000..c27c932 --- /dev/null +++ b/api/tests/dto_fixtures/clean.json @@ -0,0 +1,36 @@ +{ + "nodes": [ + { + "id": "s", + "type": "startCall", + "position": {"x": 0, "y": 0}, + "data": {"name": "Start", "prompt": "Greet the caller."} + }, + { + "id": "a", + "type": "agentNode", + "position": {"x": 200, "y": 0}, + "data": {"name": "Agent", "prompt": "Continue the conversation."} + }, + { + "id": "e", + "type": "endCall", + "position": {"x": 400, "y": 0}, + "data": {"name": "End", "prompt": "Say goodbye."} + } + ], + "edges": [ + { + "id": "s-a", + "source": "s", + "target": "a", + "data": {"label": "start to agent", "condition": "always"} + }, + { + "id": "a-e", + "source": "a", + "target": "e", + "data": {"label": "agent to end", "condition": "always"} + } + ] +} diff --git a/api/tests/dto_fixtures/no_start_node.json b/api/tests/dto_fixtures/no_start_node.json new file mode 100644 index 0000000..fd630fd --- /dev/null +++ b/api/tests/dto_fixtures/no_start_node.json @@ -0,0 +1,24 @@ +{ + "nodes": [ + { + "id": "a", + "type": "agentNode", + "position": {"x": 0, "y": 0}, + "data": {"name": "Agent", "prompt": "Talk to the caller."} + }, + { + "id": "e", + "type": "endCall", + "position": {"x": 200, "y": 0}, + "data": {"name": "End", "prompt": "Say goodbye."} + } + ], + "edges": [ + { + "id": "a-e", + "source": "a", + "target": "e", + "data": {"label": "agent to end", "condition": "always"} + } + ] +} diff --git a/api/tests/definitions/rf-1.json b/api/tests/dto_fixtures/sample_branching_workflow.json similarity index 100% rename from api/tests/definitions/rf-1.json rename to api/tests/dto_fixtures/sample_branching_workflow.json diff --git a/api/tests/dto_fixtures/start_only.json b/api/tests/dto_fixtures/start_only.json new file mode 100644 index 0000000..911aa89 --- /dev/null +++ b/api/tests/dto_fixtures/start_only.json @@ -0,0 +1,11 @@ +{ + "nodes": [ + { + "id": "s", + "type": "startCall", + "position": {"x": 0, "y": 0}, + "data": {"name": "Start", "prompt": "Greet the caller and hang up."} + } + ], + "edges": [] +} diff --git a/api/tests/test_dto.py b/api/tests/test_dto.py index aa4aabd..512252b 100644 --- a/api/tests/test_dto.py +++ b/api/tests/test_dto.py @@ -4,14 +4,14 @@ import pytest from api.services.workflow.dto import ReactFlowDTO, sanitize_workflow_definition -_FIXTURES_DIR = Path(__file__).parent / "definitions" +_FIXTURES_DIR = Path(__file__).parent / "dto_fixtures" @pytest.mark.asyncio async def test_dto(): # Path resolved relative to this test file so the test works regardless # of the cwd pytest is invoked from. - with open(_FIXTURES_DIR / "rf-1.json", "r") as f: + with open(_FIXTURES_DIR / "sample_branching_workflow.json", "r") as f: dto = ReactFlowDTO.model_validate_json(f.read()) assert dto is not None diff --git a/api/tests/test_pipecat_engine_context_update.py b/api/tests/test_pipecat_engine_context_update.py index e22a575..9235b22 100644 --- a/api/tests/test_pipecat_engine_context_update.py +++ b/api/tests/test_pipecat_engine_context_update.py @@ -31,7 +31,7 @@ from pipecat.tests.mock_transport import MockTransport from pipecat.transports.base_transport import TransportParams from api.services.workflow.pipecat_engine import PipecatEngine -from api.services.workflow.workflow import WorkflowGraph +from api.services.workflow.workflow_graph import WorkflowGraph from api.tests.conftest import ( AGENT_SYSTEM_PROMPT, END_CALL_SYSTEM_PROMPT, diff --git a/api/tests/test_pipecat_engine_end_call.py b/api/tests/test_pipecat_engine_end_call.py index a0f8ac1..2ba4570 100644 --- a/api/tests/test_pipecat_engine_end_call.py +++ b/api/tests/test_pipecat_engine_end_call.py @@ -57,7 +57,7 @@ from api.services.workflow.pipecat_engine_custom_tools import CustomToolManager from api.services.workflow.pipecat_engine_variable_extractor import ( VariableExtractionManager, ) -from api.services.workflow.workflow import WorkflowGraph +from api.services.workflow.workflow_graph import WorkflowGraph from api.tests.conftest import END_CALL_SYSTEM_PROMPT, START_CALL_SYSTEM_PROMPT from pipecat.tests import MockLLMService, MockTTSService diff --git a/api/tests/test_pipecat_engine_node_switch_with_user_speech.py b/api/tests/test_pipecat_engine_node_switch_with_user_speech.py index a19843b..82a6f55 100644 --- a/api/tests/test_pipecat_engine_node_switch_with_user_speech.py +++ b/api/tests/test_pipecat_engine_node_switch_with_user_speech.py @@ -49,7 +49,7 @@ from pipecat.turns.user_turn_strategies import UserTurnStrategies from pipecat.utils.time import time_now_iso8601 from api.services.workflow.pipecat_engine import PipecatEngine -from api.services.workflow.workflow import WorkflowGraph +from api.services.workflow.workflow_graph import WorkflowGraph from pipecat.tests import MockLLMService, MockTTSService diff --git a/api/tests/test_pipecat_engine_tool_calls.py b/api/tests/test_pipecat_engine_tool_calls.py index aef2df6..ec04b49 100644 --- a/api/tests/test_pipecat_engine_tool_calls.py +++ b/api/tests/test_pipecat_engine_tool_calls.py @@ -22,7 +22,7 @@ from pipecat.tests.mock_transport import MockTransport from pipecat.transports.base_transport import TransportParams from api.services.workflow.pipecat_engine import PipecatEngine -from api.services.workflow.workflow import WorkflowGraph +from api.services.workflow.workflow_graph import WorkflowGraph from api.tests.conftest import END_CALL_SYSTEM_PROMPT from pipecat.tests import MockLLMService, MockTTSService diff --git a/api/tests/test_pipecat_engine_transition_mute.py b/api/tests/test_pipecat_engine_transition_mute.py index 3cc5220..9ce0271 100644 --- a/api/tests/test_pipecat_engine_transition_mute.py +++ b/api/tests/test_pipecat_engine_transition_mute.py @@ -35,7 +35,7 @@ from api.services.workflow.pipecat_engine import PipecatEngine from api.services.workflow.pipecat_engine_variable_extractor import ( VariableExtractionManager, ) -from api.services.workflow.workflow import WorkflowGraph +from api.services.workflow.workflow_graph import WorkflowGraph from pipecat.tests import MockLLMService, MockTTSService diff --git a/api/tests/test_pipecat_engine_variable_extraction.py b/api/tests/test_pipecat_engine_variable_extraction.py index 29581d7..823592c 100644 --- a/api/tests/test_pipecat_engine_variable_extraction.py +++ b/api/tests/test_pipecat_engine_variable_extraction.py @@ -32,7 +32,7 @@ from api.services.workflow.pipecat_engine import PipecatEngine from api.services.workflow.pipecat_engine_variable_extractor import ( VariableExtractionManager, ) -from api.services.workflow.workflow import WorkflowGraph +from api.services.workflow.workflow_graph import WorkflowGraph from pipecat.tests import MockLLMService, MockTTSService diff --git a/api/tests/test_text_and_audio_playback.py b/api/tests/test_text_and_audio_playback.py index a950c9b..39b77aa 100644 --- a/api/tests/test_text_and_audio_playback.py +++ b/api/tests/test_text_and_audio_playback.py @@ -43,7 +43,7 @@ from api.services.workflow.dto import ( ) from api.services.workflow.pipecat_engine import PipecatEngine from api.services.workflow.pipecat_engine_custom_tools import CustomToolManager -from api.services.workflow.workflow import WorkflowGraph +from api.services.workflow.workflow_graph import WorkflowGraph from pipecat.tests import MockLLMService, MockTTSService # ─── Constants ────────────────────────────────────────────────── diff --git a/api/tests/test_tts_endframe_with_audio_write_failure.py b/api/tests/test_tts_endframe_with_audio_write_failure.py index 56f9ac6..4914f9b 100644 --- a/api/tests/test_tts_endframe_with_audio_write_failure.py +++ b/api/tests/test_tts_endframe_with_audio_write_failure.py @@ -54,7 +54,7 @@ from api.services.workflow.pipecat_engine import PipecatEngine from api.services.workflow.pipecat_engine_variable_extractor import ( VariableExtractionManager, ) -from api.services.workflow.workflow import WorkflowGraph +from api.services.workflow.workflow_graph import WorkflowGraph from pipecat.tests import MockLLMService, MockTTSService diff --git a/api/tests/test_user_idle_handler.py b/api/tests/test_user_idle_handler.py index 47d8eee..77dfb2c 100644 --- a/api/tests/test_user_idle_handler.py +++ b/api/tests/test_user_idle_handler.py @@ -44,7 +44,7 @@ from pipecat.turns.user_turn_strategies import UserTurnStrategies from pipecat.utils.time import time_now_iso8601 from api.services.workflow.pipecat_engine import PipecatEngine -from api.services.workflow.workflow import WorkflowGraph +from api.services.workflow.workflow_graph import WorkflowGraph from pipecat.tests import MockLLMService, MockTTSService diff --git a/api/tests/test_user_muting_during_bot_speech.py b/api/tests/test_user_muting_during_bot_speech.py index b055385..ac04299 100644 --- a/api/tests/test_user_muting_during_bot_speech.py +++ b/api/tests/test_user_muting_during_bot_speech.py @@ -48,7 +48,7 @@ from api.services.workflow.pipecat_engine import PipecatEngine from api.services.workflow.pipecat_engine_variable_extractor import ( VariableExtractionManager, ) -from api.services.workflow.workflow import WorkflowGraph +from api.services.workflow.workflow_graph import WorkflowGraph from pipecat.tests import MockLLMService, MockTTSService diff --git a/api/tests/test_workflow_graph_constraints.py b/api/tests/test_workflow_graph_constraints.py new file mode 100644 index 0000000..a493f68 --- /dev/null +++ b/api/tests/test_workflow_graph_constraints.py @@ -0,0 +1,117 @@ +"""Regression tests for WorkflowGraph edge/graph constraints + the +admin audit script that mirrors them. + +Each fixture in `dto_fixtures/` is either a clean workflow or a single +category of violation we found in production. We pin two layers: + + 1. WorkflowGraph — semantic gate used by `/publish`, the SDK API, and + both MCP tools. Driven by `NodeSpec.graph_constraints`. If this + layer ever stops rejecting one of these fixtures, the production + write paths will quietly start accepting bad workflows again. + 2. audit_definition (api.services.admin_utils.local_exec) — read-only + sweep over persisted rows used to find legacy/imported breakage. + Pinned so refactors of the rule set don't silently change the + verdicts the migration relies on. + +DTO-level shape validation is covered by `test_dto.py` and isn't +re-pinned here. +""" + +import json +from pathlib import Path + +import pytest + +from api.services.admin_utils.local_exec import audit_definition +from api.services.workflow.dto import ReactFlowDTO +from api.services.workflow.workflow_graph import WorkflowGraph + +_FIXTURES_DIR = Path(__file__).parent / "dto_fixtures" + + +def _load(name: str) -> tuple[str, dict]: + raw = (_FIXTURES_DIR / f"{name}.json").read_text() + return raw, json.loads(raw) + + +# (fixture_name, expected_audit_reasons, expected_graph_messages) +# +# expected_graph_messages semantics: +# None — DTO rejects upstream, WorkflowGraph is never reached. +# [] — WorkflowGraph accepts (clean fixture). +# [...] — WorkflowGraph rejects; each substring must appear in the +# emitted WorkflowError messages. +_SCENARIOS = [ + ("clean", [], []), + # A workflow with just a startCall and no edges is valid — startCall + # has no `min_outgoing` constraint, so a "greet then hang up" flow + # passes both audit and WorkflowGraph. + ("start_only", [], []), + ( + "bad_edge_into_start", + ["target_max_incoming_0:startCall"], + ["Start Call cannot have incoming edges"], + ), + ( + "bad_edge_into_webhook", + ["target_max_incoming_0:webhook"], + ["Webhook cannot have incoming edges"], + ), + ( + "bad_edge_out_of_webhook", + ["source_max_outgoing_0:webhook"], + ["Webhook cannot have outgoing edges"], + ), + ( + "bad_edge_out_of_globalnode", + ["source_max_outgoing_0:globalNode"], + ["Global Node cannot have outgoing edges"], + ), + ("bad_edge_target_missing", ["target_id_missing"], None), + ("bad_edge_source_missing", ["source_id_missing"], None), + ( + "no_start_node", + ["no_start_node"], + ["Workflow must have exactly one start node"], + ), +] + + +@pytest.mark.parametrize( + "name,expected_reasons", + [(name, reasons) for name, reasons, _ in _SCENARIOS], +) +def test_audit_catches_violations(name, expected_reasons): + _, definition = _load(name) + violations = audit_definition(definition["nodes"], definition["edges"]) + reasons = sorted(v["reason"] for v in violations) + assert reasons == sorted(expected_reasons) + + +@pytest.mark.parametrize( + "name,expected_graph_messages", + [ + (name, messages) + for name, _, messages in _SCENARIOS + if messages is not None # skip fixtures DTO rejects upstream + ], +) +def test_workflow_graph_rejects_violations(name, expected_graph_messages): + """If WorkflowGraph accepts a definition, every save path that goes + 'live' will accept it — so this layer is the canonical regression + point for the rules in `NodeSpec.graph_constraints`.""" + raw, _ = _load(name) + dto = ReactFlowDTO.model_validate_json(raw) + + if not expected_graph_messages: + WorkflowGraph(dto) + return + + with pytest.raises(ValueError) as exc_info: + WorkflowGraph(dto) + + actual_messages = [w["message"] for w in exc_info.value.args[0]] + for expected in expected_graph_messages: + assert any(expected in m for m in actual_messages), ( + f"Expected substring {expected!r} not found in graph errors: {actual_messages}" + ) diff --git a/api/utils/template_renderer.py b/api/utils/template_renderer.py index d03c518..fe6bc87 100644 --- a/api/utils/template_renderer.py +++ b/api/utils/template_renderer.py @@ -8,7 +8,7 @@ from zoneinfo import ZoneInfo from loguru import logger -from api.services.workflow.workflow import TEMPLATE_VAR_PATTERN +from api.services.workflow.workflow_graph import TEMPLATE_VAR_PATTERN _CURRENT_TIME_PREFIX = "current_time" _CURRENT_WEEKDAY_PREFIX = "current_weekday" diff --git a/sdk/python/src/dograh_sdk/_generated_models.py b/sdk/python/src/dograh_sdk/_generated_models.py index 55c0381..d90cb4d 100644 --- a/sdk/python/src/dograh_sdk/_generated_models.py +++ b/sdk/python/src/dograh_sdk/_generated_models.py @@ -1,6 +1,6 @@ # generated by datamodel-codegen: -# filename: dograh-openapi-XXXXXX.json.k2orBArfVN -# timestamp: 2026-05-08T08:26:28+00:00 +# filename: dograh-openapi-XXXXXX.json.orthnRzifa +# timestamp: 2026-05-08T10:32:18+00:00 from __future__ import annotations