feat: add workflow graph constraints fixtures

This commit is contained in:
Abhishek Kumar 2026-05-08 16:02:51 +05:30
parent 6d93be3ef6
commit 5a358d4d29
38 changed files with 447 additions and 49 deletions

View file

@ -43,5 +43,16 @@ On Windows (PowerShell):
## Environment Configuration
- `api/.env` - Backend environment variables
- `api/.env` - Backend environment variables. Source this when running diagnostic scripts or one-off services against the dev DB (e.g. `python -m api.services.admin_utils.local_exec`).
- `api/.env.test` - Test-only environment variables. Source this when running pytest so tests hit the test DB and never the dev/prod credentials in `api/.env`.
- `ui/.env` - Frontend environment variables
Typical invocation:
```bash
# Tests
source venv/bin/activate && set -a && source api/.env.test && set +a && python -m pytest api/tests/...
# Diagnostics / scripts
source venv/bin/activate && set -a && source api/.env && set +a && python -m api.services.admin_utils.local_exec
```

View file

@ -34,7 +34,7 @@ from api.mcp_server.ts_bridge import TsBridgeError, parse_code
from api.services.posthog_client import capture_event
from api.services.workflow.dto import ReactFlowDTO
from api.services.workflow.layout import reconcile_positions
from api.services.workflow.workflow import WorkflowGraph
from api.services.workflow.workflow_graph import WorkflowGraph
def _error_result(code: str, message: str, **extra: Any) -> dict[str, Any]:

View file

@ -36,7 +36,7 @@ from api.mcp_server.tracing import traced_tool
from api.mcp_server.ts_bridge import TsBridgeError, parse_code
from api.services.workflow.dto import ReactFlowDTO
from api.services.workflow.layout import reconcile_positions
from api.services.workflow.workflow import WorkflowGraph
from api.services.workflow.workflow_graph import WorkflowGraph
async def _previous_workflow_json(workflow: Any) -> dict[str, Any] | None:

View file

@ -369,7 +369,7 @@ async def create_campaign(
)
if workflow:
from api.services.workflow.dto import ReactFlowDTO
from api.services.workflow.workflow import WorkflowGraph
from api.services.workflow.workflow_graph import WorkflowGraph
workflow_def = workflow.released_definition.workflow_json
if workflow_def:

View file

@ -32,7 +32,7 @@ from api.services.storage import storage_fs
from api.services.workflow.dto import ReactFlowDTO, sanitize_workflow_definition
from api.services.workflow.duplicate import duplicate_workflow
from api.services.workflow.errors import ItemKind, WorkflowError
from api.services.workflow.workflow import WorkflowGraph
from api.services.workflow.workflow_graph import WorkflowGraph
def extract_trigger_paths(workflow_definition: dict) -> List[str]:

View file

@ -26,7 +26,7 @@ from api.services.pipecat.service_factory import (
)
from api.services.workflow.dto import ReactFlowDTO
from api.services.workflow.pipecat_engine import PipecatEngine
from api.services.workflow.workflow import WorkflowGraph
from api.services.workflow.workflow_graph import WorkflowGraph
class LoopTalkPipelineBuilder:

View file

@ -48,7 +48,7 @@ from api.services.pipecat.ws_sender_registry import get_ws_sender
from api.services.telephony import registry as telephony_registry
from api.services.workflow.dto import ReactFlowDTO
from api.services.workflow.pipecat_engine import PipecatEngine
from api.services.workflow.workflow import WorkflowGraph
from api.services.workflow.workflow_graph import WorkflowGraph
from pipecat.audio.turn.smart_turn.base_smart_turn import SmartTurnParams
from pipecat.audio.turn.smart_turn.local_smart_turn_v3 import LocalSmartTurnAnalyzerV3
from pipecat.audio.vad.silero import SileroVADAnalyzer

View file

@ -3,6 +3,7 @@ transcript after completion."""
from api.services.workflow.node_specs._base import (
DisplayOptions,
GraphConstraints,
NodeCategory,
NodeExample,
NodeSpec,
@ -193,4 +194,10 @@ SPEC = NodeSpec(
},
),
],
# QA runs post-call against the saved transcript (run_integrations
# scans by type), never as a graph step. Reject any edge into or out
# of a QA node.
graph_constraints=GraphConstraints(
min_incoming=0, max_incoming=0, min_outgoing=0, max_outgoing=0
),
)

View file

@ -240,9 +240,11 @@ SPEC = NodeSpec(
},
),
],
# `min_outgoing` is intentionally unset: a startCall is allowed to
# sit on the canvas without an outgoing edge (e.g. a workflow with
# just a greeting). Only constraint: nothing flows INTO the start.
graph_constraints=GraphConstraints(
min_incoming=0,
max_incoming=0,
min_outgoing=1,
),
)

View file

@ -2,6 +2,7 @@
after the workflow completes."""
from api.services.workflow.node_specs._base import (
GraphConstraints,
NodeCategory,
NodeExample,
NodeSpec,
@ -132,4 +133,10 @@ SPEC = NodeSpec(
},
),
],
# Webhooks fire post-call (run_integrations scans nodes by type),
# never as a graph step. Reject any edge into or out of a webhook so
# the editor can't wire one into the conversation flow.
graph_constraints=GraphConstraints(
min_incoming=0, max_incoming=0, min_outgoing=0, max_outgoing=0
),
)

View file

@ -18,7 +18,7 @@ from pipecat.utils.enums import EndTaskReason
from api.db import db_client
from api.services.pipecat.audio_playback import play_audio
from api.services.workflow.disposition_mapper import apply_disposition_mapping
from api.services.workflow.workflow import Node, WorkflowGraph
from api.services.workflow.workflow_graph import Node, WorkflowGraph
if TYPE_CHECKING:
from pipecat.frames.frames import Frame

View file

@ -8,7 +8,7 @@ from typing import TYPE_CHECKING, Callable, Optional
if TYPE_CHECKING:
from api.services.workflow.pipecat_engine_custom_tools import CustomToolManager
from api.services.workflow.workflow import Node, WorkflowGraph
from api.services.workflow.workflow_graph import Node, WorkflowGraph
from api.services.workflow.pipecat_engine_custom_tools import get_function_schema
from api.services.workflow.tools.knowledge_base import get_knowledge_base_tool

View file

@ -4,6 +4,7 @@ from typing import Dict, List, Set
from api.services.workflow.dto import EdgeDataDTO, NodeType, ReactFlowDTO
from api.services.workflow.errors import ItemKind, WorkflowError
from api.services.workflow.node_specs import REGISTRY
# Regex for matching {{ variable }} template placeholders.
# Captures: group(1) = variable path, group(2) = filter name, group(3) = filter value.
@ -259,41 +260,67 @@ class WorkflowGraph:
return errors
def _assert_connection_counts(self):
"""Enforce per-type incoming/outgoing edge constraints.
Driven by `NodeSpec.graph_constraints` so a single source of truth
in the spec dictates what's legal. Types without a `graph_constraints`
block are unconstrained (e.g. agentNode on the outgoing side).
"""
errors: list[WorkflowError] = []
out_deg = Counter()
in_deg = Counter()
for n in self.nodes.values(): # init counters
for n in self.nodes.values():
out_deg[n.id] = in_deg[n.id] = 0
for src, n in self.nodes.items(): # compute degrees
for src, n in self.nodes.items():
for m in n.out.values():
out_deg[src] += 1
in_deg[m.id] += 1
for n in self.nodes.values():
spec = REGISTRY.get(n.node_type.value)
if spec is None or spec.graph_constraints is None:
continue
gc = spec.graph_constraints
in_d, out_d = in_deg[n.id], out_deg[n.id]
label = spec.display_name
match n.node_type:
case NodeType.endNode:
if in_d < 1 or out_d != 0:
errors.append(
WorkflowError(
kind=ItemKind.node,
id=n.id,
field=None,
message=f"EndNode must have at least 1 incoming edge",
)
)
case NodeType.agentNode:
if in_d < 1:
errors.append(
WorkflowError(
kind=ItemKind.node,
id=n.id,
field=None,
message=f"Worker must have at least 1 incoming edge",
)
)
if gc.max_incoming is not None and in_d > gc.max_incoming:
msg = (
f"{label} cannot have incoming edges"
if gc.max_incoming == 0
else f"{label} can have at most {gc.max_incoming} incoming edge(s)"
)
errors.append(
WorkflowError(kind=ItemKind.node, id=n.id, field=None, message=msg)
)
if gc.min_incoming is not None and in_d < gc.min_incoming:
errors.append(
WorkflowError(
kind=ItemKind.node,
id=n.id,
field=None,
message=f"{label} must have at least {gc.min_incoming} incoming edge(s)",
)
)
if gc.max_outgoing is not None and out_d > gc.max_outgoing:
msg = (
f"{label} cannot have outgoing edges"
if gc.max_outgoing == 0
else f"{label} can have at most {gc.max_outgoing} outgoing edge(s)"
)
errors.append(
WorkflowError(kind=ItemKind.node, id=n.id, field=None, message=msg)
)
if gc.min_outgoing is not None and out_d < gc.min_outgoing:
errors.append(
WorkflowError(
kind=ItemKind.node,
id=n.id,
field=None,
message=f"{label} must have at least {gc.min_outgoing} outgoing edge(s)",
)
)
return errors

View file

@ -27,7 +27,7 @@ from api.services.workflow.dto import (
StartCallRFNode,
VariableType,
)
from api.services.workflow.workflow import WorkflowGraph
from api.services.workflow.workflow_graph import WorkflowGraph
START_CALL_SYSTEM_PROMPT = "Start Call System Prompt"
AGENT_SYSTEM_PROMPT = "Agent Node System Prompt"

View file

@ -0,0 +1,24 @@
{
"nodes": [
{
"id": "s",
"type": "startCall",
"position": {"x": 0, "y": 0},
"data": {"name": "Start", "prompt": "Greet the caller."}
},
{
"id": "a",
"type": "agentNode",
"position": {"x": 200, "y": 0},
"data": {"name": "Agent", "prompt": "Continue the conversation."}
}
],
"edges": [
{
"id": "a-s",
"source": "a",
"target": "s",
"data": {"label": "loop back to start", "condition": "always"}
}
]
}

View file

@ -0,0 +1,30 @@
{
"nodes": [
{
"id": "s",
"type": "startCall",
"position": {"x": -200, "y": 0},
"data": {"name": "Start", "prompt": "Greet the caller."}
},
{
"id": "a",
"type": "agentNode",
"position": {"x": 0, "y": 0},
"data": {"name": "Agent", "prompt": "Talk to the caller."}
},
{
"id": "w",
"type": "webhook",
"position": {"x": 200, "y": 0},
"data": {"name": "Notify CRM"}
}
],
"edges": [
{
"id": "a-w",
"source": "a",
"target": "w",
"data": {"label": "post to webhook", "condition": "always"}
}
]
}

View file

@ -0,0 +1,30 @@
{
"nodes": [
{
"id": "s",
"type": "startCall",
"position": {"x": -200, "y": 0},
"data": {"name": "Start", "prompt": "Greet the caller."}
},
{
"id": "g",
"type": "globalNode",
"position": {"x": 0, "y": 0},
"data": {"name": "Global", "prompt": "Shared system prompt."}
},
{
"id": "a",
"type": "agentNode",
"position": {"x": 200, "y": 0},
"data": {"name": "Agent", "prompt": "Talk to the caller."}
}
],
"edges": [
{
"id": "g-a",
"source": "g",
"target": "a",
"data": {"label": "global to agent", "condition": "always"}
}
]
}

View file

@ -0,0 +1,30 @@
{
"nodes": [
{
"id": "s",
"type": "startCall",
"position": {"x": -200, "y": 0},
"data": {"name": "Start", "prompt": "Greet the caller."}
},
{
"id": "w",
"type": "webhook",
"position": {"x": 0, "y": 0},
"data": {"name": "Notify CRM"}
},
{
"id": "e",
"type": "endCall",
"position": {"x": 200, "y": 0},
"data": {"name": "End", "prompt": "Say goodbye."}
}
],
"edges": [
{
"id": "w-e",
"source": "w",
"target": "e",
"data": {"label": "after webhook", "condition": "always"}
}
]
}

View file

@ -0,0 +1,24 @@
{
"nodes": [
{
"id": "s",
"type": "startCall",
"position": {"x": -200, "y": 0},
"data": {"name": "Start", "prompt": "Greet the caller."}
},
{
"id": "e",
"type": "endCall",
"position": {"x": 0, "y": 0},
"data": {"name": "End", "prompt": "Say goodbye."}
}
],
"edges": [
{
"id": "ghost-e",
"source": "ghost-node",
"target": "e",
"data": {"label": "from nowhere", "condition": "always"}
}
]
}

View file

@ -0,0 +1,18 @@
{
"nodes": [
{
"id": "s",
"type": "startCall",
"position": {"x": 0, "y": 0},
"data": {"name": "Start", "prompt": "Greet the caller."}
}
],
"edges": [
{
"id": "s-ghost",
"source": "s",
"target": "ghost-node",
"data": {"label": "to nowhere", "condition": "always"}
}
]
}

View file

@ -0,0 +1,36 @@
{
"nodes": [
{
"id": "s",
"type": "startCall",
"position": {"x": 0, "y": 0},
"data": {"name": "Start", "prompt": "Greet the caller."}
},
{
"id": "a",
"type": "agentNode",
"position": {"x": 200, "y": 0},
"data": {"name": "Agent", "prompt": "Continue the conversation."}
},
{
"id": "e",
"type": "endCall",
"position": {"x": 400, "y": 0},
"data": {"name": "End", "prompt": "Say goodbye."}
}
],
"edges": [
{
"id": "s-a",
"source": "s",
"target": "a",
"data": {"label": "start to agent", "condition": "always"}
},
{
"id": "a-e",
"source": "a",
"target": "e",
"data": {"label": "agent to end", "condition": "always"}
}
]
}

View file

@ -0,0 +1,24 @@
{
"nodes": [
{
"id": "a",
"type": "agentNode",
"position": {"x": 0, "y": 0},
"data": {"name": "Agent", "prompt": "Talk to the caller."}
},
{
"id": "e",
"type": "endCall",
"position": {"x": 200, "y": 0},
"data": {"name": "End", "prompt": "Say goodbye."}
}
],
"edges": [
{
"id": "a-e",
"source": "a",
"target": "e",
"data": {"label": "agent to end", "condition": "always"}
}
]
}

View file

@ -0,0 +1,11 @@
{
"nodes": [
{
"id": "s",
"type": "startCall",
"position": {"x": 0, "y": 0},
"data": {"name": "Start", "prompt": "Greet the caller and hang up."}
}
],
"edges": []
}

View file

@ -4,14 +4,14 @@ import pytest
from api.services.workflow.dto import ReactFlowDTO, sanitize_workflow_definition
_FIXTURES_DIR = Path(__file__).parent / "definitions"
_FIXTURES_DIR = Path(__file__).parent / "dto_fixtures"
@pytest.mark.asyncio
async def test_dto():
# Path resolved relative to this test file so the test works regardless
# of the cwd pytest is invoked from.
with open(_FIXTURES_DIR / "rf-1.json", "r") as f:
with open(_FIXTURES_DIR / "sample_branching_workflow.json", "r") as f:
dto = ReactFlowDTO.model_validate_json(f.read())
assert dto is not None

View file

@ -31,7 +31,7 @@ from pipecat.tests.mock_transport import MockTransport
from pipecat.transports.base_transport import TransportParams
from api.services.workflow.pipecat_engine import PipecatEngine
from api.services.workflow.workflow import WorkflowGraph
from api.services.workflow.workflow_graph import WorkflowGraph
from api.tests.conftest import (
AGENT_SYSTEM_PROMPT,
END_CALL_SYSTEM_PROMPT,

View file

@ -57,7 +57,7 @@ from api.services.workflow.pipecat_engine_custom_tools import CustomToolManager
from api.services.workflow.pipecat_engine_variable_extractor import (
VariableExtractionManager,
)
from api.services.workflow.workflow import WorkflowGraph
from api.services.workflow.workflow_graph import WorkflowGraph
from api.tests.conftest import END_CALL_SYSTEM_PROMPT, START_CALL_SYSTEM_PROMPT
from pipecat.tests import MockLLMService, MockTTSService

View file

@ -49,7 +49,7 @@ from pipecat.turns.user_turn_strategies import UserTurnStrategies
from pipecat.utils.time import time_now_iso8601
from api.services.workflow.pipecat_engine import PipecatEngine
from api.services.workflow.workflow import WorkflowGraph
from api.services.workflow.workflow_graph import WorkflowGraph
from pipecat.tests import MockLLMService, MockTTSService

View file

@ -22,7 +22,7 @@ from pipecat.tests.mock_transport import MockTransport
from pipecat.transports.base_transport import TransportParams
from api.services.workflow.pipecat_engine import PipecatEngine
from api.services.workflow.workflow import WorkflowGraph
from api.services.workflow.workflow_graph import WorkflowGraph
from api.tests.conftest import END_CALL_SYSTEM_PROMPT
from pipecat.tests import MockLLMService, MockTTSService

View file

@ -35,7 +35,7 @@ from api.services.workflow.pipecat_engine import PipecatEngine
from api.services.workflow.pipecat_engine_variable_extractor import (
VariableExtractionManager,
)
from api.services.workflow.workflow import WorkflowGraph
from api.services.workflow.workflow_graph import WorkflowGraph
from pipecat.tests import MockLLMService, MockTTSService

View file

@ -32,7 +32,7 @@ from api.services.workflow.pipecat_engine import PipecatEngine
from api.services.workflow.pipecat_engine_variable_extractor import (
VariableExtractionManager,
)
from api.services.workflow.workflow import WorkflowGraph
from api.services.workflow.workflow_graph import WorkflowGraph
from pipecat.tests import MockLLMService, MockTTSService

View file

@ -43,7 +43,7 @@ from api.services.workflow.dto import (
)
from api.services.workflow.pipecat_engine import PipecatEngine
from api.services.workflow.pipecat_engine_custom_tools import CustomToolManager
from api.services.workflow.workflow import WorkflowGraph
from api.services.workflow.workflow_graph import WorkflowGraph
from pipecat.tests import MockLLMService, MockTTSService
# ─── Constants ──────────────────────────────────────────────────

View file

@ -54,7 +54,7 @@ from api.services.workflow.pipecat_engine import PipecatEngine
from api.services.workflow.pipecat_engine_variable_extractor import (
VariableExtractionManager,
)
from api.services.workflow.workflow import WorkflowGraph
from api.services.workflow.workflow_graph import WorkflowGraph
from pipecat.tests import MockLLMService, MockTTSService

View file

@ -44,7 +44,7 @@ from pipecat.turns.user_turn_strategies import UserTurnStrategies
from pipecat.utils.time import time_now_iso8601
from api.services.workflow.pipecat_engine import PipecatEngine
from api.services.workflow.workflow import WorkflowGraph
from api.services.workflow.workflow_graph import WorkflowGraph
from pipecat.tests import MockLLMService, MockTTSService

View file

@ -48,7 +48,7 @@ from api.services.workflow.pipecat_engine import PipecatEngine
from api.services.workflow.pipecat_engine_variable_extractor import (
VariableExtractionManager,
)
from api.services.workflow.workflow import WorkflowGraph
from api.services.workflow.workflow_graph import WorkflowGraph
from pipecat.tests import MockLLMService, MockTTSService

View file

@ -0,0 +1,117 @@
"""Regression tests for WorkflowGraph edge/graph constraints + the
admin audit script that mirrors them.
Each fixture in `dto_fixtures/` is either a clean workflow or a single
category of violation we found in production. We pin two layers:
1. WorkflowGraph semantic gate used by `/publish`, the SDK API, and
both MCP tools. Driven by `NodeSpec.graph_constraints`. If this
layer ever stops rejecting one of these fixtures, the production
write paths will quietly start accepting bad workflows again.
2. audit_definition (api.services.admin_utils.local_exec) read-only
sweep over persisted rows used to find legacy/imported breakage.
Pinned so refactors of the rule set don't silently change the
verdicts the migration relies on.
DTO-level shape validation is covered by `test_dto.py` and isn't
re-pinned here.
"""
import json
from pathlib import Path
import pytest
from api.services.admin_utils.local_exec import audit_definition
from api.services.workflow.dto import ReactFlowDTO
from api.services.workflow.workflow_graph import WorkflowGraph
_FIXTURES_DIR = Path(__file__).parent / "dto_fixtures"
def _load(name: str) -> tuple[str, dict]:
raw = (_FIXTURES_DIR / f"{name}.json").read_text()
return raw, json.loads(raw)
# (fixture_name, expected_audit_reasons, expected_graph_messages)
#
# expected_graph_messages semantics:
# None — DTO rejects upstream, WorkflowGraph is never reached.
# [] — WorkflowGraph accepts (clean fixture).
# [...] — WorkflowGraph rejects; each substring must appear in the
# emitted WorkflowError messages.
_SCENARIOS = [
("clean", [], []),
# A workflow with just a startCall and no edges is valid — startCall
# has no `min_outgoing` constraint, so a "greet then hang up" flow
# passes both audit and WorkflowGraph.
("start_only", [], []),
(
"bad_edge_into_start",
["target_max_incoming_0:startCall"],
["Start Call cannot have incoming edges"],
),
(
"bad_edge_into_webhook",
["target_max_incoming_0:webhook"],
["Webhook cannot have incoming edges"],
),
(
"bad_edge_out_of_webhook",
["source_max_outgoing_0:webhook"],
["Webhook cannot have outgoing edges"],
),
(
"bad_edge_out_of_globalnode",
["source_max_outgoing_0:globalNode"],
["Global Node cannot have outgoing edges"],
),
("bad_edge_target_missing", ["target_id_missing"], None),
("bad_edge_source_missing", ["source_id_missing"], None),
(
"no_start_node",
["no_start_node"],
["Workflow must have exactly one start node"],
),
]
@pytest.mark.parametrize(
"name,expected_reasons",
[(name, reasons) for name, reasons, _ in _SCENARIOS],
)
def test_audit_catches_violations(name, expected_reasons):
_, definition = _load(name)
violations = audit_definition(definition["nodes"], definition["edges"])
reasons = sorted(v["reason"] for v in violations)
assert reasons == sorted(expected_reasons)
@pytest.mark.parametrize(
"name,expected_graph_messages",
[
(name, messages)
for name, _, messages in _SCENARIOS
if messages is not None # skip fixtures DTO rejects upstream
],
)
def test_workflow_graph_rejects_violations(name, expected_graph_messages):
"""If WorkflowGraph accepts a definition, every save path that goes
'live' will accept it so this layer is the canonical regression
point for the rules in `NodeSpec.graph_constraints`."""
raw, _ = _load(name)
dto = ReactFlowDTO.model_validate_json(raw)
if not expected_graph_messages:
WorkflowGraph(dto)
return
with pytest.raises(ValueError) as exc_info:
WorkflowGraph(dto)
actual_messages = [w["message"] for w in exc_info.value.args[0]]
for expected in expected_graph_messages:
assert any(expected in m for m in actual_messages), (
f"Expected substring {expected!r} not found in graph errors: {actual_messages}"
)

View file

@ -8,7 +8,7 @@ from zoneinfo import ZoneInfo
from loguru import logger
from api.services.workflow.workflow import TEMPLATE_VAR_PATTERN
from api.services.workflow.workflow_graph import TEMPLATE_VAR_PATTERN
_CURRENT_TIME_PREFIX = "current_time"
_CURRENT_WEEKDAY_PREFIX = "current_weekday"

View file

@ -1,6 +1,6 @@
# generated by datamodel-codegen:
# filename: dograh-openapi-XXXXXX.json.k2orBArfVN
# timestamp: 2026-05-08T08:26:28+00:00
# filename: dograh-openapi-XXXXXX.json.orthnRzifa
# timestamp: 2026-05-08T10:32:18+00:00
from __future__ import annotations