feat: add test mode for API trigger

This commit is contained in:
Abhishek Kumar 2026-04-25 16:30:26 +05:30
parent f041e6030d
commit 4171ad7a54
13 changed files with 279 additions and 124 deletions

View file

@ -5,7 +5,9 @@ from api.mcp_server.instructions import DOGRAH_MCP_INSTRUCTIONS
mcp = FastMCP("dograh", instructions=DOGRAH_MCP_INSTRUCTIONS)
from api.mcp_server.tools import catalog as _catalog # noqa: E402, F401
from api.mcp_server.tools import get_workflow_code as _get_workflow_code # noqa: E402, F401
from api.mcp_server.tools import (
get_workflow_code as _get_workflow_code, # noqa: E402, F401
)
from api.mcp_server.tools import node_types as _node_types # noqa: E402, F401
from api.mcp_server.tools import save_workflow as _save_workflow # noqa: E402, F401
from api.mcp_server.tools import workflows as _workflows # noqa: E402, F401

View file

@ -53,27 +53,17 @@ def trigger_exists_in_workflow(workflow_definition: dict, trigger_path: str) ->
return False
@router.post("/{uuid}", response_model=TriggerCallResponse)
async def initiate_call(
async def _initiate_call(
uuid: str,
request: TriggerCallRequest,
x_api_key: str = Header(..., alias="X-API-Key"),
):
"""Initiate a phone call via API trigger.
x_api_key: str,
*,
use_draft: bool,
) -> TriggerCallResponse:
"""Shared core for production and test trigger endpoints.
This endpoint allows external systems (CRMs, automation tools, etc.) to
programmatically trigger outbound phone calls with custom context variables.
Args:
uuid: The unique trigger UUID
request: The call request with phone number and optional context
x_api_key: API key for authentication (passed in X-API-Key header)
Returns:
TriggerCallResponse with workflow run details
Raises:
HTTPException: Various error conditions (401, 403, 404, 400)
When ``use_draft`` is True the latest draft definition is executed;
otherwise the published (released) definition is used.
"""
# 1. Validate API key
api_key = await db_client.validate_api_key(x_api_key)
@ -98,14 +88,23 @@ async def initiate_call(
if not quota_result.has_quota:
raise HTTPException(status_code=402, detail=quota_result.error_message)
# 5. Get workflow and validate trigger exists in definition
# 5. Get workflow and resolve the definition (published vs draft)
workflow = await db_client.get_workflow_by_id(trigger.workflow_id)
if not workflow:
raise HTTPException(status_code=404, detail="Workflow not found")
workflow_definition = workflow.released_definition.workflow_json
if use_draft:
draft = await db_client.get_draft_version(trigger.workflow_id)
# Fall back to the published definition when no draft exists, so the
# test URL always runs *something* — typically the same agent the
# production URL would run.
workflow_definition = (
draft.workflow_json if draft else workflow.released_definition.workflow_json
)
else:
workflow_definition = workflow.released_definition.workflow_json
# Validate trigger node still exists in the workflow definition
# Validate trigger node still exists in the resolved definition
if not trigger_exists_in_workflow(workflow_definition, uuid):
raise HTTPException(
status_code=404,
@ -126,7 +125,8 @@ async def initiate_call(
workflow_run_mode = provider.PROVIDER_NAME
# 8. Create workflow run
workflow_run_name = f"WR-API-{random.randint(1000, 9999)}"
mode_label = "TEST" if use_draft else "API"
workflow_run_name = f"WR-{mode_label}-{random.randint(1000, 9999)}"
workflow_run = await db_client.create_workflow_run(
name=workflow_run_name,
workflow_id=trigger.workflow_id,
@ -135,13 +135,16 @@ async def initiate_call(
"provider": provider.PROVIDER_NAME,
"phone_number": request.phone_number,
"agent_uuid": uuid,
"trigger_mode": "test" if use_draft else "production",
**(request.initial_context or {}),
},
user_id=api_key.created_by,
use_draft=use_draft,
)
logger.info(
f"Created workflow run {workflow_run.id} for API trigger {uuid} "
f"(mode={'test' if use_draft else 'production'}) "
f"to phone number {request.phone_number}"
)
@ -183,3 +186,30 @@ async def initiate_call(
workflow_run_id=workflow_run.id,
workflow_run_name=workflow_run_name,
)
@router.post("/{uuid}", response_model=TriggerCallResponse)
async def initiate_call(
uuid: str,
request: TriggerCallRequest,
x_api_key: str = Header(..., alias="X-API-Key"),
):
"""Initiate a phone call against the published agent.
Executes the workflow's currently released definition.
"""
return await _initiate_call(uuid, request, x_api_key, use_draft=False)
@router.post("/test/{uuid}", response_model=TriggerCallResponse)
async def initiate_call_test(
uuid: str,
request: TriggerCallRequest,
x_api_key: str = Header(..., alias="X-API-Key"),
):
"""Initiate a phone call against the latest draft of the agent.
Useful for verifying changes before publishing. Falls back to the
published definition when no draft exists.
"""
return await _initiate_call(uuid, request, x_api_key, use_draft=True)

View file

@ -56,7 +56,10 @@ async def get_user(
# ------------------------------------------------------------------
try:
user_model, user_was_created = await db_client.get_or_create_user_by_provider_id(stack_user["id"])
(
user_model,
user_was_created,
) = await db_client.get_or_create_user_by_provider_id(stack_user["id"])
# Sync email from Stack Auth if available and not already set
stack_email = stack_user.get("primary_email_verified") and stack_user.get(

View file

@ -660,7 +660,7 @@ TTSConfig = Annotated[
###################################################### STT ########################################################################
DEEPGRAM_STT_MODELS = ["nova-3-general", "flux-general-en"]
DEEPGRAM_STT_MODELS = ["nova-3-general", "flux-general-en", "flux-general-multi"]
DEEPGRAM_LANGUAGES = [
"multi",
"ar",

View file

@ -13,11 +13,16 @@ from api.services.workflow.node_specs._base import (
SPEC = NodeSpec(
name="trigger",
display_name="API Trigger",
description="Public HTTP endpoint that launches the workflow.",
description=("Public HTTP endpoints that launch the workflow."),
llm_hint=(
"Exposes a public HTTP POST endpoint. External systems call the URL "
"(derived from the auto-generated `trigger_path`) to launch this "
"workflow. Requires an API key in the `X-API-Key` header."
"Exposes two public HTTP POST endpoints derived from the auto-generated "
"`trigger_path`:\n"
" • Production: `<backend>/api/v1/public/agent/<trigger_path>` — runs "
"the published agent. Use this from production systems.\n"
" • Test: `<backend>/api/v1/public/agent/test/<trigger_path>` — runs "
"the latest draft, useful for verifying changes before publishing. "
"Falls back to the published agent when no draft exists.\n"
"Both require an API key in the `X-API-Key` header."
),
category=NodeCategory.trigger,
icon="Webhook",
@ -44,7 +49,12 @@ SPEC = NodeSpec(
display_name="Trigger Path",
description=(
"Auto-generated UUID-style path segment that uniquely "
"identifies this trigger. Do not edit manually."
"identifies this trigger. Used in both URLs:\n"
" • Production: `/api/v1/public/agent/<trigger_path>` — "
"executes the published agent.\n"
" • Test: `/api/v1/public/agent/test/<trigger_path>` — "
"executes the latest draft.\n"
"Do not edit manually."
),
),
],

View file

@ -13,8 +13,6 @@ Covers:
from __future__ import annotations
from typing import Any
import pytest
from dograh_sdk import Workflow
from dograh_sdk._generated_models import NodeSpec

View file

@ -9,8 +9,6 @@ Covers:
from __future__ import annotations
from typing import Any
import pytest
from dograh_sdk import Workflow
from dograh_sdk._generated_models import NodeSpec