Merge remote-tracking branch 'origin/main' into feat/text-chat

This commit is contained in:
Abhishek Kumar 2026-05-21 07:47:07 +05:30
commit 129a6d700c
160 changed files with 9287 additions and 3935 deletions

View file

@ -7,7 +7,7 @@ script in `api/services/admin_utils/local_exec.py` is the production
consumer.
"""
from api.services.workflow.node_specs import REGISTRY
from api.services.workflow.node_specs import all_specs
def _build_type_rules() -> tuple[set[str], set[str]]:
@ -16,14 +16,14 @@ def _build_type_rules() -> tuple[set[str], set[str]]:
(max_incoming == 0)."""
src_forbidden: set[str] = set()
tgt_forbidden: set[str] = set()
for name, spec in REGISTRY.items():
for spec in all_specs():
gc = spec.graph_constraints
if gc is None:
continue
if gc.max_outgoing == 0:
src_forbidden.add(name)
src_forbidden.add(spec.name)
if gc.max_incoming == 0:
tgt_forbidden.add(name)
tgt_forbidden.add(spec.name)
return src_forbidden, tgt_forbidden

File diff suppressed because it is too large Load diff

View file

@ -0,0 +1,254 @@
"""Single unit that knows the MCP protocol + credentials.
Wraps the vendored Pipecat ``MCPClient`` for connection/session, builds
streamable-HTTP params from a Dograh credential, exposes namespaced
``FunctionSchema``s, and proxies tool calls. Connection failures degrade
(``available = False``) instead of raising the call must survive a
dead MCP server.
"""
from __future__ import annotations
import asyncio
from datetime import timedelta
from typing import TYPE_CHECKING, Any, Dict, List, Optional, Set
from loguru import logger
from mcp.client.session_group import StreamableHttpParameters
from pipecat.adapters.schemas.function_schema import FunctionSchema
from pipecat.services.mcp_service import MCPClient
from api.services.workflow.tools.mcp_tool import namespace_function_name
from api.utils.credential_auth import build_auth_header
if TYPE_CHECKING:
from api.db.models import ExternalCredentialModel
def build_streamable_http_params(
*,
url: str,
credential: Optional["ExternalCredentialModel"],
timeout_secs: int,
sse_read_timeout_secs: int,
) -> StreamableHttpParameters:
"""Build Pipecat/MCP streamable-HTTP params, injecting the auth header
from an ExternalCredentialModel (reuses the http_api credential path)."""
headers: Optional[Dict[str, str]] = None
if credential is not None:
auth = build_auth_header(credential)
headers = auth or None
return StreamableHttpParameters(
url=url,
headers=headers,
timeout=timedelta(seconds=timeout_secs),
sse_read_timeout=timedelta(seconds=sse_read_timeout_secs),
)
class McpToolSession:
"""One live MCP server connection for the duration of a call."""
def __init__(
self,
*,
tool_uuid: str,
tool_name: str,
url: str,
credential: Optional["ExternalCredentialModel"],
tools_filter: List[str],
timeout_secs: int,
sse_read_timeout_secs: int,
) -> None:
self._tool_uuid = tool_uuid
self._tool_name = tool_name
self._url = url
self._credential = credential
# An empty list is intentionally treated as "no filter (expose all
# tools)" — Pipecat's MCPClient applies a filter only when this is a
# non-empty list, so [] and None are equivalent ("all tools").
self._tools_filter = tools_filter or None
self._timeout_secs = timeout_secs
self._sse_read_timeout_secs = sse_read_timeout_secs
self._client: Optional[MCPClient] = None
self._session: Any = None # mcp.ClientSession (read once after start)
self._schemas: List[FunctionSchema] = []
# namespaced LLM name -> original MCP tool name
self._name_map: Dict[str, str] = {}
self.available: bool = False
async def start(self) -> None:
"""Connect, initialize, and cache the tool list. Never raises —
on any failure the session is marked unavailable."""
try:
params = build_streamable_http_params(
url=self._url,
credential=self._credential,
timeout_secs=self._timeout_secs,
sse_read_timeout_secs=self._sse_read_timeout_secs,
)
self._client = MCPClient(params, tools_filter=self._tools_filter)
await self._client.start()
# Single, isolated touch of Pipecat internals (vendored submodule).
self._session = self._client._active_session
tools_schema = await self._client.get_tools_schema()
fallback = self._tool_uuid[:8] if self._tool_uuid else "server"
for fs in tools_schema.standard_tools:
ns_name = namespace_function_name(
self._tool_name, fs.name, fallback=fallback
)
self._name_map[ns_name] = fs.name
self._schemas.append(
FunctionSchema(
name=ns_name,
description=fs.description,
properties=fs.properties,
required=fs.required,
)
)
self.available = True
logger.info(
f"MCP session ready for tool '{self._tool_name}' "
f"({self._tool_uuid}): {sorted(self._name_map)}"
)
except (KeyboardInterrupt, SystemExit):
raise
except asyncio.CancelledError as e:
# Empirically, a dead/unreachable MCP server does NOT surface as a
# plain Exception here. The real failure is httpx.ConnectError, but
# anyio's streamablehttp_client task group, while tearing down that
# ConnectError, re-surfaces it to our frame as an *internal*
# cancel-scope CancelledError carrying the signature message
# "Cancelled via cancel scope <id>". A genuine *external*
# cancellation (call teardown / shutdown) is a bare CancelledError
# (empty args) or one with an application-chosen message. Type, MRO,
# context chain, and asyncio task.cancelling() are all identical
# between the two, so the anyio scope-signature message is the only
# reliable discriminator. Re-raise genuine external cancellation to
# preserve structured concurrency; degrade only on the anyio
# connect-teardown artifact.
msg = "" if not e.args else str(e.args[0] or "")
if not msg.startswith("Cancelled via cancel scope"):
raise
await self._degrade(e)
except Exception as e: # noqa: BLE001 — see _degrade docstring
# Defensive: if a future Pipecat/httpx version surfaces the connect
# failure directly (e.g. httpx.ConnectError) instead of via the
# anyio cancel-scope artifact above, still degrade gracefully.
await self._degrade(e)
async def _degrade(self, e: BaseException) -> None:
"""Mark this session unavailable and tear down any dangling client so
start() leaves self._client either fully usable or None. The contract
requires graceful degradation on any *connect* failure (never raising
for a dead MCP server) while genuine external cancellation /
KeyboardInterrupt / SystemExit are re-raised by the caller."""
self.available = False
self._schemas = []
self._name_map = {}
# Self-contained cleanup: _client.start() may have succeeded before a
# later step (e.g. get_tools_schema()) failed, leaving an open client.
if self._client is not None:
try:
await self._client.close()
except Exception:
pass
finally:
self._client = None
self._session = None
logger.warning(
f"MCP session unavailable for tool '{self._tool_name}' "
f"({self._tool_uuid}) at {self._url}: {e!r}. "
f"Call proceeds without these tools."
)
@property
def call_timeout_secs(self) -> float:
"""Pipecat function-call timeout for this server's tools. Slightly
longer than the transport read timeout so a slow MCP call surfaces
as a structured tool error (handled in the handler) rather than a
hard pipeline timeout."""
return float(self._sse_read_timeout_secs) + 5.0
def function_schemas(
self, allowed_raw_names: Optional[Set[str]] = None
) -> List[FunctionSchema]:
"""Return cached FunctionSchemas, optionally filtered by raw MCP tool name.
``allowed_raw_names=None`` returns all schemas. An empty set returns none.
Raw names are the pre-namespace MCP tool names (e.g. ``echo``, not
``mcp__slug__echo``).
"""
if allowed_raw_names is None:
return list(self._schemas)
return [
s for s in self._schemas if self._name_map.get(s.name) in allowed_raw_names
]
def discovered_tools(self) -> List[Dict[str, str]]:
"""Raw MCP tool catalog for UI/cache: ``[{name, description}]``
using the *raw* server names (not the namespaced LLM names).
Empty if the session is unavailable."""
out: List[Dict[str, str]] = []
for s in self._schemas:
raw = self._name_map.get(s.name)
if raw is None:
continue
out.append({"name": raw, "description": s.description or ""})
return out
async def call(self, namespaced_name: str, arguments: Dict[str, Any]) -> str:
"""Invoke an MCP tool by its namespaced LLM name. Returns a string
(flattened text content). Raises if the session is unavailable so
the caller can map it to a structured error for the LLM."""
if not self.available or self._session is None:
raise RuntimeError(f"MCP session unavailable for {namespaced_name}")
original = self._name_map.get(namespaced_name)
if original is None:
raise RuntimeError(f"Unknown MCP function {namespaced_name}")
result = await self._session.call_tool(original, arguments=arguments)
text = ""
for content in getattr(result, "content", []) or []:
if getattr(content, "text", None):
text += content.text
return text or "Sorry, the MCP tool returned no content."
async def close(self) -> None:
if self._client is not None:
try:
await self._client.close()
except Exception as e:
logger.warning(f"Error closing MCP session {self._tool_uuid}: {e}")
finally:
self._client = None
self._session = None
async def discover_mcp_tools(
*,
url: str,
credential: Optional["ExternalCredentialModel"],
timeout_secs: int,
sse_read_timeout_secs: int,
) -> List[Dict[str, str]]:
"""Open an ephemeral MCP session, list its tools, close it. Returns
``[{name, description}]`` (raw names). Never raises on any connect
failure returns ``[]``."""
session = McpToolSession(
tool_uuid="discover",
tool_name="discover",
url=url,
credential=credential,
tools_filter=[],
timeout_secs=timeout_secs,
sse_read_timeout_secs=sse_read_timeout_secs,
)
await session.start()
try:
if not session.available:
return []
return session.discovered_tools()
finally:
await session.close()

View file

@ -0,0 +1,19 @@
from __future__ import annotations
from pydantic import BaseModel
from api.services.workflow.node_specs._base import PropertyType
from api.services.workflow.node_specs.model_spec import spec_field
class BaseNodeData(BaseModel):
name: str = spec_field(
...,
min_length=1,
ui_type=PropertyType.string,
display_name="Name",
description="Short identifier shown in the canvas and call logs.",
required=True,
)
is_start: bool = spec_field(default=False, spec_exclude=True)
is_end: bool = spec_field(default=False, spec_exclude=True)

View file

@ -1,10 +1,8 @@
"""Node specification registry.
Adding a new node type:
1. Create a new module under this package, define a `SPEC: NodeSpec`.
2. Add it to the imports + REGISTRY below.
3. The Pydantic discriminated-union variant in dto.py must use the same
`name` value as `SPEC.name`.
Core node specs are generated from the workflow DTO models. Third-party
integration node specs live under `api.services.integrations/<name>/` and
register through the integration registry so they don't need edits here.
"""
from __future__ import annotations
@ -21,8 +19,10 @@ from api.services.workflow.node_specs._base import (
PropertyType,
evaluate_display_options,
)
from api.services.workflow.node_specs.model_spec import build_spec
REGISTRY: dict[str, NodeSpec] = {}
_CORE_SPECS_LOADED = False
def register(spec: NodeSpec) -> NodeSpec:
@ -38,12 +38,23 @@ def register(spec: NodeSpec) -> NodeSpec:
def get_spec(name: str) -> NodeSpec | None:
return REGISTRY.get(name)
_ensure_core_registered()
if name in REGISTRY:
return REGISTRY[name]
from api.services.integrations import get_node_spec
return get_node_spec(name)
def all_specs() -> list[NodeSpec]:
"""All registered specs, sorted by name for stable output."""
return [REGISTRY[name] for name in sorted(REGISTRY)]
_ensure_core_registered()
from api.services.integrations import all_node_specs
specs = {spec.name: spec for spec in REGISTRY.values()}
specs.update({spec.name: spec for spec in all_node_specs()})
return [specs[name] for name in sorted(specs)]
__all__ = [
@ -64,19 +75,15 @@ __all__ = [
]
# Side-effect imports — each module's `register(SPEC)` call populates REGISTRY.
# Keep at module bottom so the registry helpers are defined first.
from api.services.workflow.node_specs import ( # noqa: E402, F401
agent,
end_call,
global_node,
qa,
start_call,
trigger,
webhook,
)
def _ensure_core_registered() -> None:
global _CORE_SPECS_LOADED
if _CORE_SPECS_LOADED:
return
# Wire up registrations from the SPEC constants in each module.
for _module in (start_call, agent, end_call, global_node, trigger, webhook, qa):
register(_module.SPEC)
del _module
from api.services.workflow.dto import _CORE_NODE_DATA_CLASSES
for model_cls in _CORE_NODE_DATA_CLASSES.values():
if model_cls.__node_spec_metadata__.name in REGISTRY:
continue
register(build_spec(model_cls))
_CORE_SPECS_LOADED = True

View file

@ -1,9 +1,9 @@
"""Spec schema for node definitions.
A `NodeSpec` is the single source of truth for a node type. It drives:
- Pydantic validation (the per-type DTOs in dto.py mirror these property types)
- The generic UI renderer (frontend reads specs via /api/v1/node-types)
- The LLM SDK (constructors and JSON-Schema derived from these specs)
`NodeSpec` is the serialized contract exposed to the frontend, MCP tools, and
SDKs. Core workflow node specs are generated from the DTO models plus
model-attached metadata; integration packages may generate them the same way or
register a prebuilt spec object.
Every property's `description` is LLM-readable copy — treat it as production
documentation, not internal notes. Spec lint enforces non-empty descriptions
@ -122,6 +122,16 @@ class PropertyOption(BaseModel):
model_config = ConfigDict(extra="forbid")
def to_mcp_dict(self) -> dict[str, Any]:
"""Lean projection for `get_node_type`: the `value` an LLM writes in
code, plus a `description` when one carries real meaning. The UI
`label` is dropped it's the option's display string, never used
when authoring."""
out: dict[str, Any] = {"value": self.value}
if self.description:
out["description"] = self.description
return out
class PropertySpec(BaseModel):
"""Single field on a node.
@ -175,6 +185,43 @@ class PropertySpec(BaseModel):
model_config = ConfigDict(extra="forbid")
def to_mcp_dict(self) -> dict[str, Any]:
"""Lean projection of this property for the `get_node_type` MCP tool.
Keeps only what an LLM needs to author a valid value: name, type,
description, llm_hint, requiredness, default, enum options, nested
row properties, and validation bounds. UI-rendering concerns
(`display_name`, `placeholder`, `display_options`, `editor`,
`extra`) and null/empty fields are omitted they're noise in the
model's context and never appear in authored SDK code.
"""
out: dict[str, Any] = {
"name": self.name,
"type": self.type.value,
"description": self.description,
}
if self.llm_hint:
out["llm_hint"] = self.llm_hint
if self.required:
out["required"] = True
if self.default is not None:
out["default"] = self.default
if self.options:
out["options"] = [opt.to_mcp_dict() for opt in self.options]
if self.properties:
out["properties"] = [prop.to_mcp_dict() for prop in self.properties]
for constraint in (
"min_value",
"max_value",
"min_length",
"max_length",
"pattern",
):
value = getattr(self, constraint)
if value is not None:
out[constraint] = value
return out
PropertySpec.model_rebuild()
@ -222,3 +269,33 @@ class NodeSpec(BaseModel):
graph_constraints: Optional[GraphConstraints] = None
model_config = ConfigDict(extra="forbid")
def to_mcp_dict(self) -> dict[str, Any]:
"""Lean projection of this spec for the `get_node_type` MCP tool.
Drops node-level UI metadata (`display_name`, `category`, `icon`,
`version`) and the per-property rendering concerns trimmed by
`PropertySpec.to_mcp_dict`, leaving just the authoring-relevant
schema the LLM consumes when composing a workflow. The full spec is
still served verbatim to the frontend renderer (REST `node-types`
route) and the SDK codegen / TS validator (`ts_bridge`), which need
the dropped fields.
"""
out: dict[str, Any] = {
"name": self.name,
"description": self.description,
}
if self.llm_hint:
out["llm_hint"] = self.llm_hint
out["properties"] = [prop.to_mcp_dict() for prop in self.properties]
if self.examples:
out["examples"] = [
ex.model_dump(mode="json", exclude_none=True) for ex in self.examples
]
if self.graph_constraints:
constraints = self.graph_constraints.model_dump(
mode="json", exclude_none=True
)
if constraints:
out["graph_constraints"] = constraints
return out

View file

@ -1,168 +0,0 @@
"""Spec for the Agent node — the workhorse mid-call node where the LLM
executes a focused conversational step with optional tools and documents."""
from api.services.workflow.node_specs._base import (
DisplayOptions,
GraphConstraints,
NodeCategory,
NodeExample,
NodeSpec,
PropertyOption,
PropertySpec,
PropertyType,
)
SPEC = NodeSpec(
name="agentNode",
display_name="Agent Node",
description="Conversational step — the LLM runs one focused exchange.",
llm_hint=(
"Mid-call step executed by the LLM. Most workflows are a chain of "
"agent nodes connected by edges that describe transition conditions. "
"Each agent node can invoke tools and reference documents."
),
category=NodeCategory.call_node,
icon="Headset",
properties=[
PropertySpec(
name="name",
type=PropertyType.string,
display_name="Name",
description=(
"Short identifier for this step (e.g., 'Qualify Budget'). "
"Appears in call logs and edge transition tools."
),
required=True,
min_length=1,
default="Agent",
),
PropertySpec(
name="prompt",
type=PropertyType.mention_textarea,
display_name="Prompt",
description=(
"Agent system prompt for this step. Supports "
"{{template_variables}} from extraction or pre-call fetch."
),
required=True,
min_length=1,
placeholder="Ask the caller about their budget and timeline.",
),
PropertySpec(
name="allow_interrupt",
type=PropertyType.boolean,
display_name="Allow Interruption",
description=(
"When true, the user can interrupt the agent mid-utterance. "
"Set false for non-interruptible disclosures."
),
default=True,
),
PropertySpec(
name="add_global_prompt",
type=PropertyType.boolean,
display_name="Add Global Prompt",
description=(
"When true and a Global node exists, prepends the global "
"prompt to this node's prompt at runtime."
),
default=True,
),
PropertySpec(
name="extraction_enabled",
type=PropertyType.boolean,
display_name="Enable Variable Extraction",
description=(
"When true, runs an LLM extraction pass on transition out of "
"this node to capture variables from the conversation."
),
default=False,
),
PropertySpec(
name="extraction_prompt",
type=PropertyType.string,
display_name="Extraction Prompt",
description="Overall instructions guiding variable extraction.",
display_options=DisplayOptions(show={"extraction_enabled": [True]}),
editor="textarea",
),
PropertySpec(
name="extraction_variables",
type=PropertyType.fixed_collection,
display_name="Variables to Extract",
description=(
"Each entry declares one variable to capture from the "
"conversation, with its name, type, and per-variable hint."
),
display_options=DisplayOptions(show={"extraction_enabled": [True]}),
properties=[
PropertySpec(
name="name",
type=PropertyType.string,
display_name="Variable Name",
description="snake_case identifier used downstream.",
required=True,
),
PropertySpec(
name="type",
type=PropertyType.options,
display_name="Type",
description="Data type of the extracted value.",
required=True,
default="string",
options=[
PropertyOption(value="string", label="String"),
PropertyOption(value="number", label="Number"),
PropertyOption(value="boolean", label="Boolean"),
],
),
PropertySpec(
name="prompt",
type=PropertyType.string,
display_name="Extraction Hint",
description="Per-variable hint describing what to look for.",
editor="textarea",
),
],
),
PropertySpec(
name="tool_uuids",
type=PropertyType.tool_refs,
display_name="Tools",
description="Tools the agent can invoke during this step.",
llm_hint="List of tool UUIDs from `list_tools`.",
),
PropertySpec(
name="document_uuids",
type=PropertyType.document_refs,
display_name="Knowledge Base Documents",
description="Documents the agent can reference during this step.",
llm_hint="List of document UUIDs from `list_documents`.",
),
],
examples=[
NodeExample(
name="qualify_lead",
data={
"name": "Qualify Budget",
"prompt": "Ask about budget and timeline. Capture both before transitioning.",
"allow_interrupt": True,
"extraction_enabled": True,
"extraction_prompt": "Extract budget amount and rough timeline.",
"extraction_variables": [
{
"name": "budget_usd",
"type": "number",
"prompt": "Stated budget in USD",
},
{
"name": "timeline",
"type": "string",
"prompt": "When they want to start",
},
],
},
),
],
graph_constraints=GraphConstraints(min_incoming=1),
)

View file

@ -0,0 +1,44 @@
DEFAULT_QA_SYSTEM_PROMPT = """You are a QA analyst evaluating a specific segment of a voice AI conversation.
## Node Purpose
{{node_summary}}
## Previous Conversation Context (For start of conversation, previous conversation summary can be empty.)
{{previous_conversation_summary}}
## Tags to evaluate
Examine the conversation carefully and identify which of the following tags apply:
- UNCLEAR_CONVERSATION - The conversation is not coherent or clear, messages don't connect logically
- ASSISTANT_IN_LOOP - The assistant asks the same question multiple times or gets stuck repeating itself
- ASSISTANT_REPLY_IMPROPER - The assistant did not reply properly to the user's question/query or seems confused by what the user said
- USER_FRUSTRATED - The user seems angry, frustrated, or is complaining about something in the call
- USER_NOT_UNDERSTANDING - The user explicitly says they don't understand or repeatedly asks for clarification
- HEARING_ISSUES - Either party can't hear the other ("hello?", "are you there?", "can you hear me?")
- DEAD_AIR - Unusually long silences in the conversation (use the timestamps to judge)
- USER_REQUESTING_FEATURE - The user asks for something the assistant can't fulfill
- ASSISTANT_LACKS_EMPATHY - The assistant ignores the user's personal situation or emotional state and continues pitching or pushing the agenda.
- USER_DETECTS_AI - The user suspects or identifies that they are talking to an AI/robot/bot rather than a real human.
## Call metrics (pre-computed)
Use these alongside the transcript for your analysis:
{{metrics}}
## Output format
Return ONLY a valid JSON object (no markdown):
{
"tags": [
{
"tag": "TAG_NAME",
"reason": "Short reason with evidence from the transcript"
}
],
"overall_sentiment": "positive|neutral|negative",
"call_quality_score": <1-10>,
"summary": "1-2 sentence summary of this segment"
}
If no tags apply, return an empty tags list. Always provide sentiment, score, and summary."""

View file

@ -1,141 +0,0 @@
"""Spec for the End Call node — terminal node that wraps up a conversation
and optionally extracts variables before hangup."""
from api.services.workflow.node_specs._base import (
DisplayOptions,
GraphConstraints,
NodeCategory,
NodeExample,
NodeSpec,
PropertyOption,
PropertySpec,
PropertyType,
)
SPEC = NodeSpec(
name="endCall",
display_name="End Call",
description="Closes the conversation and hangs up.",
llm_hint=(
"Terminal node that politely closes the conversation. Variable "
"extraction can run before hangup. A workflow can have multiple "
"endCall nodes reached via different edge conditions."
),
category=NodeCategory.call_node,
icon="OctagonX",
properties=[
PropertySpec(
name="name",
type=PropertyType.string,
display_name="Name",
description=(
"Short identifier shown in call logs. Should describe the "
"ending context (e.g., 'Successful close', 'Polite decline')."
),
required=True,
min_length=1,
default="End Call",
),
PropertySpec(
name="prompt",
type=PropertyType.mention_textarea,
display_name="Prompt",
description=(
"Agent system prompt for the closing exchange. Supports "
"{{template_variables}} from extraction or pre-call fetch."
),
required=True,
min_length=1,
placeholder="Thank the caller and confirm next steps before ending the call.",
),
PropertySpec(
name="add_global_prompt",
type=PropertyType.boolean,
display_name="Add Global Prompt",
description=(
"When true and a Global node exists, prepends the global "
"prompt to this node's prompt at runtime."
),
default=False,
),
PropertySpec(
name="extraction_enabled",
type=PropertyType.boolean,
display_name="Enable Variable Extraction",
description=(
"When true, runs an LLM extraction pass before hangup to "
"capture variables from the conversation."
),
default=False,
),
PropertySpec(
name="extraction_prompt",
type=PropertyType.string,
display_name="Extraction Prompt",
description=(
"Overall instructions guiding how variables should be "
"extracted from the conversation."
),
display_options=DisplayOptions(show={"extraction_enabled": [True]}),
editor="textarea",
),
PropertySpec(
name="extraction_variables",
type=PropertyType.fixed_collection,
display_name="Variables to Extract",
description=(
"Each entry declares one variable to capture from the "
"conversation, with its name, data type, and a per-variable "
"extraction hint."
),
display_options=DisplayOptions(show={"extraction_enabled": [True]}),
properties=[
PropertySpec(
name="name",
type=PropertyType.string,
display_name="Variable Name",
description="snake_case identifier used downstream.",
required=True,
),
PropertySpec(
name="type",
type=PropertyType.options,
display_name="Type",
description="The data type of the extracted value.",
required=True,
default="string",
options=[
PropertyOption(value="string", label="String"),
PropertyOption(value="number", label="Number"),
PropertyOption(value="boolean", label="Boolean"),
],
),
PropertySpec(
name="prompt",
type=PropertyType.string,
display_name="Extraction Hint",
description=(
"Per-variable hint describing what to look for in "
"the conversation."
),
editor="textarea",
),
],
),
],
examples=[
NodeExample(
name="successful_close",
data={
"name": "Successful Close",
"prompt": "Confirm the appointment time, thank the caller, and end the call.",
"add_global_prompt": False,
},
),
],
graph_constraints=GraphConstraints(
min_incoming=1,
min_outgoing=0,
max_outgoing=0,
),
)

View file

@ -1,77 +0,0 @@
"""Spec for the Global node — system-level instructions appended to every
agent node that opts in via `add_global_prompt`."""
from api.services.workflow.node_specs._base import (
GraphConstraints,
NodeCategory,
NodeExample,
NodeSpec,
PropertySpec,
PropertyType,
)
SPEC = NodeSpec(
name="globalNode",
display_name="Global Node",
description="Persona/tone appended to every agent node's prompt.",
llm_hint=(
"System-level prompt appended to every prompted node whose "
"`add_global_prompt` is true. Use it for persona, tone, and shared "
"rules that apply across the entire conversation. At most one "
"global node per workflow."
),
category=NodeCategory.global_node,
icon="Globe",
properties=[
PropertySpec(
name="name",
type=PropertyType.string,
display_name="Name",
description=(
"Short identifier shown in the canvas and call logs. Has no "
"runtime effect."
),
required=True,
min_length=1,
default="Global Node",
),
PropertySpec(
name="prompt",
type=PropertyType.mention_textarea,
display_name="Global Prompt",
description=(
"Text appended to every prompted node's system prompt when "
"that node has `add_global_prompt=true`. Supports "
"{{template_variables}}."
),
required=True,
min_length=1,
placeholder="You are a friendly assistant calling on behalf of {{company_name}}.",
default=(
"You are a helpful assistant whose mode of interaction with "
"the user is voice. So don't use any special characters which "
"can not be pronounced. Use short sentences and simple language."
),
),
],
examples=[
NodeExample(
name="basic_persona",
description="Establishes a consistent persona across the call.",
data={
"name": "Persona",
"prompt": (
"You are Sarah, a polite and warm representative from "
"Acme Corp. Always thank the caller for their time and "
"speak in short conversational sentences."
),
},
),
],
graph_constraints=GraphConstraints(
min_incoming=0,
max_incoming=0,
min_outgoing=0,
max_outgoing=0,
),
)

View file

@ -0,0 +1,404 @@
from __future__ import annotations
from dataclasses import dataclass
from dataclasses import field as dataclass_field
from enum import Enum
from types import NoneType
from typing import Any, Callable, Literal, get_args, get_origin
from pydantic import BaseModel, Field
from pydantic.fields import FieldInfo, PydanticUndefined
from api.services.workflow.node_specs._base import (
DisplayOptions,
GraphConstraints,
NodeCategory,
NodeExample,
NodeSpec,
PropertyOption,
PropertySpec,
PropertyType,
)
_SPEC_FIELD_META_KEY = "__dograh_spec_field__"
_UNSET = object()
@dataclass(frozen=True)
class NodeSpecMetadata:
name: str
display_name: str
description: str
category: NodeCategory
icon: str
llm_hint: str | None = None
version: str = "1.0.0"
examples: tuple[NodeExample, ...] = ()
graph_constraints: GraphConstraints | None = None
property_order: tuple[str, ...] = ()
field_overrides: dict[str, dict[str, Any]] = dataclass_field(default_factory=dict)
def spec_field(
*field_args: Any,
ui_type: PropertyType | str | None = None,
display_name: str | None = None,
llm_hint: str | None = None,
required: bool | None = None,
spec_default: Any = _UNSET,
placeholder: str | None = None,
display_options: DisplayOptions | None = None,
options: list[PropertyOption] | None = None,
editor: str | None = None,
extra: dict[str, Any] | None = None,
spec_exclude: bool = False,
min_value: float | None = None,
max_value: float | None = None,
min_length: int | None = None,
max_length: int | None = None,
pattern: str | None = None,
**field_kwargs: Any,
):
json_schema_extra = dict(field_kwargs.pop("json_schema_extra", {}) or {})
json_schema_extra[_SPEC_FIELD_META_KEY] = {
"ui_type": ui_type.value if isinstance(ui_type, PropertyType) else ui_type,
"display_name": display_name,
"llm_hint": llm_hint,
"required": required,
"placeholder": placeholder,
"display_options": display_options,
"options": options,
"editor": editor,
"extra": extra or {},
"spec_exclude": spec_exclude,
"min_value": min_value,
"max_value": max_value,
"min_length": min_length,
"max_length": max_length,
"pattern": pattern,
}
if spec_default is not _UNSET:
json_schema_extra[_SPEC_FIELD_META_KEY]["spec_default"] = spec_default
return Field(*field_args, json_schema_extra=json_schema_extra, **field_kwargs)
def node_spec(
*,
name: str,
display_name: str,
description: str,
category: NodeCategory,
icon: str,
llm_hint: str | None = None,
version: str = "1.0.0",
examples: list[NodeExample] | tuple[NodeExample, ...] = (),
graph_constraints: GraphConstraints | None = None,
property_order: list[str] | tuple[str, ...] = (),
field_overrides: dict[str, dict[str, Any]] | None = None,
) -> Callable[[type[BaseModel]], type[BaseModel]]:
metadata = NodeSpecMetadata(
name=name,
display_name=display_name,
description=description,
category=category,
icon=icon,
llm_hint=llm_hint,
version=version,
examples=tuple(examples),
graph_constraints=graph_constraints,
property_order=tuple(property_order),
field_overrides=field_overrides or {},
)
def decorator(model_cls: type[BaseModel]) -> type[BaseModel]:
setattr(model_cls, "__node_spec_metadata__", metadata)
return model_cls
return decorator
def build_spec(model_cls: type[BaseModel]) -> NodeSpec:
metadata: NodeSpecMetadata | None = getattr(
model_cls, "__node_spec_metadata__", None
)
if metadata is None:
raise ValueError(f"{model_cls.__name__} is missing __node_spec_metadata__")
properties: list[PropertySpec] = []
for name, field in model_cls.model_fields.items():
prop = _build_property_spec(model_cls, name, field)
if prop is not None:
properties.append(prop)
properties = _sort_properties(metadata.name, properties, metadata.property_order)
return NodeSpec(
name=metadata.name,
display_name=metadata.display_name,
description=metadata.description,
llm_hint=metadata.llm_hint,
category=metadata.category,
icon=metadata.icon,
version=metadata.version,
properties=properties,
examples=list(metadata.examples),
graph_constraints=metadata.graph_constraints,
)
def _sort_properties(
spec_name: str,
properties: list[PropertySpec],
property_order: tuple[str, ...],
) -> list[PropertySpec]:
if not property_order:
return properties
property_names = {prop.name for prop in properties}
missing = [name for name in property_order if name not in property_names]
if missing:
raise ValueError(
f"{spec_name}: property_order references unknown properties: {missing}"
)
order_map = {name: idx for idx, name in enumerate(property_order)}
ordered = sorted(
enumerate(properties),
key=lambda item: (order_map.get(item[1].name, len(order_map)), item[0]),
)
return [prop for _, prop in ordered]
def _build_property_spec(
owner_cls: type[BaseModel],
field_name: str,
field: FieldInfo,
) -> PropertySpec | None:
meta = _merged_field_meta(owner_cls, field_name, field)
if meta.get("spec_exclude"):
return None
prop_type = _resolve_property_type(field.annotation, meta)
nested_properties = _resolve_nested_properties(field.annotation, prop_type)
options = _resolve_options(field.annotation, meta, prop_type)
min_value, max_value, min_length, max_length, pattern = _resolve_constraints(
field, meta
)
description = meta.get("description") or field.description
if not description:
raise ValueError(f"{owner_cls.__name__}.{field_name} is missing a description")
return PropertySpec(
name=field_name,
type=prop_type,
display_name=meta.get("display_name") or _humanize_identifier(field_name),
description=description,
llm_hint=meta.get("llm_hint"),
default=_resolve_default(field, meta),
required=_resolve_required(field, meta),
placeholder=meta.get("placeholder"),
display_options=meta.get("display_options"),
options=options,
properties=nested_properties,
min_value=min_value,
max_value=max_value,
min_length=min_length,
max_length=max_length,
pattern=pattern,
editor=meta.get("editor"),
extra=meta.get("extra") or {},
)
def _merged_field_meta(
owner_cls: type[BaseModel],
field_name: str,
field: FieldInfo,
) -> dict[str, Any]:
field_meta = {}
if isinstance(field.json_schema_extra, dict):
field_meta = dict(field.json_schema_extra.get(_SPEC_FIELD_META_KEY, {}) or {})
metadata: NodeSpecMetadata | None = getattr(
owner_cls, "__node_spec_metadata__", None
)
override = (
dict(metadata.field_overrides.get(field_name, {}) or {})
if metadata is not None
else {}
)
merged = dict(field_meta)
merged.update(override)
return merged
def _resolve_property_type(annotation: Any, meta: dict[str, Any]) -> PropertyType:
ui_type = meta.get("ui_type")
if ui_type:
return PropertyType(ui_type)
inner = _strip_optional(annotation)
origin = get_origin(inner)
args = get_args(inner)
if origin is list:
item_type = _strip_optional(args[0]) if args else Any
if isinstance(item_type, type) and issubclass(item_type, BaseModel):
return PropertyType.fixed_collection
raise ValueError(
"List-valued fields must declare an explicit ui_type unless they wrap a "
f"BaseModel row type (field annotation: {annotation!r})."
)
if _is_enum(inner) or _is_literal(inner):
return PropertyType.options
if inner in (str,):
return PropertyType.string
if inner in (int, float):
return PropertyType.number
if inner is bool:
return PropertyType.boolean
if inner in (dict, Any) or origin is dict:
return PropertyType.json
raise ValueError(f"Unable to derive PropertyType for annotation {annotation!r}")
def _resolve_nested_properties(
annotation: Any,
prop_type: PropertyType,
) -> list[PropertySpec] | None:
if prop_type != PropertyType.fixed_collection:
return None
inner = _strip_optional(annotation)
args = get_args(inner)
if not args:
raise ValueError(
f"fixed_collection field annotation is missing row type: {annotation!r}"
)
row_type = _strip_optional(args[0])
if not isinstance(row_type, type) or not issubclass(row_type, BaseModel):
raise ValueError(
f"fixed_collection rows must be BaseModel subclasses: {annotation!r}"
)
properties: list[PropertySpec] = []
for field_name, field in row_type.model_fields.items():
prop = _build_property_spec(row_type, field_name, field)
if prop is not None:
properties.append(prop)
return properties
def _resolve_options(
annotation: Any,
meta: dict[str, Any],
prop_type: PropertyType,
) -> list[PropertyOption] | None:
if prop_type not in (PropertyType.options, PropertyType.multi_options):
return meta.get("options")
if meta.get("options"):
return meta["options"]
inner = _strip_optional(annotation)
if prop_type == PropertyType.multi_options:
inner = _strip_optional(get_args(inner)[0])
if _is_enum(inner):
return [
PropertyOption(
value=member.value, label=_humanize_option_label(member.value)
)
for member in inner
]
if _is_literal(inner):
return [
PropertyOption(value=value, label=_humanize_option_label(value))
for value in get_args(inner)
if value is not None
]
return None
def _resolve_constraints(
field: FieldInfo,
meta: dict[str, Any],
) -> tuple[float | None, float | None, int | None, int | None, str | None]:
min_value = meta.get("min_value")
max_value = meta.get("max_value")
min_length = meta.get("min_length")
max_length = meta.get("max_length")
pattern = meta.get("pattern")
for item in field.metadata:
if min_value is None:
if hasattr(item, "ge") and item.ge is not None:
min_value = item.ge
elif hasattr(item, "gt") and item.gt is not None:
min_value = item.gt
if max_value is None:
if hasattr(item, "le") and item.le is not None:
max_value = item.le
elif hasattr(item, "lt") and item.lt is not None:
max_value = item.lt
if (
min_length is None
and hasattr(item, "min_length")
and item.min_length is not None
):
min_length = item.min_length
if (
max_length is None
and hasattr(item, "max_length")
and item.max_length is not None
):
max_length = item.max_length
if pattern is None and hasattr(item, "pattern") and item.pattern is not None:
pattern = item.pattern
return min_value, max_value, min_length, max_length, pattern
def _resolve_default(field: FieldInfo, meta: dict[str, Any]) -> Any:
if "spec_default" in meta:
return meta["spec_default"]
if field.default is not PydanticUndefined:
return field.default
return None
def _resolve_required(field: FieldInfo, meta: dict[str, Any]) -> bool:
if meta.get("required") is not None:
return bool(meta["required"])
return bool(field.is_required())
def _strip_optional(annotation: Any) -> Any:
origin = get_origin(annotation)
if origin is None:
return annotation
args = [arg for arg in get_args(annotation) if arg is not NoneType]
if len(args) == 1 and len(args) != len(get_args(annotation)):
return args[0]
return annotation
def _is_enum(annotation: Any) -> bool:
return isinstance(annotation, type) and issubclass(annotation, Enum)
def _is_literal(annotation: Any) -> bool:
return get_origin(annotation) is Literal
def _humanize_identifier(name: str) -> str:
return name.replace("_", " ").strip().title()
def _humanize_option_label(value: Any) -> str:
if isinstance(value, str):
return value.replace("_", " ").replace("-", " ").strip().title()
return str(value)

View file

@ -1,203 +0,0 @@
"""Spec for the QA Analysis node — runs an LLM quality review on the call
transcript after completion."""
from api.services.workflow.node_specs._base import (
DisplayOptions,
GraphConstraints,
NodeCategory,
NodeExample,
NodeSpec,
PropertyOption,
PropertySpec,
PropertyType,
)
DEFAULT_QA_SYSTEM_PROMPT = """You are a QA analyst evaluating a specific segment of a voice AI conversation.
## Node Purpose
{{node_summary}}
## Previous Conversation Context (For start of conversation, previous conversation summary can be empty.)
{{previous_conversation_summary}}
## Tags to evaluate
Examine the conversation carefully and identify which of the following tags apply:
- UNCLEAR_CONVERSATION - The conversation is not coherent or clear, messages don't connect logically
- ASSISTANT_IN_LOOP - The assistant asks the same question multiple times or gets stuck repeating itself
- ASSISTANT_REPLY_IMPROPER - The assistant did not reply properly to the user's question/query or seems confused by what the user said
- USER_FRUSTRATED - The user seems angry, frustrated, or is complaining about something in the call
- USER_NOT_UNDERSTANDING - The user explicitly says they don't understand or repeatedly asks for clarification
- HEARING_ISSUES - Either party can't hear the other ("hello?", "are you there?", "can you hear me?")
- DEAD_AIR - Unusually long silences in the conversation (use the timestamps to judge)
- USER_REQUESTING_FEATURE - The user asks for something the assistant can't fulfill
- ASSISTANT_LACKS_EMPATHY - The assistant ignores the user's personal situation or emotional state and continues pitching or pushing the agenda.
- USER_DETECTS_AI - The user suspects or identifies that they are talking to an AI/robot/bot rather than a real human.
## Call metrics (pre-computed)
Use these alongside the transcript for your analysis:
{{metrics}}
## Output format
Return ONLY a valid JSON object (no markdown):
{
"tags": [
{
"tag": "TAG_NAME",
"reason": "Short reason with evidence from the transcript"
}
],
"overall_sentiment": "positive|neutral|negative",
"call_quality_score": <1-10>,
"summary": "1-2 sentence summary of this segment"
}
If no tags apply, return an empty tags list. Always provide sentiment, score, and summary."""
SPEC = NodeSpec(
name="qa",
display_name="QA Analysis",
description="Run LLM quality analysis on the call transcript.",
llm_hint=(
"Runs an LLM quality review on the call transcript after completion. "
"Per-node analysis splits the conversation by node and evaluates each "
"segment against the configured system prompt. Sampling, minimum "
"duration, and voicemail filters are supported."
),
category=NodeCategory.integration,
icon="ClipboardCheck",
properties=[
PropertySpec(
name="name",
type=PropertyType.string,
display_name="Name",
description="Short identifier for this QA configuration.",
required=True,
min_length=1,
default="QA Analysis",
),
PropertySpec(
name="qa_enabled",
type=PropertyType.boolean,
display_name="Enabled",
description="When false, the QA run is skipped.",
default=True,
),
PropertySpec(
name="qa_system_prompt",
type=PropertyType.string,
display_name="System Prompt",
description=(
"Instructions to the QA reviewer LLM. Supports placeholders: "
"`{node_summary}`, `{previous_conversation_summary}`, "
"`{transcript}`, `{metrics}`."
),
editor="textarea",
default=DEFAULT_QA_SYSTEM_PROMPT,
),
PropertySpec(
name="qa_min_call_duration",
type=PropertyType.number,
display_name="Minimum Call Duration (seconds)",
description="Calls shorter than this are skipped.",
default=15,
min_value=0,
),
PropertySpec(
name="qa_voicemail_calls",
type=PropertyType.boolean,
display_name="Include Voicemail Calls",
description="When false, calls flagged as voicemail are skipped.",
default=False,
),
PropertySpec(
name="qa_sample_rate",
type=PropertyType.number,
display_name="Sample Rate (%)",
description=(
"Percent of eligible calls QA'd. 100 means every call; lower "
"values use random sampling."
),
default=100,
min_value=1,
max_value=100,
),
# ---- LLM configuration ----
PropertySpec(
name="qa_use_workflow_llm",
type=PropertyType.boolean,
display_name="Use Workflow's LLM",
description=(
"When true, the QA pass uses the same LLM the workflow runs "
"with. Set false to specify a separate provider/model."
),
default=True,
),
PropertySpec(
name="qa_provider",
type=PropertyType.options,
display_name="QA LLM Provider",
description="LLM provider used for the QA pass.",
display_options=DisplayOptions(show={"qa_use_workflow_llm": [False]}),
options=[
PropertyOption(value="openai", label="OpenAI"),
PropertyOption(value="azure", label="Azure OpenAI"),
PropertyOption(value="openrouter", label="OpenRouter"),
PropertyOption(value="anthropic", label="Anthropic"),
],
),
PropertySpec(
name="qa_model",
type=PropertyType.string,
display_name="QA Model",
description=(
"Model identifier (e.g., 'gpt-4o', 'claude-sonnet-4-6'). "
"Provider-specific."
),
display_options=DisplayOptions(show={"qa_use_workflow_llm": [False]}),
default="default",
),
PropertySpec(
name="qa_api_key",
type=PropertyType.string,
display_name="API Key",
description="API key for the chosen provider.",
display_options=DisplayOptions(show={"qa_use_workflow_llm": [False]}),
),
PropertySpec(
name="qa_endpoint",
type=PropertyType.url,
display_name="Azure Endpoint",
description="Required for the Azure provider.",
display_options=DisplayOptions(
show={"qa_use_workflow_llm": [False], "qa_provider": ["azure"]}
),
),
],
examples=[
NodeExample(
name="basic_qa",
data={
"name": "Compliance Check",
"qa_enabled": True,
"qa_system_prompt": (
"You are a compliance reviewer. Review the transcript and "
"produce a JSON object with `tags`, `summary`, "
"`call_quality_score`, and `overall_sentiment`."
),
"qa_min_call_duration": 30,
"qa_sample_rate": 100,
},
),
],
# QA runs post-call against the saved transcript (run_integrations
# scans by type), never as a graph step. Reject any edge into or out
# of a QA node.
graph_constraints=GraphConstraints(
min_incoming=0, max_incoming=0, min_outgoing=0, max_outgoing=0
),
)

View file

@ -1,250 +0,0 @@
"""Spec for the Start Call node — the single entry point of every workflow.
Carries greeting, pre-call data fetch, and the same prompt/extraction/tools
fields as agent nodes."""
from api.services.workflow.node_specs._base import (
DisplayOptions,
GraphConstraints,
NodeCategory,
NodeExample,
NodeSpec,
PropertyOption,
PropertySpec,
PropertyType,
)
SPEC = NodeSpec(
name="startCall",
display_name="Start Call",
description="Entry point of the workflow — plays a greeting and opens the conversation.",
llm_hint=(
"The entry point of every workflow (exactly one required). Plays an "
"optional greeting, can fetch context from an external API before "
"the call begins, and executes the first conversational turn."
),
category=NodeCategory.call_node,
icon="Play",
properties=[
PropertySpec(
name="name",
type=PropertyType.string,
display_name="Name",
description="Short identifier shown in the canvas and call logs.",
required=True,
min_length=1,
default="Start Call",
),
# ---- Greeting (variant via greeting_type) ----
PropertySpec(
name="greeting_type",
type=PropertyType.options,
display_name="Greeting Type",
description=(
"Whether the optional greeting is spoken via TTS from text "
"or played from a pre-recorded audio file."
),
default="text",
options=[
PropertyOption(value="text", label="Text (TTS)"),
PropertyOption(value="audio", label="Pre-recorded Audio"),
],
),
PropertySpec(
name="greeting",
type=PropertyType.string,
display_name="Greeting Text",
description=(
"Text spoken via TTS at the start of the call. Supports "
"{{template_variables}}. Leave empty to skip the greeting."
),
display_options=DisplayOptions(show={"greeting_type": ["text"]}),
editor="textarea",
placeholder="Hi {{first_name}}, this is Sarah from Acme.",
),
PropertySpec(
name="greeting_recording_id",
type=PropertyType.recording_ref,
display_name="Greeting Recording",
description="Pre-recorded audio file played at the start of the call.",
llm_hint=(
"Value is the `recording_id` string. Use the `list_recordings` "
"MCP tool to discover available recordings."
),
display_options=DisplayOptions(show={"greeting_type": ["audio"]}),
),
PropertySpec(
name="prompt",
type=PropertyType.mention_textarea,
display_name="Prompt",
description=(
"Agent system prompt for the opening turn. Supports "
"{{template_variables}} from pre-call fetch and the initial context."
),
required=True,
min_length=1,
placeholder="Greet the caller warmly and ask how you can help today.",
),
# ---- Behavior toggles ----
PropertySpec(
name="allow_interrupt",
type=PropertyType.boolean,
display_name="Allow Interruption",
description=("When true, the user can interrupt the agent mid-utterance."),
default=False,
),
PropertySpec(
name="add_global_prompt",
type=PropertyType.boolean,
display_name="Add Global Prompt",
description=(
"When true and a Global node exists, prepends the global "
"prompt to this node's prompt at runtime."
),
default=True,
),
PropertySpec(
name="delayed_start",
type=PropertyType.boolean,
display_name="Delayed Start",
description=(
"When true, the agent waits before speaking after pickup. "
"Useful for outbound calls where the called party needs a "
"moment to settle."
),
default=False,
),
PropertySpec(
name="delayed_start_duration",
type=PropertyType.number,
display_name="Delay Duration (seconds)",
description="Seconds to wait before the agent speaks. 0.110.",
default=2.0,
min_value=0.1,
max_value=10.0,
display_options=DisplayOptions(show={"delayed_start": [True]}),
),
# ---- Variable extraction ----
PropertySpec(
name="extraction_enabled",
type=PropertyType.boolean,
display_name="Enable Variable Extraction",
description=(
"When true, runs an LLM extraction pass on transition out of "
"this node to capture variables from the opening turn."
),
default=False,
),
PropertySpec(
name="extraction_prompt",
type=PropertyType.string,
display_name="Extraction Prompt",
description="Overall instructions guiding variable extraction.",
display_options=DisplayOptions(show={"extraction_enabled": [True]}),
editor="textarea",
),
PropertySpec(
name="extraction_variables",
type=PropertyType.fixed_collection,
display_name="Variables to Extract",
description=(
"Each entry declares one variable to capture, with its name, "
"data type, and per-variable extraction hint."
),
display_options=DisplayOptions(show={"extraction_enabled": [True]}),
properties=[
PropertySpec(
name="name",
type=PropertyType.string,
display_name="Variable Name",
description="snake_case identifier used downstream.",
required=True,
),
PropertySpec(
name="type",
type=PropertyType.options,
display_name="Type",
description="Data type of the extracted value.",
required=True,
default="string",
options=[
PropertyOption(value="string", label="String"),
PropertyOption(value="number", label="Number"),
PropertyOption(value="boolean", label="Boolean"),
],
),
PropertySpec(
name="prompt",
type=PropertyType.string,
display_name="Extraction Hint",
description="Per-variable hint describing what to look for.",
editor="textarea",
),
],
),
# ---- Tools / documents ----
PropertySpec(
name="tool_uuids",
type=PropertyType.tool_refs,
display_name="Tools",
description="Tools the agent can invoke during the opening turn.",
llm_hint="List of tool UUIDs from `list_tools`.",
),
PropertySpec(
name="document_uuids",
type=PropertyType.document_refs,
display_name="Knowledge Base Documents",
description="Documents the agent can reference.",
llm_hint="List of document UUIDs from `list_documents`.",
),
# ---- Pre-call data fetch (advanced) ----
PropertySpec(
name="pre_call_fetch_enabled",
type=PropertyType.boolean,
display_name="Pre-Call Data Fetch",
description=(
"When true, makes a POST request to an external API before "
"the call starts and merges the JSON response into the call "
"context as template variables."
),
default=False,
),
PropertySpec(
name="pre_call_fetch_url",
type=PropertyType.url,
display_name="Endpoint URL",
description=(
"URL the pre-call POST request is sent to. The request body "
"includes caller and called numbers."
),
display_options=DisplayOptions(show={"pre_call_fetch_enabled": [True]}),
placeholder="https://api.example.com/customer-lookup",
),
PropertySpec(
name="pre_call_fetch_credential_uuid",
type=PropertyType.credential_ref,
display_name="Authentication",
description="Optional credential attached to the pre-call request.",
llm_hint="Credential UUID from `list_credentials`.",
display_options=DisplayOptions(show={"pre_call_fetch_enabled": [True]}),
),
],
examples=[
NodeExample(
name="warm_greeting",
data={
"name": "Greeting",
"prompt": "Greet warmly and ask the caller's reason for calling.",
"greeting_type": "text",
"greeting": "Hi {{first_name}}, this is Sarah from Acme.",
"allow_interrupt": True,
},
),
],
# `min_outgoing` is intentionally unset: a startCall is allowed to
# sit on the canvas without an outgoing edge (e.g. a workflow with
# just a greeting). Only constraint: nothing flows INTO the start.
graph_constraints=GraphConstraints(
min_incoming=0,
max_incoming=0,
),
)

View file

@ -1,79 +0,0 @@
"""Spec for the API Trigger node — exposes a public webhook URL that
external systems can hit to launch the workflow."""
from api.services.workflow.node_specs._base import (
GraphConstraints,
NodeCategory,
NodeExample,
NodeSpec,
PropertySpec,
PropertyType,
)
SPEC = NodeSpec(
name="trigger",
display_name="API Trigger",
description=("Public HTTP endpoints that launch the workflow."),
llm_hint=(
"Exposes two public HTTP POST endpoints derived from the auto-generated "
"`trigger_path`:\n"
" • Production: `<backend>/api/v1/public/agent/<trigger_path>` — runs "
"the published agent. Use this from production systems.\n"
" • Test: `<backend>/api/v1/public/agent/test/<trigger_path>` — runs "
"the latest draft, useful for verifying changes before publishing. "
"Falls back to the published agent when no draft exists.\n"
"Both require an API key in the `X-API-Key` header.\n"
"Request body fields:\n"
" • `phone_number` (string, required) — destination to dial.\n"
" • `initial_context` (object, optional) — merged into the run's "
"initial context.\n"
" • `telephony_configuration_id` (int, optional) — pick a specific "
"telephony configuration for the call. Must belong to the same "
"organization as the trigger. When omitted, the org's default "
"outbound configuration is used."
),
category=NodeCategory.trigger,
icon="Webhook",
properties=[
PropertySpec(
name="name",
type=PropertyType.string,
display_name="Name",
description="Short identifier shown in the canvas. No runtime effect.",
required=True,
min_length=1,
default="API Trigger",
),
PropertySpec(
name="enabled",
type=PropertyType.boolean,
display_name="Enabled",
description="When false, the trigger URL returns 404.",
default=True,
),
PropertySpec(
name="trigger_path",
type=PropertyType.string,
display_name="Trigger Path",
description=(
"Auto-generated UUID-style path segment that uniquely "
"identifies this trigger. Used in both URLs:\n"
" • Production: `/api/v1/public/agent/<trigger_path>` — "
"executes the published agent.\n"
" • Test: `/api/v1/public/agent/test/<trigger_path>` — "
"executes the latest draft.\n"
"Do not edit manually."
),
),
],
examples=[
NodeExample(
name="default",
data={"name": "Inbound Trigger", "enabled": True},
),
],
graph_constraints=GraphConstraints(
min_incoming=0,
max_incoming=0,
),
)

View file

@ -1,133 +0,0 @@
"""Spec for the Webhook node — sends an HTTP request to an external system
after the workflow completes."""
from api.services.workflow.node_specs._base import (
GraphConstraints,
NodeCategory,
NodeExample,
NodeSpec,
PropertyOption,
PropertySpec,
PropertyType,
)
SPEC = NodeSpec(
name="webhook",
display_name="Webhook",
description="Send HTTP request after the workflow completes.",
llm_hint=(
"Sends an HTTP request to an external system after the workflow "
"completes. The payload is a Jinja-templated JSON body with access "
"to `workflow_run_id`, `initial_context`, `gathered_context`, "
"`annotations`, and call metadata."
),
category=NodeCategory.integration,
icon="Link2",
properties=[
PropertySpec(
name="name",
type=PropertyType.string,
display_name="Name",
description="Short identifier shown in the canvas and run logs.",
required=True,
min_length=1,
default="Webhook",
),
PropertySpec(
name="enabled",
type=PropertyType.boolean,
display_name="Enabled",
description="When false, the webhook is skipped at run time.",
default=True,
),
PropertySpec(
name="http_method",
type=PropertyType.options,
display_name="HTTP Method",
description="HTTP verb used for the outbound request.",
default="POST",
options=[
PropertyOption(value="GET", label="GET"),
PropertyOption(value="POST", label="POST"),
PropertyOption(value="PUT", label="PUT"),
PropertyOption(value="PATCH", label="PATCH"),
PropertyOption(value="DELETE", label="DELETE"),
],
),
PropertySpec(
name="endpoint_url",
type=PropertyType.url,
display_name="Endpoint URL",
description="URL the request is sent to.",
placeholder="https://api.example.com/webhook",
),
PropertySpec(
name="credential_uuid",
type=PropertyType.credential_ref,
display_name="Authentication",
description="Optional credential applied as the Authorization header.",
llm_hint="Credential UUID from `list_credentials`.",
),
PropertySpec(
name="custom_headers",
type=PropertyType.fixed_collection,
display_name="Custom Headers",
description="Additional HTTP headers to include with the request.",
properties=[
PropertySpec(
name="key",
type=PropertyType.string,
display_name="Header Name",
description="HTTP header name (e.g., 'X-Source').",
required=True,
),
PropertySpec(
name="value",
type=PropertyType.string,
display_name="Header Value",
description="Header value (supports {{template_variables}}).",
required=True,
),
],
),
PropertySpec(
name="payload_template",
type=PropertyType.json,
display_name="Payload Template",
description=(
"JSON body of the request. Values are Jinja-rendered against "
"the run context — `{{workflow_run_id}}`, "
"`{{gathered_context.foo}}`, `{{annotations.qa_xxx}}`, etc."
),
default={
"call_id": "{{workflow_run_id}}",
"first_name": "{{initial_context.first_name}}",
"rsvp": "{{gathered_context.rsvp}}",
"duration": "{{cost_info.call_duration_seconds}}",
"recording_url": "{{recording_url}}",
"transcript_url": "{{transcript_url}}",
},
),
],
examples=[
NodeExample(
name="post_to_crm",
data={
"name": "Notify CRM",
"enabled": True,
"http_method": "POST",
"endpoint_url": "https://crm.example.com/calls",
"payload_template": {
"run_id": "{{workflow_run_id}}",
"outcome": "{{gathered_context.call_disposition}}",
},
},
),
],
# Webhooks fire post-call (run_integrations scans nodes by type),
# never as a graph step. Reject any edge into or out of a webhook so
# the editor can't wire one into the conversation flow.
graph_constraints=GraphConstraints(
min_incoming=0, max_incoming=0, min_outgoing=0, max_outgoing=0
),
)

View file

@ -1,4 +1,5 @@
from typing import TYPE_CHECKING, Awaitable, Callable, Literal, Optional, Union
from typing import TYPE_CHECKING, Awaitable, Callable, Dict, Optional, Union
from pipecat.adapters.schemas.tools_schema import ToolsSchema
from pipecat.frames.frames import (
@ -17,6 +18,7 @@ from pipecat.services.settings import LLMSettings
from pipecat.utils.enums import EndTaskReason
from api.db import db_client
from api.enums import ToolCategory
from api.services.pipecat.audio_playback import play_audio
from api.services.workflow.disposition_mapper import apply_disposition_mapping
from api.services.workflow.workflow_graph import Node, WorkflowGraph
@ -35,6 +37,7 @@ import asyncio
from loguru import logger
from api.services.workflow import pipecat_engine_callbacks as engine_callbacks
from api.services.workflow.mcp_tool_session import McpToolSession
from api.services.workflow.pipecat_engine_context_composer import (
compose_functions_for_node,
compose_system_prompt_for_node,
@ -117,6 +120,9 @@ class PipecatEngine:
# Cached organization ID (resolved lazily from workflow run)
self._organization_id: Optional[int] = None
# Open MCP tool sessions for this call, keyed by tool_uuid
self._mcp_sessions: Dict[str, McpToolSession] = {}
# Embeddings configuration (passed from run_pipeline.py)
self._embeddings_api_key: Optional[str] = embeddings_api_key
self._embeddings_model: Optional[str] = embeddings_model
@ -179,6 +185,9 @@ class PipecatEngine:
# Helper that encapsulates custom tool management
self._custom_tool_manager = CustomToolManager(self)
# Open persistent MCP server sessions for this call (degrades on failure)
await self._open_mcp_sessions()
# Helper that encapsulates context summarization
if self._context_compaction_enabled:
self._context_summarization_manager = ContextSummarizationManager(self)
@ -504,7 +513,10 @@ class PipecatEngine:
# Register custom tool handlers for this node
if node.tool_uuids and self._custom_tool_manager:
await self._custom_tool_manager.register_handlers(node.tool_uuids)
await self._custom_tool_manager.register_handlers(
node.tool_uuids,
mcp_tool_filters=getattr(node, "mcp_tool_filters", None),
)
# Register knowledge base retrieval handler if node has documents
if node.document_uuids:
@ -530,7 +542,7 @@ class PipecatEngine:
node = self.workflow.nodes[node_id]
logger.debug(
f"Executing node: name: {node.name} is_static: {node.is_static} allow_interrupt: {node.allow_interrupt} is_end: {node.is_end}"
f"Executing node: name: {node.name} allow_interrupt: {node.allow_interrupt} is_end: {node.is_end}"
)
# Track previous node for transition event
@ -585,11 +597,8 @@ class PipecatEngine:
)
await asyncio.sleep(delay_duration)
if node.is_static:
raise ValueError("Static nodes are not supported!")
else:
# Setup LLM Context with Prompts and Functions
await self._setup_llm_context(node)
# Setup LLM context with prompts and functions.
await self._setup_llm_context(node)
def get_node_greeting(self, node_id: str) -> Optional[tuple[str, Optional[str]]]:
"""Return the greeting info for a node, or None if not configured.
@ -685,19 +694,13 @@ class PipecatEngine:
async def _handle_end_node(self, node: Node) -> None:
"""Handle end node execution."""
if node.is_static:
raise ValueError("Static nodes are not supported!")
else:
# Setup LLM Context with Prompts and Functions
await self._setup_llm_context(node)
# Setup LLM context with prompts and functions.
await self._setup_llm_context(node)
async def _handle_agent_node(self, node: Node) -> None:
"""Handle agent node execution."""
if node.is_static:
raise ValueError("Static nodes are not supported!")
else:
# Setup LLM Context with Prompts and Functions
await self._setup_llm_context(node)
# Setup LLM context with prompts and functions.
await self._setup_llm_context(node)
async def end_call_with_reason(
self,
@ -884,6 +887,79 @@ class PipecatEngine:
"""Get the gathered context including extracted variables."""
return self._gathered_context.copy()
async def _open_mcp_sessions(self) -> None:
"""Connect every MCP-category tool referenced by any workflow node.
Failures degrade (session marked unavailable); never raises."""
from api.services.workflow.tools.mcp_tool import (
McpDefinitionError,
validate_mcp_definition,
)
try:
tool_uuids: set[str] = set()
for node in self.workflow.nodes.values():
for tu in getattr(node, "tool_uuids", None) or []:
tool_uuids.add(tu)
if not tool_uuids:
return
organization_id = await self._get_organization_id()
if not organization_id:
logger.warning("Cannot open MCP sessions: organization_id missing")
return
tools = await db_client.get_tools_by_uuids(
list(tool_uuids), organization_id
)
for tool in tools:
if tool.category != ToolCategory.MCP.value:
continue
try:
cfg = validate_mcp_definition(tool.definition)
except McpDefinitionError as e:
logger.warning(
f"Skipping MCP tool '{tool.name}' ({tool.tool_uuid}): "
f"invalid definition: {e}"
)
continue
credential = None
if cfg["credential_uuid"]:
try:
credential = await db_client.get_credential_by_uuid(
cfg["credential_uuid"], organization_id
)
except Exception as e:
logger.warning(
f"MCP tool '{tool.name}': credential fetch failed: {e}"
)
continue
session = McpToolSession(
tool_uuid=tool.tool_uuid,
tool_name=tool.name,
url=cfg["url"],
credential=credential,
tools_filter=cfg["tools_filter"],
timeout_secs=cfg["timeout_secs"],
sse_read_timeout_secs=cfg["sse_read_timeout_secs"],
)
await session.start()
self._mcp_sessions[tool.tool_uuid] = session
except Exception as e:
logger.warning(
f"Failed to open MCP sessions; call proceeds without MCP tools: {e}",
exc_info=True,
)
async def _close_mcp_sessions(self) -> None:
for tool_uuid, session in list(self._mcp_sessions.items()):
try:
await session.close()
except Exception as e:
logger.warning(f"Error closing MCP session {tool_uuid}: {e}")
self._mcp_sessions = {}
async def cleanup(self):
"""Clean up engine resources on disconnect."""
# Cancel any pending timeout tasks
@ -893,6 +969,12 @@ class PipecatEngine:
):
self._user_response_timeout_task.cancel()
# Cancel any in-flight background summarization
if self._context_summarization_manager:
await self._context_summarization_manager.cleanup()
# Cancel any in-flight background summarization.
# MCP sessions are closed in a finally block so they are guaranteed to
# run even if the summarization cleanup raises an exception.
try:
if self._context_summarization_manager:
await self._context_summarization_manager.cleanup()
finally:
# Close any open MCP tool sessions
await self._close_mcp_sessions()

View file

@ -117,7 +117,8 @@ async def compose_functions_for_node(
# Custom tools
if node.tool_uuids and custom_tool_manager:
custom_tool_schemas = await custom_tool_manager.get_tool_schemas(
node.tool_uuids
node.tool_uuids,
mcp_tool_filters=getattr(node, "mcp_tool_filters", None),
)
functions.extend(custom_tool_schemas)

View file

@ -34,6 +34,7 @@ from api.services.workflow.tools.custom_tool import (
)
if TYPE_CHECKING:
from api.services.workflow.mcp_tool_session import McpToolSession
from api.services.workflow.pipecat_engine import PipecatEngine
@ -121,11 +122,18 @@ class CustomToolManager:
"""Get the organization ID from the engine (shared cache)."""
return await self._engine._get_organization_id()
async def get_tool_schemas(self, tool_uuids: list[str]) -> list[FunctionSchema]:
async def get_tool_schemas(
self,
tool_uuids: list[str],
mcp_tool_filters: Optional[dict[str, list[str]]] = None,
) -> list[FunctionSchema]:
"""Fetch custom tools and convert them to function schemas.
Args:
tool_uuids: List of tool UUIDs to fetch
mcp_tool_filters: Optional per-node filter mapping tool_uuid list of
raw MCP tool names to expose. None (default) exposes all tools.
Empty dict or entry with [] suppresses all tools for that uuid.
Returns:
List of FunctionSchema objects for LLM
@ -154,6 +162,22 @@ class CustomToolManager:
)
continue
if tool.category == ToolCategory.MCP.value:
session = self._engine._mcp_sessions.get(tool.tool_uuid)
if session is None or not session.available:
logger.warning(
f"MCP tool '{tool.name}' ({tool.tool_uuid}) "
f"unavailable; skipping"
)
continue
allowed = (
None
if mcp_tool_filters is None
else set(mcp_tool_filters.get(tool.tool_uuid, []))
)
schemas.extend(session.function_schemas(allowed))
continue
raw_schema = tool_to_function_schema(tool)
function_name = raw_schema["function"]["name"]
@ -178,11 +202,18 @@ class CustomToolManager:
logger.error(f"Failed to fetch custom tools: {e}")
return []
async def register_handlers(self, tool_uuids: list[str]) -> None:
async def register_handlers(
self,
tool_uuids: list[str],
mcp_tool_filters: Optional[dict[str, list[str]]] = None,
) -> None:
"""Register custom tool execution handlers with the LLM.
Args:
tool_uuids: List of tool UUIDs to register handlers for
mcp_tool_filters: Optional per-node filter mapping tool_uuid list of
raw MCP tool names to expose. None (default) exposes all tools.
Empty dict or entry with [] suppresses all tools for that uuid.
"""
organization_id = await self.get_organization_id()
if not organization_id:
@ -203,6 +234,32 @@ class CustomToolManager:
)
continue
if tool.category == ToolCategory.MCP.value:
session = self._engine._mcp_sessions.get(tool.tool_uuid)
if session is None or not session.available:
logger.warning(
f"MCP tool '{tool.name}' ({tool.tool_uuid}) "
f"unavailable; skipping handler registration"
)
continue
allowed = (
None
if mcp_tool_filters is None
else set(mcp_tool_filters.get(tool.tool_uuid, []))
)
mcp_schemas = session.function_schemas(allowed)
for fs in mcp_schemas:
self._engine.llm.register_function(
fs.name,
self._create_mcp_handler(session, fs.name),
timeout_secs=session.call_timeout_secs,
)
logger.debug(
f"Registered {len(mcp_schemas)} MCP "
f"handlers for tool '{tool.name}' ({tool.tool_uuid})"
)
continue
schema = tool_to_function_schema(tool)
function_name = schema["function"]["name"]
@ -335,6 +392,29 @@ class CustomToolManager:
return http_tool_handler
def _create_mcp_handler(self, session: "McpToolSession", function_name: str):
"""Create a handler that proxies an LLM function call to a live MCP
session. Errors are returned to the LLM as structured text so the
agent can recover verbally; the call is never crashed."""
async def mcp_tool_handler(
function_call_params: FunctionCallParams,
) -> None:
logger.info(f"MCP Tool EXECUTED: {function_name}")
logger.info(f"Arguments: {function_call_params.arguments}")
try:
result = await session.call(
function_name, function_call_params.arguments or {}
)
await function_call_params.result_callback(result)
except Exception as e:
logger.error(f"MCP tool '{function_name}' failed: {e}")
await function_call_params.result_callback(
{"status": "error", "error": str(e)}
)
return mcp_tool_handler
def _create_end_call_handler(self, tool: Any, function_name: str):
"""Create a handler function for an end call tool.

View file

@ -0,0 +1,116 @@
"""Pure helpers for MCP-category tools: definition validation and
LLM-function-name namespacing. No I/O, no MCP protocol here."""
from __future__ import annotations
import re
from typing import Any, Dict, Literal, Optional
from pydantic import BaseModel, Field, ValidationError, field_validator
DEFAULT_TIMEOUT_SECS = 30
DEFAULT_SSE_READ_TIMEOUT_SECS = 300
class McpDefinitionError(ValueError):
"""Raised when an MCP tool definition is structurally invalid."""
class McpToolConfig(BaseModel):
"""Configuration for an MCP tool definition."""
transport: Literal["streamable_http"] = Field(
default="streamable_http", description="MCP transport protocol"
)
url: str = Field(description="MCP server URL (must be http:// or https://)")
credential_uuid: Optional[str] = Field(
default=None, description="Reference to ExternalCredentialModel for auth"
)
tools_filter: list[str] = Field(
default_factory=list,
description="Allowlist of MCP tool names to expose (empty = all tools)",
)
timeout_secs: int = Field(
default=DEFAULT_TIMEOUT_SECS, description="Connection timeout in seconds"
)
sse_read_timeout_secs: int = Field(
default=DEFAULT_SSE_READ_TIMEOUT_SECS,
description="SSE read timeout in seconds",
)
discovered_tools: list[dict[str, Any]] = Field(
default_factory=list,
description=(
"Server-managed cache of the MCP server's tool catalog "
"[{name, description}]. Populated best-effort by the backend."
),
)
@field_validator("url")
@classmethod
def validate_url(cls, v: str) -> str:
if not isinstance(v, str) or not v.startswith(("http://", "https://")):
raise ValueError("config.url must be an http(s) URL")
return v
@field_validator("tools_filter")
@classmethod
def validate_tools_filter(cls, v: list[str]) -> list[str]:
if not all(isinstance(tool_name, str) for tool_name in v):
raise ValueError("config.tools_filter must be a list of strings")
return v
class McpToolDefinition(BaseModel):
"""Persisted MCP tool definition."""
schema_version: int = Field(default=1, description="Schema version")
type: Literal["mcp"] = Field(description="Tool type")
config: McpToolConfig = Field(description="MCP server configuration")
def _format_validation_error(error: ValidationError) -> str:
parts: list[str] = []
for item in error.errors():
location = ".".join(str(part) for part in item["loc"])
parts.append(f"{location}: {item['msg']}")
return "; ".join(parts)
def validate_mcp_definition(definition: Dict[str, Any]) -> Dict[str, Any]:
"""Validate a ``type: "mcp"`` ToolModel definition and return a
normalized config dict with defaults applied.
Raises:
McpDefinitionError: if the definition is missing required fields
or uses an unsupported transport.
"""
if not isinstance(definition, dict) or definition.get("type") != "mcp":
raise McpDefinitionError("definition.type must be 'mcp'")
config = definition.get("config")
if not isinstance(config, dict):
raise McpDefinitionError("definition.config is required and must be an object")
try:
parsed = McpToolDefinition.model_validate(definition)
except ValidationError as e:
raise McpDefinitionError(_format_validation_error(e)) from e
return parsed.config.model_dump(exclude={"discovered_tools"})
def _slugify(value: str) -> str:
slug = re.sub(r"[^a-z0-9]+", "_", value.strip().lower()).strip("_")
return slug
def namespace_function_name(
tool_name: str, mcp_tool_name: str, *, fallback: str = "server"
) -> str:
"""Build a collision-safe LLM function name: ``mcp__<slug>__<tool>``.
``slug`` is derived from the Dograh ToolModel name; if it slugifies to
empty, ``fallback`` (e.g. first 8 chars of tool_uuid) is used instead.
"""
slug = _slugify(tool_name) or _slugify(fallback) or "server"
return f"mcp__{slug}__{mcp_tool_name}"

View file

@ -4,7 +4,8 @@ from typing import Dict, List, Set
from api.services.workflow.dto import EdgeDataDTO, NodeType, ReactFlowDTO
from api.services.workflow.errors import ItemKind, WorkflowError
from api.services.workflow.node_specs import REGISTRY
from api.services.workflow.node_data import BaseNodeData
from api.services.workflow.node_specs import get_spec
# Regex for matching {{ variable }} template placeholders.
# Captures: group(1) = variable path, group(2) = filter name, group(3) = filter value.
@ -62,7 +63,7 @@ class Edge:
class Node:
def __init__(self, id: str, node_type: NodeType, data):
def __init__(self, id: str, node_type: str, data: BaseNodeData):
self.id, self.node_type, self.data = id, node_type, data
self.out: Dict[str, "Node"] = {} # forward nodes
self.out_edges: List[Edge] = [] # forward edges with properties
@ -75,7 +76,6 @@ class Node:
# Type-specific fields — read with getattr so this works for every
# node variant in the discriminated union.
self.prompt = getattr(data, "prompt", None)
self.is_static = getattr(data, "is_static", False)
self.allow_interrupt = getattr(data, "allow_interrupt", False)
self.extraction_enabled = getattr(data, "extraction_enabled", False)
self.extraction_prompt = getattr(data, "extraction_prompt", None)
@ -84,11 +84,11 @@ class Node:
self.greeting = getattr(data, "greeting", None)
self.greeting_type = getattr(data, "greeting_type", None)
self.greeting_recording_id = getattr(data, "greeting_recording_id", None)
self.detect_voicemail = getattr(data, "detect_voicemail", False)
self.delayed_start = getattr(data, "delayed_start", False)
self.delayed_start_duration = getattr(data, "delayed_start_duration", None)
self.tool_uuids = getattr(data, "tool_uuids", None)
self.document_uuids = getattr(data, "document_uuids", None)
self.mcp_tool_filters = getattr(data, "mcp_tool_filters", None)
self.pre_call_fetch_enabled = getattr(data, "pre_call_fetch_enabled", False)
self.pre_call_fetch_url = getattr(data, "pre_call_fetch_url", None)
self.pre_call_fetch_credential_uuid = getattr(
@ -105,11 +105,11 @@ class WorkflowGraph:
"""
def __init__(self, dto: ReactFlowDTO):
# build adjacency list. n.type comes off the discriminated-union
# variant as a literal string; coerce to NodeType for downstream
# comparisons.
# Build adjacency list from validated DTO nodes. Core node comparisons
# still use NodeType string enums; integration nodes remain plain
# strings and resolve constraints through node specs.
self.nodes: Dict[str, Node] = {
n.id: Node(n.id, NodeType(n.type), n.data) for n in dto.nodes
n.id: Node(n.id, n.type, n.data) for n in dto.nodes
}
# Store all edges
@ -139,7 +139,7 @@ class WorkflowGraph:
# Get a reference to the global node
try:
self.global_node_id = [
n.id for n in dto.nodes if n.type == NodeType.globalNode
n.id for n in dto.nodes if n.type == NodeType.globalNode.value
][0]
except IndexError:
self.global_node_id = None
@ -249,7 +249,7 @@ class WorkflowGraph:
def _assert_global_node(self):
errors: list[WorkflowError] = []
global_node = [
n for n in self.nodes.values() if n.node_type == NodeType.globalNode
n for n in self.nodes.values() if n.node_type == NodeType.globalNode.value
]
if not len(global_node) <= 1:
errors.append(
@ -281,7 +281,7 @@ class WorkflowGraph:
in_deg[m.id] += 1
for n in self.nodes.values():
spec = REGISTRY.get(n.node_type.value)
spec = get_spec(n.node_type)
if spec is None or spec.graph_constraints is None:
continue
gc = spec.graph_constraints