feat: add create workflow tool in MCP

This commit is contained in:
Abhishek Kumar 2026-04-25 17:38:38 +05:30
parent d6567eef28
commit 3e3773f400
13 changed files with 245 additions and 94 deletions

View file

@ -12,27 +12,26 @@ mistake the system has seen at least once.
"""
DOGRAH_MCP_INSTRUCTIONS = """\
You build and edit Dograh voice-AI workflows by emitting TypeScript that
uses the `@dograh/sdk` package. Workflows are stored as JSON; this server
projects them to TypeScript for editing and parses them back on save.
You build and edit Dograh voice-AI workflows by emitting TypeScript that uses the `@dograh/sdk` package. Workflows are stored as JSON; this server projects them to TypeScript for editing and parses them back on save.
## Call order
### Editing an existing workflow
1. `list_workflows` locate the target workflow.
2. `get_workflow_code(workflow_id)` fetch the current source (draft if
one exists, otherwise published).
3. (optional) `list_node_types` / `get_node_type(name)` consult before
adding or editing a node type whose fields aren't already visible in
the current code.
4. Mutate the code in place. Preserve existing nodes, edges, and variable
names unless the task requires removing or renaming them.
5. `save_workflow(workflow_id, code)` persist as a new draft. The
published version is untouched.
2. `get_workflow_code(workflow_id)` fetch the current source.
3. (optional) `list_node_types` / `get_node_type(name)` consult before adding or editing a node type whose fields aren't already visible in the current code.
4. Mutate the code in place. Preserve existing nodes, edges, and variable names unless the task requires removing or renaming them.
5. `save_workflow(workflow_id, code)` persist as a new draft. The published version is untouched.
### Creating a new workflow
1. Create a simple 1-node workflow with only `startCall`. The user can iteratively add complexity by editing it.
2. `list_node_types` / `get_node_type(name)` consult to learn the fields available on the node types you intend to use.
3. Author SDK TypeScript from scratch. The `new Workflow({ name: "..." })` call is required `name` becomes the workflow's display name.
4. `create_workflow(code)` persists a new workflow as version 1 (published). Returns the new `workflow_id`. For subsequent edits use `save_workflow(workflow_id, code)` (which writes a draft).
## Allowed source shape
The parser is AST-only and rejects anything outside this grammar. At the
top level, only three statement forms are accepted:
The parser is AST-only and rejects anything outside this grammar. At the top level, only three statement forms are accepted:
import ... from "..."; // any import
const <var> = <initializer>; // bindings (see below)
@ -43,26 +42,17 @@ top level, only three statement forms are accepted:
wf.addTyped(<factory>({ ...fields }) [, { position: [x, y] }])
wf.add({ type: "<nodeType>", ...fields [, position: [x, y]] })
No functions, arrow fns, loops, conditionals, ternaries, spreads,
destructuring, template interpolation, `export`, or `.map`/`.forEach`.
Data-position values must be plain literals (strings, numbers, booleans,
null, arrays/objects of same). A single `new Workflow(...)` per file
the `name` you pass there is the workflow's display name and is applied
on save (renames propagate immediately; definition changes go to draft).
No functions, arrow fns, loops, conditionals, ternaries, spreads, destructuring, template interpolation, `export`, or `.map`/`.forEach`.
Data-position values must be plain literals (strings, numbers, booleans, null, arrays/objects of same). A single `new Workflow(...)` per file the `name` you pass there is the workflow's display name and is applied on save (renames propagate immediately; definition changes go to draft).
## Adding edges — explicit syntax
wf.edge(source, target, { label: "...", condition: "..." });
Rules:
- `source` and `target` are the **bare variable identifiers** bound by
`wf.addTyped(...)` / `wf.add(...)` not strings, not `.id`, not inline
factories. Both must be declared earlier in the file.
- `label` is a short tag (4 words) shown in call logs to identify the
branch: `"qualified"`, `"wrap up"`, `"retry"`.
- `condition` is a full natural-language predicate the runtime evaluates
against the live conversation: `"caller confirmed interest in a demo"`,
not `"interested"`. Condition clarity determines routing accuracy.
- `source` and `target` are the **bare variable identifiers** bound by `wf.addTyped(...)` / `wf.add(...)` not strings, not `.id`, not inline factories. Both must be declared earlier in the file.
- `label` is a short tag (4 words) shown in call logs to identify the branch: `"qualified"`, `"wrap up"`, `"retry"`.
- `condition` is a full natural-language predicate the runtime evaluates against the live conversation: `"caller confirmed interest in a demo"`, not `"interested"`. Condition clarity determines routing accuracy.
- Both fields are required and must be non-empty strings.
- Edges are directional; emit one `wf.edge(...)` per outgoing branch.
- Place all edges after all node bindings; group by source node.
@ -76,49 +66,30 @@ Example:
condition: "user acknowledged the greeting and is ready to end"
});
## Hard graph constraints
- Exactly one `startCall` node per workflow; no incoming edges.
- `endCall` nodes have no outgoing edges.
- `globalNode` has no incoming or outgoing edges; its prompt is prepended
to every other node's prompt at runtime when that node sets
`add_global_prompt=true`.
- Every non-global node must be reachable from `startCall`.
## Iterating on errors
`save_workflow` returns one of:
- `parse_error` Disallowed construct (see grammar above) or
malformed TypeScript.
- `validation_error` Node data failed spec validation (unknown field,
missing required, wrong type, bad `options` value).
- `graph_validation` Structural rule broken (missing startCall,
unreachable node, edge to/from wrong node type).
- `bridge_error` Internal retry once, then surface to the user.
`save_workflow` and `create_workflow` return one of:
- `parse_error` Disallowed construct (see grammar above) or malformed TypeScript.
- `validation_error` Node data failed spec validation (unknown field, missing required, wrong type, bad `options` value).
- `graph_validation` Structural rule broken (missing startCall, unreachable node, edge to/from wrong node type).
- `missing_name` (`create_workflow` only) `new Workflow({ name })` is absent or empty.
- `bridge_error` Internal; retry once, then surface to the user.
Every error carries `line` and `column`. Fix at that location and
resubmit the **complete source** this tool does not accept patches.
Every error carries `line` and `column`. Fix at that location and resubmit the **complete source** this tool does not accept patches.
## Field conventions
- `data.name` is the canonical identifier. Pick a descriptive name
(`"Qualify Budget"`, not `"Node1"`) the generated code uses it as
the variable name and call logs reference it.
- `data.name` is the canonical identifier. Pick a descriptive name (`"Qualify Budget"`, not `"Node1"`) the generated code uses it as the variable name and call logs reference it.
- Reference fields take UUIDs, not human names:
`tool_refs`, `document_refs` from `list_tools`, `list_documents`
`credential_ref` from `list_credentials`
`recording_ref` from `list_recordings`
- `mention_textarea` fields (prompts, greetings, etc.) accept
`{{template_variables}}` values resolved at runtime from
`pre_call_fetch`, caller context, or earlier extraction passes.
- `tool_refs`, `document_refs` from `list_tools`, `list_documents`
- `credential_ref` from `list_credentials`
- `recording_ref` from `list_recordings`
- `mention_textarea` fields (prompts, greetings, etc.) accept `{{template_variables}}` values resolved at runtime from `pre_call_fetch`, caller context, or earlier extraction passes.
## Style
- Prefer `wf.addTyped(factory({ ... }))` over `wf.add({ type, ... })`.
- Only include fields whose values differ from the spec default the
parser re-applies defaults on save, so extras are noise.
- Omit `position`; the server reconciles positions against the previous
saved workflow and lays out new nodes automatically.
- Add nodes in call-flow order (start intermediate end) so the
generated code reads top-to-bottom, with all edges after all nodes.
- Only include fields whose values differ from the spec default the parser re-applies defaults on save, so extras are noise.
- Omit `position`; the server reconciles positions against the previous saved workflow and lays out new nodes automatically.
- Add nodes in call-flow order (start intermediate end) so the generated code reads top-to-bottom, with all edges after all nodes.
"""

View file

@ -1,13 +1,31 @@
from fastmcp import FastMCP
from api.mcp_server.instructions import DOGRAH_MCP_INSTRUCTIONS
from api.mcp_server.tools.catalog import (
list_credentials,
list_documents,
list_recordings,
list_tools,
)
from api.mcp_server.tools.create_workflow import create_workflow
from api.mcp_server.tools.get_workflow_code import get_workflow_code
from api.mcp_server.tools.node_types import get_node_type, list_node_types
from api.mcp_server.tools.save_workflow import save_workflow
from api.mcp_server.tools.workflows import get_workflow, list_workflows
mcp = FastMCP("dograh", instructions=DOGRAH_MCP_INSTRUCTIONS)
from api.mcp_server.tools import catalog as _catalog # noqa: E402, F401
from api.mcp_server.tools import (
get_workflow_code as _get_workflow_code, # noqa: E402, F401
)
from api.mcp_server.tools import node_types as _node_types # noqa: E402, F401
from api.mcp_server.tools import save_workflow as _save_workflow # noqa: E402, F401
from api.mcp_server.tools import workflows as _workflows # noqa: E402, F401
for _tool in (
create_workflow,
get_node_type,
get_workflow,
get_workflow_code,
list_credentials,
list_documents,
list_node_types,
list_recordings,
list_tools,
list_workflows,
save_workflow,
):
mcp.tool(_tool)

View file

@ -7,11 +7,9 @@ list the catalog before populating those fields with real UUIDs.
from api.db import db_client
from api.mcp_server.auth import authenticate_mcp_request
from api.mcp_server.server import mcp
from api.mcp_server.tracing import traced_tool
@mcp.tool
@traced_tool
async def list_tools(status: str | None = "active") -> list[dict]:
"""List tools the agent can invoke during a call.
@ -36,7 +34,6 @@ async def list_tools(status: str | None = "active") -> list[dict]:
]
@mcp.tool
@traced_tool
async def list_documents() -> list[dict]:
"""List knowledge-base documents the agent can reference during a call.
@ -59,7 +56,6 @@ async def list_documents() -> list[dict]:
]
@mcp.tool
@traced_tool
async def list_credentials() -> list[dict]:
"""List external credentials available for webhook auth and pre-call fetch.
@ -83,7 +79,6 @@ async def list_credentials() -> list[dict]:
]
@mcp.tool
@traced_tool
async def list_recordings(workflow_id: int | None = None) -> list[dict]:
"""List pre-recorded audio files available for greetings and edge

View file

@ -0,0 +1,163 @@
"""MCP tool that accepts LLM-authored SDK TypeScript and creates a new workflow.
Companion to `save_workflow`: where `save_workflow` updates an existing
workflow as a new draft, `create_workflow` brings a workflow into being
in one shot. The resulting workflow is published as version 1 there
is no prior published version to protect, so we skip the draft step.
Execution flow mirrors `save_workflow`:
1. Parse via the Node TS validator AST-only, never executes the code.
2. Pydantic validation via `ReactFlowDTO.model_validate`.
3. Graph validation via `WorkflowGraph`.
4. Persist via `db_client.create_workflow` workflow row + v1
published definition in a single transaction.
Error codes surfaced to the LLM match `save_workflow`. An additional
`missing_name` error is returned when the source omits
`new Workflow({ name: "..." })` the name is required and there is no
prior workflow to fall back to.
"""
from __future__ import annotations
from typing import Any
from loguru import logger
from pydantic import ValidationError as PydanticValidationError
from api.db import db_client
from api.enums import PostHogEvent
from api.mcp_server.auth import authenticate_mcp_request
from api.mcp_server.tracing import traced_tool
from api.mcp_server.ts_bridge import TsBridgeError, parse_code
from api.services.posthog_client import capture_event
from api.services.workflow.dto import ReactFlowDTO
from api.services.workflow.layout import reconcile_positions
from api.services.workflow.workflow import WorkflowGraph
def _error_result(code: str, message: str, **extra: Any) -> dict[str, Any]:
return {"created": False, "error_code": code, "error": message, **extra}
def _format_errors(errors: list[dict[str, Any]]) -> str:
parts: list[str] = []
for e in errors:
loc = ""
line = e.get("line")
col = e.get("column")
if line is not None:
loc = f" (line {line}" + (f", col {col}" if col is not None else "") + ")"
parts.append(f"{e.get('message', '')}{loc}")
return "\n".join(parts)
def _extract_trigger_paths(workflow_definition: dict) -> list[str]:
"""Mirror of `routes.workflow.extract_trigger_paths` — kept local so the
MCP layer doesn't depend on the route module."""
if not workflow_definition:
return []
paths: list[str] = []
for node in workflow_definition.get("nodes") or []:
if node.get("type") == "trigger":
trigger_path = (node.get("data") or {}).get("trigger_path")
if trigger_path:
paths.append(trigger_path)
return paths
@traced_tool
async def create_workflow(code: str) -> dict[str, Any]:
"""Parse SDK TypeScript and create a new published workflow.
`code` is TypeScript source using `@dograh/sdk`. The workflow name
comes from `new Workflow({ name: "..." })` it is required.
Example code:
import { Workflow } from "@dograh/sdk";
import { startCall, endCall } from "@dograh/sdk/typed";
const wf = new Workflow({ name: "lead_qualification" });
const greeting = wf.addTyped(startCall({ name: "Greeting", prompt: "Hi!" }));
const done = wf.addTyped(endCall({ name: "Done", prompt: "Bye." }));
wf.edge(greeting, done, { label: "done", condition: "conversation complete" });
On success the new workflow is published as version 1. Use
`save_workflow(workflow_id, code)` for subsequent edits those go to
a draft.
"""
user = await authenticate_mcp_request()
# 1. Parse + spec-validate via the Node TS validator.
try:
parsed = await parse_code(code)
except TsBridgeError as e:
logger.warning(f"ts_bridge failure: {e}")
return _error_result("bridge_error", str(e))
if not parsed.get("ok"):
stage = parsed.get("stage", "parse")
errs = parsed.get("errors") or []
code_key = "parse_error" if stage == "parse" else "validation_error"
return _error_result(code_key, _format_errors(errs), errors=errs)
payload = parsed["workflow"]
name = (parsed.get("workflowName") or "").strip()
if not name:
return _error_result(
"missing_name",
'Workflow name is required. Add `new Workflow({ name: "..." })` to the source.',
)
# 1b. New workflow — no prior version to reconcile against; layout
# places new nodes adjacent to their first incoming neighbor.
payload = reconcile_positions(payload, None)
# 2. Pydantic shape check (defence in depth — parser is spec-driven).
try:
dto = ReactFlowDTO.model_validate(payload)
except PydanticValidationError as e:
return _error_result("schema_validation", str(e))
# 3. Graph-level semantic validation (start-node count, edge shape).
try:
WorkflowGraph(dto)
except (ValueError, Exception) as e: # WorkflowGraph raises ValueError
return _error_result("graph_validation", str(e))
# 4. Persist as a new workflow with v1 published.
workflow = await db_client.create_workflow(
name,
payload,
user.id,
user.selected_organization_id,
)
capture_event(
distinct_id=str(user.provider_id),
event=PostHogEvent.WORKFLOW_CREATED,
properties={
"workflow_id": workflow.id,
"workflow_name": workflow.name,
"source": "mcp",
"organization_id": user.selected_organization_id,
},
)
trigger_paths = _extract_trigger_paths(payload)
if trigger_paths:
await db_client.sync_triggers_for_workflow(
workflow_id=workflow.id,
organization_id=user.selected_organization_id,
trigger_paths=trigger_paths,
)
return {
"created": True,
"workflow_id": workflow.id,
"name": workflow.name,
"status": workflow.status,
"version_number": 1,
"node_count": len(payload["nodes"]),
"edge_count": len(payload["edges"]),
}

View file

@ -18,12 +18,10 @@ from fastapi import HTTPException
from api.db import db_client
from api.mcp_server.auth import authenticate_mcp_request
from api.mcp_server.server import mcp
from api.mcp_server.tracing import traced_tool
from api.mcp_server.ts_bridge import TsBridgeError, generate_code
@mcp.tool
@traced_tool
async def get_workflow_code(workflow_id: int) -> dict[str, Any]:
"""Return the workflow as SDK TypeScript code the LLM can edit.

View file

@ -7,12 +7,10 @@ each node's property schema before composing or modifying a workflow.
from fastapi import HTTPException
from api.mcp_server.auth import authenticate_mcp_request
from api.mcp_server.server import mcp
from api.mcp_server.tracing import traced_tool
from api.services.workflow.node_specs import SPEC_VERSION, all_specs, get_spec
@mcp.tool
@traced_tool
async def list_node_types() -> dict:
"""List every available node type with a brief summary.
@ -40,7 +38,6 @@ async def list_node_types() -> dict:
}
@mcp.tool
@traced_tool
async def get_node_type(name: str) -> dict:
"""Fetch the full schema for a node type, including every property's

View file

@ -32,7 +32,6 @@ from pydantic import ValidationError as PydanticValidationError
from api.db import db_client
from api.mcp_server.auth import authenticate_mcp_request
from api.mcp_server.server import mcp
from api.mcp_server.tracing import traced_tool
from api.mcp_server.ts_bridge import TsBridgeError, parse_code
from api.services.workflow.dto import ReactFlowDTO
@ -73,7 +72,6 @@ def _format_errors(errors: list[dict[str, Any]]) -> str:
return "\n".join(parts)
@mcp.tool
@traced_tool
async def save_workflow(workflow_id: int, code: str) -> dict[str, Any]:
"""Parse SDK TypeScript and save the resulting workflow as a draft.

View file

@ -2,11 +2,9 @@ from fastapi import HTTPException
from api.db import db_client
from api.mcp_server.auth import authenticate_mcp_request
from api.mcp_server.server import mcp
from api.mcp_server.tracing import traced_tool
@mcp.tool
@traced_tool
async def list_workflows(status: str | None = "active") -> list[dict]:
"""List agents (workflows) in the caller's organization.
@ -32,7 +30,6 @@ async def list_workflows(status: str | None = "active") -> list[dict]:
]
@mcp.tool
@traced_tool
async def get_workflow(workflow_id: int) -> dict:
"""Fetch a single agent by id, including its current published definition."""