2026-04-25 17:38:38 +05:30
|
|
|
"""MCP tool that accepts LLM-authored SDK TypeScript and creates a new workflow.
|
|
|
|
|
|
|
|
|
|
Companion to `save_workflow`: where `save_workflow` updates an existing
|
|
|
|
|
workflow as a new draft, `create_workflow` brings a workflow into being
|
|
|
|
|
in one shot. The resulting workflow is published as version 1 — there
|
|
|
|
|
is no prior published version to protect, so we skip the draft step.
|
|
|
|
|
|
|
|
|
|
Execution flow mirrors `save_workflow`:
|
|
|
|
|
1. Parse via the Node TS validator — AST-only, never executes the code.
|
|
|
|
|
2. Pydantic validation via `ReactFlowDTO.model_validate`.
|
|
|
|
|
3. Graph validation via `WorkflowGraph`.
|
|
|
|
|
4. Persist via `db_client.create_workflow` — workflow row + v1
|
|
|
|
|
published definition in a single transaction.
|
|
|
|
|
|
2026-05-20 18:43:18 +05:30
|
|
|
Each failure path returns an `error_code` via `_error_result`. Those
|
|
|
|
|
codes and their meanings are documented in the `create_workflow`
|
|
|
|
|
docstring (the description shipped to the LLM via `tools/list`); keep the
|
|
|
|
|
two in sync — `test_mcp_instructions_drift.py` enforces it.
|
2026-04-25 17:38:38 +05:30
|
|
|
"""
|
|
|
|
|
|
|
|
|
|
from __future__ import annotations
|
|
|
|
|
|
|
|
|
|
from typing import Any
|
|
|
|
|
|
|
|
|
|
from loguru import logger
|
|
|
|
|
from pydantic import ValidationError as PydanticValidationError
|
|
|
|
|
|
|
|
|
|
from api.db import db_client
|
2026-04-25 19:44:44 +05:30
|
|
|
from api.db.agent_trigger_client import TriggerPathConflictError
|
2026-04-25 17:38:38 +05:30
|
|
|
from api.enums import PostHogEvent
|
|
|
|
|
from api.mcp_server.auth import authenticate_mcp_request
|
|
|
|
|
from api.mcp_server.tracing import traced_tool
|
|
|
|
|
from api.mcp_server.ts_bridge import TsBridgeError, parse_code
|
|
|
|
|
from api.services.posthog_client import capture_event
|
|
|
|
|
from api.services.workflow.dto import ReactFlowDTO
|
|
|
|
|
from api.services.workflow.layout import reconcile_positions
|
2026-05-08 16:02:51 +05:30
|
|
|
from api.services.workflow.workflow_graph import WorkflowGraph
|
2026-04-25 17:38:38 +05:30
|
|
|
|
|
|
|
|
|
|
|
|
|
def _error_result(code: str, message: str, **extra: Any) -> dict[str, Any]:
|
|
|
|
|
return {"created": False, "error_code": code, "error": message, **extra}
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def _format_errors(errors: list[dict[str, Any]]) -> str:
|
|
|
|
|
parts: list[str] = []
|
|
|
|
|
for e in errors:
|
|
|
|
|
loc = ""
|
|
|
|
|
line = e.get("line")
|
|
|
|
|
col = e.get("column")
|
|
|
|
|
if line is not None:
|
|
|
|
|
loc = f" (line {line}" + (f", col {col}" if col is not None else "") + ")"
|
|
|
|
|
parts.append(f"{e.get('message', '')}{loc}")
|
|
|
|
|
return "\n".join(parts)
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def _extract_trigger_paths(workflow_definition: dict) -> list[str]:
|
|
|
|
|
"""Mirror of `routes.workflow.extract_trigger_paths` — kept local so the
|
|
|
|
|
MCP layer doesn't depend on the route module."""
|
|
|
|
|
if not workflow_definition:
|
|
|
|
|
return []
|
|
|
|
|
paths: list[str] = []
|
|
|
|
|
for node in workflow_definition.get("nodes") or []:
|
|
|
|
|
if node.get("type") == "trigger":
|
|
|
|
|
trigger_path = (node.get("data") or {}).get("trigger_path")
|
|
|
|
|
if trigger_path:
|
|
|
|
|
paths.append(trigger_path)
|
|
|
|
|
return paths
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
@traced_tool
|
|
|
|
|
async def create_workflow(code: str) -> dict[str, Any]:
|
|
|
|
|
"""Parse SDK TypeScript and create a new published workflow.
|
|
|
|
|
|
|
|
|
|
`code` is TypeScript source using `@dograh/sdk`. The workflow name
|
|
|
|
|
comes from `new Workflow({ name: "..." })` — it is required.
|
|
|
|
|
|
|
|
|
|
Example code:
|
|
|
|
|
import { Workflow } from "@dograh/sdk";
|
|
|
|
|
import { startCall, endCall } from "@dograh/sdk/typed";
|
|
|
|
|
|
|
|
|
|
const wf = new Workflow({ name: "lead_qualification" });
|
|
|
|
|
const greeting = wf.addTyped(startCall({ name: "Greeting", prompt: "Hi!" }));
|
|
|
|
|
const done = wf.addTyped(endCall({ name: "Done", prompt: "Bye." }));
|
|
|
|
|
wf.edge(greeting, done, { label: "done", condition: "conversation complete" });
|
|
|
|
|
|
|
|
|
|
On success the new workflow is published as version 1. Use
|
|
|
|
|
`save_workflow(workflow_id, code)` for subsequent edits — those go to
|
|
|
|
|
a draft.
|
2026-05-20 18:43:18 +05:30
|
|
|
|
|
|
|
|
On failure the result has `created: false`, a machine-readable
|
|
|
|
|
`error_code`, and a human-readable `error` (with file:line:column
|
|
|
|
|
where the problem is locatable). Resubmit the full corrected source —
|
|
|
|
|
patches are not accepted. Possible `error_code` values:
|
|
|
|
|
- `parse_error` — disallowed construct or malformed TypeScript.
|
|
|
|
|
- `validation_error` — node data failed spec validation (unknown
|
|
|
|
|
field, missing required, wrong type, option out of range).
|
|
|
|
|
- `schema_validation` — wire-format (DTO) rejection; rare.
|
|
|
|
|
- `graph_validation` — structural rule broken (e.g. no start node,
|
|
|
|
|
unreachable node, edge to/from the wrong node type).
|
|
|
|
|
- `missing_name` — `new Workflow({ name })` is absent or empty; the
|
|
|
|
|
name is required and there is no prior workflow to fall back to.
|
|
|
|
|
- `trigger_path_conflict` — a trigger node's path is already used by
|
|
|
|
|
another workflow in this organization; rename it and resubmit.
|
|
|
|
|
- `bridge_error` — internal/transient; retry once, then surface it.
|
2026-04-25 17:38:38 +05:30
|
|
|
"""
|
|
|
|
|
user = await authenticate_mcp_request()
|
|
|
|
|
|
|
|
|
|
# 1. Parse + spec-validate via the Node TS validator.
|
|
|
|
|
try:
|
|
|
|
|
parsed = await parse_code(code)
|
|
|
|
|
except TsBridgeError as e:
|
|
|
|
|
logger.warning(f"ts_bridge failure: {e}")
|
|
|
|
|
return _error_result("bridge_error", str(e))
|
|
|
|
|
|
|
|
|
|
if not parsed.get("ok"):
|
|
|
|
|
stage = parsed.get("stage", "parse")
|
|
|
|
|
errs = parsed.get("errors") or []
|
|
|
|
|
code_key = "parse_error" if stage == "parse" else "validation_error"
|
|
|
|
|
return _error_result(code_key, _format_errors(errs), errors=errs)
|
|
|
|
|
|
|
|
|
|
payload = parsed["workflow"]
|
|
|
|
|
name = (parsed.get("workflowName") or "").strip()
|
|
|
|
|
if not name:
|
|
|
|
|
return _error_result(
|
|
|
|
|
"missing_name",
|
|
|
|
|
'Workflow name is required. Add `new Workflow({ name: "..." })` to the source.',
|
|
|
|
|
)
|
|
|
|
|
|
|
|
|
|
# 1b. New workflow — no prior version to reconcile against; layout
|
|
|
|
|
# places new nodes adjacent to their first incoming neighbor.
|
|
|
|
|
payload = reconcile_positions(payload, None)
|
|
|
|
|
|
|
|
|
|
# 2. Pydantic shape check (defence in depth — parser is spec-driven).
|
|
|
|
|
try:
|
|
|
|
|
dto = ReactFlowDTO.model_validate(payload)
|
|
|
|
|
except PydanticValidationError as e:
|
|
|
|
|
return _error_result("schema_validation", str(e))
|
|
|
|
|
|
|
|
|
|
# 3. Graph-level semantic validation (start-node count, edge shape).
|
|
|
|
|
try:
|
|
|
|
|
WorkflowGraph(dto)
|
|
|
|
|
except (ValueError, Exception) as e: # WorkflowGraph raises ValueError
|
|
|
|
|
return _error_result("graph_validation", str(e))
|
|
|
|
|
|
2026-04-25 19:44:44 +05:30
|
|
|
# 4. Reject upfront if any trigger path collides with another workflow's
|
|
|
|
|
# trigger in this org so we don't leave an orphan workflow record.
|
|
|
|
|
trigger_paths = _extract_trigger_paths(payload)
|
|
|
|
|
if trigger_paths:
|
|
|
|
|
try:
|
|
|
|
|
await db_client.assert_trigger_paths_available(
|
|
|
|
|
trigger_paths=trigger_paths,
|
|
|
|
|
)
|
|
|
|
|
except TriggerPathConflictError as e:
|
|
|
|
|
return _error_result(
|
|
|
|
|
"trigger_path_conflict", str(e), trigger_paths=e.trigger_paths
|
|
|
|
|
)
|
|
|
|
|
|
|
|
|
|
# 5. Persist as a new workflow with v1 published.
|
2026-04-25 17:38:38 +05:30
|
|
|
workflow = await db_client.create_workflow(
|
|
|
|
|
name,
|
|
|
|
|
payload,
|
|
|
|
|
user.id,
|
|
|
|
|
user.selected_organization_id,
|
|
|
|
|
)
|
|
|
|
|
|
|
|
|
|
capture_event(
|
|
|
|
|
distinct_id=str(user.provider_id),
|
|
|
|
|
event=PostHogEvent.WORKFLOW_CREATED,
|
|
|
|
|
properties={
|
|
|
|
|
"workflow_id": workflow.id,
|
|
|
|
|
"workflow_name": workflow.name,
|
|
|
|
|
"source": "mcp",
|
|
|
|
|
"organization_id": user.selected_organization_id,
|
|
|
|
|
},
|
|
|
|
|
)
|
|
|
|
|
|
|
|
|
|
if trigger_paths:
|
|
|
|
|
await db_client.sync_triggers_for_workflow(
|
|
|
|
|
workflow_id=workflow.id,
|
|
|
|
|
organization_id=user.selected_organization_id,
|
|
|
|
|
trigger_paths=trigger_paths,
|
|
|
|
|
)
|
|
|
|
|
|
|
|
|
|
return {
|
|
|
|
|
"created": True,
|
|
|
|
|
"workflow_id": workflow.id,
|
|
|
|
|
"name": workflow.name,
|
|
|
|
|
"status": workflow.status,
|
|
|
|
|
"version_number": 1,
|
|
|
|
|
"node_count": len(payload["nodes"]),
|
|
|
|
|
"edge_count": len(payload["edges"]),
|
|
|
|
|
}
|