From 3e3773f4007a86ee5091e4c9335159a5c15f98bf Mon Sep 17 00:00:00 2001 From: Abhishek Kumar Date: Sat, 25 Apr 2026 17:38:38 +0530 Subject: [PATCH] feat: add create workflow tool in MCP --- api/mcp_server/instructions.py | 95 ++++------ api/mcp_server/server.py | 32 +++- api/mcp_server/tools/catalog.py | 5 - api/mcp_server/tools/create_workflow.py | 163 ++++++++++++++++++ api/mcp_server/tools/get_workflow_code.py | 2 - api/mcp_server/tools/node_types.py | 3 - api/mcp_server/tools/save_workflow.py | 2 - api/mcp_server/tools/workflows.py | 3 - api/requirements.dev.txt | 1 + scripts/release_sdks.sh | 25 ++- sdk/python/pyproject.toml | 2 +- .../src/dograh_sdk/_generated_models.py | 4 +- sdk/typescript/package.json | 2 +- 13 files changed, 245 insertions(+), 94 deletions(-) create mode 100644 api/mcp_server/tools/create_workflow.py diff --git a/api/mcp_server/instructions.py b/api/mcp_server/instructions.py index ecb8772..f0b2618 100644 --- a/api/mcp_server/instructions.py +++ b/api/mcp_server/instructions.py @@ -12,27 +12,26 @@ mistake the system has seen at least once. """ DOGRAH_MCP_INSTRUCTIONS = """\ -You build and edit Dograh voice-AI workflows by emitting TypeScript that -uses the `@dograh/sdk` package. Workflows are stored as JSON; this server -projects them to TypeScript for editing and parses them back on save. +You build and edit Dograh voice-AI workflows by emitting TypeScript that uses the `@dograh/sdk` package. Workflows are stored as JSON; this server projects them to TypeScript for editing and parses them back on save. ## Call order +### Editing an existing workflow 1. `list_workflows` — locate the target workflow. -2. `get_workflow_code(workflow_id)` — fetch the current source (draft if - one exists, otherwise published). -3. (optional) `list_node_types` / `get_node_type(name)` — consult before - adding or editing a node type whose fields aren't already visible in - the current code. -4. Mutate the code in place. Preserve existing nodes, edges, and variable - names unless the task requires removing or renaming them. -5. `save_workflow(workflow_id, code)` — persist as a new draft. The - published version is untouched. +2. `get_workflow_code(workflow_id)` — fetch the current source. +3. (optional) `list_node_types` / `get_node_type(name)` — consult before adding or editing a node type whose fields aren't already visible in the current code. +4. Mutate the code in place. Preserve existing nodes, edges, and variable names unless the task requires removing or renaming them. +5. `save_workflow(workflow_id, code)` — persist as a new draft. The published version is untouched. + +### Creating a new workflow +1. Create a simple 1-node workflow with only `startCall`. The user can iteratively add complexity by editing it. +2. `list_node_types` / `get_node_type(name)` — consult to learn the fields available on the node types you intend to use. +3. Author SDK TypeScript from scratch. The `new Workflow({ name: "..." })` call is required — `name` becomes the workflow's display name. +4. `create_workflow(code)` — persists a new workflow as version 1 (published). Returns the new `workflow_id`. For subsequent edits use `save_workflow(workflow_id, code)` (which writes a draft). ## Allowed source shape -The parser is AST-only and rejects anything outside this grammar. At the -top level, only three statement forms are accepted: +The parser is AST-only and rejects anything outside this grammar. At the top level, only three statement forms are accepted: import ... from "..."; // any import const = ; // bindings (see below) @@ -43,26 +42,17 @@ top level, only three statement forms are accepted: wf.addTyped(({ ...fields }) [, { position: [x, y] }]) wf.add({ type: "", ...fields [, position: [x, y]] }) -No functions, arrow fns, loops, conditionals, ternaries, spreads, -destructuring, template interpolation, `export`, or `.map`/`.forEach`. -Data-position values must be plain literals (strings, numbers, booleans, -null, arrays/objects of same). A single `new Workflow(...)` per file — -the `name` you pass there is the workflow's display name and is applied -on save (renames propagate immediately; definition changes go to draft). +No functions, arrow fns, loops, conditionals, ternaries, spreads, destructuring, template interpolation, `export`, or `.map`/`.forEach`. +Data-position values must be plain literals (strings, numbers, booleans, null, arrays/objects of same). A single `new Workflow(...)` per file — the `name` you pass there is the workflow's display name and is applied on save (renames propagate immediately; definition changes go to draft). ## Adding edges — explicit syntax wf.edge(source, target, { label: "...", condition: "..." }); Rules: -- `source` and `target` are the **bare variable identifiers** bound by - `wf.addTyped(...)` / `wf.add(...)` — not strings, not `.id`, not inline - factories. Both must be declared earlier in the file. -- `label` is a short tag (≤4 words) shown in call logs to identify the - branch: `"qualified"`, `"wrap up"`, `"retry"`. -- `condition` is a full natural-language predicate the runtime evaluates - against the live conversation: `"caller confirmed interest in a demo"`, - not `"interested"`. Condition clarity determines routing accuracy. +- `source` and `target` are the **bare variable identifiers** bound by `wf.addTyped(...)` / `wf.add(...)` — not strings, not `.id`, not inline factories. Both must be declared earlier in the file. +- `label` is a short tag (≤4 words) shown in call logs to identify the branch: `"qualified"`, `"wrap up"`, `"retry"`. +- `condition` is a full natural-language predicate the runtime evaluates against the live conversation: `"caller confirmed interest in a demo"`, not `"interested"`. Condition clarity determines routing accuracy. - Both fields are required and must be non-empty strings. - Edges are directional; emit one `wf.edge(...)` per outgoing branch. - Place all edges after all node bindings; group by source node. @@ -76,49 +66,30 @@ Example: condition: "user acknowledged the greeting and is ready to end" }); -## Hard graph constraints - -- Exactly one `startCall` node per workflow; no incoming edges. -- `endCall` nodes have no outgoing edges. -- `globalNode` has no incoming or outgoing edges; its prompt is prepended - to every other node's prompt at runtime when that node sets - `add_global_prompt=true`. -- Every non-global node must be reachable from `startCall`. - ## Iterating on errors -`save_workflow` returns one of: - - `parse_error` Disallowed construct (see grammar above) or - malformed TypeScript. - - `validation_error` Node data failed spec validation (unknown field, - missing required, wrong type, bad `options` value). - - `graph_validation` Structural rule broken (missing startCall, - unreachable node, edge to/from wrong node type). - - `bridge_error` Internal — retry once, then surface to the user. +`save_workflow` and `create_workflow` return one of: +- `parse_error` — Disallowed construct (see grammar above) or malformed TypeScript. +- `validation_error` — Node data failed spec validation (unknown field, missing required, wrong type, bad `options` value). +- `graph_validation` — Structural rule broken (missing startCall, unreachable node, edge to/from wrong node type). +- `missing_name` — (`create_workflow` only) `new Workflow({ name })` is absent or empty. +- `bridge_error` — Internal; retry once, then surface to the user. -Every error carries `line` and `column`. Fix at that location and -resubmit the **complete source** — this tool does not accept patches. +Every error carries `line` and `column`. Fix at that location and resubmit the **complete source** — this tool does not accept patches. ## Field conventions -- `data.name` is the canonical identifier. Pick a descriptive name - (`"Qualify Budget"`, not `"Node1"`) — the generated code uses it as - the variable name and call logs reference it. +- `data.name` is the canonical identifier. Pick a descriptive name (`"Qualify Budget"`, not `"Node1"`) — the generated code uses it as the variable name and call logs reference it. - Reference fields take UUIDs, not human names: - `tool_refs`, `document_refs` → from `list_tools`, `list_documents` - `credential_ref` → from `list_credentials` - `recording_ref` → from `list_recordings` -- `mention_textarea` fields (prompts, greetings, etc.) accept - `{{template_variables}}` — values resolved at runtime from - `pre_call_fetch`, caller context, or earlier extraction passes. + - `tool_refs`, `document_refs` → from `list_tools`, `list_documents` + - `credential_ref` → from `list_credentials` + - `recording_ref` → from `list_recordings` +- `mention_textarea` fields (prompts, greetings, etc.) accept `{{template_variables}}` — values resolved at runtime from `pre_call_fetch`, caller context, or earlier extraction passes. ## Style - Prefer `wf.addTyped(factory({ ... }))` over `wf.add({ type, ... })`. -- Only include fields whose values differ from the spec default — the - parser re-applies defaults on save, so extras are noise. -- Omit `position`; the server reconciles positions against the previous - saved workflow and lays out new nodes automatically. -- Add nodes in call-flow order (start → intermediate → end) so the - generated code reads top-to-bottom, with all edges after all nodes. +- Only include fields whose values differ from the spec default — the parser re-applies defaults on save, so extras are noise. +- Omit `position`; the server reconciles positions against the previous saved workflow and lays out new nodes automatically. +- Add nodes in call-flow order (start → intermediate → end) so the generated code reads top-to-bottom, with all edges after all nodes. """ diff --git a/api/mcp_server/server.py b/api/mcp_server/server.py index 46ffda5..12ad42e 100644 --- a/api/mcp_server/server.py +++ b/api/mcp_server/server.py @@ -1,13 +1,31 @@ from fastmcp import FastMCP from api.mcp_server.instructions import DOGRAH_MCP_INSTRUCTIONS +from api.mcp_server.tools.catalog import ( + list_credentials, + list_documents, + list_recordings, + list_tools, +) +from api.mcp_server.tools.create_workflow import create_workflow +from api.mcp_server.tools.get_workflow_code import get_workflow_code +from api.mcp_server.tools.node_types import get_node_type, list_node_types +from api.mcp_server.tools.save_workflow import save_workflow +from api.mcp_server.tools.workflows import get_workflow, list_workflows mcp = FastMCP("dograh", instructions=DOGRAH_MCP_INSTRUCTIONS) -from api.mcp_server.tools import catalog as _catalog # noqa: E402, F401 -from api.mcp_server.tools import ( - get_workflow_code as _get_workflow_code, # noqa: E402, F401 -) -from api.mcp_server.tools import node_types as _node_types # noqa: E402, F401 -from api.mcp_server.tools import save_workflow as _save_workflow # noqa: E402, F401 -from api.mcp_server.tools import workflows as _workflows # noqa: E402, F401 +for _tool in ( + create_workflow, + get_node_type, + get_workflow, + get_workflow_code, + list_credentials, + list_documents, + list_node_types, + list_recordings, + list_tools, + list_workflows, + save_workflow, +): + mcp.tool(_tool) diff --git a/api/mcp_server/tools/catalog.py b/api/mcp_server/tools/catalog.py index a87f9b9..fa6b343 100644 --- a/api/mcp_server/tools/catalog.py +++ b/api/mcp_server/tools/catalog.py @@ -7,11 +7,9 @@ list the catalog before populating those fields with real UUIDs. from api.db import db_client from api.mcp_server.auth import authenticate_mcp_request -from api.mcp_server.server import mcp from api.mcp_server.tracing import traced_tool -@mcp.tool @traced_tool async def list_tools(status: str | None = "active") -> list[dict]: """List tools the agent can invoke during a call. @@ -36,7 +34,6 @@ async def list_tools(status: str | None = "active") -> list[dict]: ] -@mcp.tool @traced_tool async def list_documents() -> list[dict]: """List knowledge-base documents the agent can reference during a call. @@ -59,7 +56,6 @@ async def list_documents() -> list[dict]: ] -@mcp.tool @traced_tool async def list_credentials() -> list[dict]: """List external credentials available for webhook auth and pre-call fetch. @@ -83,7 +79,6 @@ async def list_credentials() -> list[dict]: ] -@mcp.tool @traced_tool async def list_recordings(workflow_id: int | None = None) -> list[dict]: """List pre-recorded audio files available for greetings and edge diff --git a/api/mcp_server/tools/create_workflow.py b/api/mcp_server/tools/create_workflow.py new file mode 100644 index 0000000..66dc5e8 --- /dev/null +++ b/api/mcp_server/tools/create_workflow.py @@ -0,0 +1,163 @@ +"""MCP tool that accepts LLM-authored SDK TypeScript and creates a new workflow. + +Companion to `save_workflow`: where `save_workflow` updates an existing +workflow as a new draft, `create_workflow` brings a workflow into being +in one shot. The resulting workflow is published as version 1 — there +is no prior published version to protect, so we skip the draft step. + +Execution flow mirrors `save_workflow`: + 1. Parse via the Node TS validator — AST-only, never executes the code. + 2. Pydantic validation via `ReactFlowDTO.model_validate`. + 3. Graph validation via `WorkflowGraph`. + 4. Persist via `db_client.create_workflow` — workflow row + v1 + published definition in a single transaction. + +Error codes surfaced to the LLM match `save_workflow`. An additional +`missing_name` error is returned when the source omits +`new Workflow({ name: "..." })` — the name is required and there is no +prior workflow to fall back to. +""" + +from __future__ import annotations + +from typing import Any + +from loguru import logger +from pydantic import ValidationError as PydanticValidationError + +from api.db import db_client +from api.enums import PostHogEvent +from api.mcp_server.auth import authenticate_mcp_request +from api.mcp_server.tracing import traced_tool +from api.mcp_server.ts_bridge import TsBridgeError, parse_code +from api.services.posthog_client import capture_event +from api.services.workflow.dto import ReactFlowDTO +from api.services.workflow.layout import reconcile_positions +from api.services.workflow.workflow import WorkflowGraph + + +def _error_result(code: str, message: str, **extra: Any) -> dict[str, Any]: + return {"created": False, "error_code": code, "error": message, **extra} + + +def _format_errors(errors: list[dict[str, Any]]) -> str: + parts: list[str] = [] + for e in errors: + loc = "" + line = e.get("line") + col = e.get("column") + if line is not None: + loc = f" (line {line}" + (f", col {col}" if col is not None else "") + ")" + parts.append(f"{e.get('message', '')}{loc}") + return "\n".join(parts) + + +def _extract_trigger_paths(workflow_definition: dict) -> list[str]: + """Mirror of `routes.workflow.extract_trigger_paths` — kept local so the + MCP layer doesn't depend on the route module.""" + if not workflow_definition: + return [] + paths: list[str] = [] + for node in workflow_definition.get("nodes") or []: + if node.get("type") == "trigger": + trigger_path = (node.get("data") or {}).get("trigger_path") + if trigger_path: + paths.append(trigger_path) + return paths + + +@traced_tool +async def create_workflow(code: str) -> dict[str, Any]: + """Parse SDK TypeScript and create a new published workflow. + + `code` is TypeScript source using `@dograh/sdk`. The workflow name + comes from `new Workflow({ name: "..." })` — it is required. + + Example code: + import { Workflow } from "@dograh/sdk"; + import { startCall, endCall } from "@dograh/sdk/typed"; + + const wf = new Workflow({ name: "lead_qualification" }); + const greeting = wf.addTyped(startCall({ name: "Greeting", prompt: "Hi!" })); + const done = wf.addTyped(endCall({ name: "Done", prompt: "Bye." })); + wf.edge(greeting, done, { label: "done", condition: "conversation complete" }); + + On success the new workflow is published as version 1. Use + `save_workflow(workflow_id, code)` for subsequent edits — those go to + a draft. + """ + user = await authenticate_mcp_request() + + # 1. Parse + spec-validate via the Node TS validator. + try: + parsed = await parse_code(code) + except TsBridgeError as e: + logger.warning(f"ts_bridge failure: {e}") + return _error_result("bridge_error", str(e)) + + if not parsed.get("ok"): + stage = parsed.get("stage", "parse") + errs = parsed.get("errors") or [] + code_key = "parse_error" if stage == "parse" else "validation_error" + return _error_result(code_key, _format_errors(errs), errors=errs) + + payload = parsed["workflow"] + name = (parsed.get("workflowName") or "").strip() + if not name: + return _error_result( + "missing_name", + 'Workflow name is required. Add `new Workflow({ name: "..." })` to the source.', + ) + + # 1b. New workflow — no prior version to reconcile against; layout + # places new nodes adjacent to their first incoming neighbor. + payload = reconcile_positions(payload, None) + + # 2. Pydantic shape check (defence in depth — parser is spec-driven). + try: + dto = ReactFlowDTO.model_validate(payload) + except PydanticValidationError as e: + return _error_result("schema_validation", str(e)) + + # 3. Graph-level semantic validation (start-node count, edge shape). + try: + WorkflowGraph(dto) + except (ValueError, Exception) as e: # WorkflowGraph raises ValueError + return _error_result("graph_validation", str(e)) + + # 4. Persist as a new workflow with v1 published. + workflow = await db_client.create_workflow( + name, + payload, + user.id, + user.selected_organization_id, + ) + + capture_event( + distinct_id=str(user.provider_id), + event=PostHogEvent.WORKFLOW_CREATED, + properties={ + "workflow_id": workflow.id, + "workflow_name": workflow.name, + "source": "mcp", + "organization_id": user.selected_organization_id, + }, + ) + + trigger_paths = _extract_trigger_paths(payload) + if trigger_paths: + await db_client.sync_triggers_for_workflow( + workflow_id=workflow.id, + organization_id=user.selected_organization_id, + trigger_paths=trigger_paths, + ) + + return { + "created": True, + "workflow_id": workflow.id, + "name": workflow.name, + "status": workflow.status, + "version_number": 1, + "node_count": len(payload["nodes"]), + "edge_count": len(payload["edges"]), + } diff --git a/api/mcp_server/tools/get_workflow_code.py b/api/mcp_server/tools/get_workflow_code.py index 311c7ed..d0c99c4 100644 --- a/api/mcp_server/tools/get_workflow_code.py +++ b/api/mcp_server/tools/get_workflow_code.py @@ -18,12 +18,10 @@ from fastapi import HTTPException from api.db import db_client from api.mcp_server.auth import authenticate_mcp_request -from api.mcp_server.server import mcp from api.mcp_server.tracing import traced_tool from api.mcp_server.ts_bridge import TsBridgeError, generate_code -@mcp.tool @traced_tool async def get_workflow_code(workflow_id: int) -> dict[str, Any]: """Return the workflow as SDK TypeScript code the LLM can edit. diff --git a/api/mcp_server/tools/node_types.py b/api/mcp_server/tools/node_types.py index f29c940..04e8c55 100644 --- a/api/mcp_server/tools/node_types.py +++ b/api/mcp_server/tools/node_types.py @@ -7,12 +7,10 @@ each node's property schema before composing or modifying a workflow. from fastapi import HTTPException from api.mcp_server.auth import authenticate_mcp_request -from api.mcp_server.server import mcp from api.mcp_server.tracing import traced_tool from api.services.workflow.node_specs import SPEC_VERSION, all_specs, get_spec -@mcp.tool @traced_tool async def list_node_types() -> dict: """List every available node type with a brief summary. @@ -40,7 +38,6 @@ async def list_node_types() -> dict: } -@mcp.tool @traced_tool async def get_node_type(name: str) -> dict: """Fetch the full schema for a node type, including every property's diff --git a/api/mcp_server/tools/save_workflow.py b/api/mcp_server/tools/save_workflow.py index c05df55..13b60cf 100644 --- a/api/mcp_server/tools/save_workflow.py +++ b/api/mcp_server/tools/save_workflow.py @@ -32,7 +32,6 @@ from pydantic import ValidationError as PydanticValidationError from api.db import db_client from api.mcp_server.auth import authenticate_mcp_request -from api.mcp_server.server import mcp from api.mcp_server.tracing import traced_tool from api.mcp_server.ts_bridge import TsBridgeError, parse_code from api.services.workflow.dto import ReactFlowDTO @@ -73,7 +72,6 @@ def _format_errors(errors: list[dict[str, Any]]) -> str: return "\n".join(parts) -@mcp.tool @traced_tool async def save_workflow(workflow_id: int, code: str) -> dict[str, Any]: """Parse SDK TypeScript and save the resulting workflow as a draft. diff --git a/api/mcp_server/tools/workflows.py b/api/mcp_server/tools/workflows.py index 7580511..af5f165 100644 --- a/api/mcp_server/tools/workflows.py +++ b/api/mcp_server/tools/workflows.py @@ -2,11 +2,9 @@ from fastapi import HTTPException from api.db import db_client from api.mcp_server.auth import authenticate_mcp_request -from api.mcp_server.server import mcp from api.mcp_server.tracing import traced_tool -@mcp.tool @traced_tool async def list_workflows(status: str | None = "active") -> list[dict]: """List agents (workflows) in the caller's organization. @@ -32,7 +30,6 @@ async def list_workflows(status: str | None = "active") -> list[dict]: ] -@mcp.tool @traced_tool async def get_workflow(workflow_id: int) -> dict: """Fetch a single agent by id, including its current published definition.""" diff --git a/api/requirements.dev.txt b/api/requirements.dev.txt index 275079c..cebdad0 100644 --- a/api/requirements.dev.txt +++ b/api/requirements.dev.txt @@ -7,4 +7,5 @@ watchfiles==1.1.0 python-dotenv==1.2.1 datamodel-code-generator==0.56.1 twine==6.2.0 +build==1.2.2 -e ./sdk/python diff --git a/scripts/release_sdks.sh b/scripts/release_sdks.sh index 2eb88a8..2cf2c72 100755 --- a/scripts/release_sdks.sh +++ b/scripts/release_sdks.sh @@ -37,6 +37,19 @@ confirm() { [[ "$reply" =~ ^[Yy]$ ]] } +echo "→ Pre-flight checks..." +if ! command -v npm >/dev/null 2>&1; then + echo "error: npm not found in PATH" >&2 + exit 1 +fi +if ! NPM_USER="$(npm whoami 2>/dev/null)"; then + echo "error: not logged in to npm. Run 'npm login' as a member of the" >&2 + echo " dograh org before re-running this script — otherwise PyPI" >&2 + echo " will publish and npm will 404, leaving the release split." >&2 + exit 1 +fi +echo " npm: logged in as $NPM_USER" + echo "→ Regenerating typed SDK sources from node_specs..." ./scripts/generate_sdk.sh @@ -102,18 +115,18 @@ if confirm "Upload dograh-sdk==$VERSION to TestPyPI first (recommended)?"; then echo fi -if confirm "Upload dograh-sdk==$VERSION to PyPI?"; then - (cd sdk/python && twine upload dist/*) - echo " → https://pypi.org/project/dograh-sdk/$VERSION/" - echo -fi - if confirm "Publish @dograh/sdk@$VERSION to npm? (will prompt for 2FA OTP)"; then (cd sdk/typescript && npm publish --access public) echo " → https://www.npmjs.com/package/@dograh/sdk/v/$VERSION" echo fi +if confirm "Upload dograh-sdk==$VERSION to PyPI?"; then + (cd sdk/python && twine upload dist/*) + echo " → https://pypi.org/project/dograh-sdk/$VERSION/" + echo +fi + if confirm "Create annotated git tag sdks-v$VERSION at HEAD?"; then git tag -a "sdks-v$VERSION" -m "dograh-sdk + @dograh/sdk $VERSION" echo " → created tag (not pushed). Push with:" diff --git a/sdk/python/pyproject.toml b/sdk/python/pyproject.toml index a22f682..739e94d 100644 --- a/sdk/python/pyproject.toml +++ b/sdk/python/pyproject.toml @@ -1,6 +1,6 @@ [project] name = "dograh-sdk" -version = "0.1.3" +version = "0.1.4" description = "Typed builder for Dograh voice-AI workflows" readme = "README.md" requires-python = ">=3.10" diff --git a/sdk/python/src/dograh_sdk/_generated_models.py b/sdk/python/src/dograh_sdk/_generated_models.py index 6bfe2bd..00b7d1b 100644 --- a/sdk/python/src/dograh_sdk/_generated_models.py +++ b/sdk/python/src/dograh_sdk/_generated_models.py @@ -1,6 +1,6 @@ # generated by datamodel-codegen: -# filename: dograh-openapi-XXXXXX.json.e1fkh2B88V -# timestamp: 2026-04-25T11:03:19+00:00 +# filename: dograh-openapi-XXXXXX.json.W6Dd8pliVH +# timestamp: 2026-04-25T11:15:04+00:00 from __future__ import annotations diff --git a/sdk/typescript/package.json b/sdk/typescript/package.json index 37b0746..3fc7db8 100644 --- a/sdk/typescript/package.json +++ b/sdk/typescript/package.json @@ -1,6 +1,6 @@ { "name": "@dograh/sdk", - "version": "0.1.3", + "version": "0.1.4", "description": "Typed builder for Dograh voice-AI workflows", "license": "BSD-2-Clause", "author": "Zansat Technologies Private Limited",