dograh/api/routes/public_agent.py
2026-04-30 11:35:32 +05:30

234 lines
8 KiB
Python

"""Public API endpoints for agent triggers.
These endpoints are accessible with API key authentication and allow
external systems to programmatically trigger phone calls.
"""
import random
from typing import Optional
from fastapi import APIRouter, Header, HTTPException
from loguru import logger
from pydantic import BaseModel
from api.db import db_client
from api.enums import TriggerState
from api.services.quota_service import check_dograh_quota_by_user_id
from api.services.telephony.factory import get_default_telephony_provider
from api.utils.common import get_backend_endpoints
# Router for the public agent-trigger endpoints; every route declared on it
# is served under the "/public/agent" prefix.
router = APIRouter(prefix="/public/agent")
class TriggerCallRequest(BaseModel):
    """Request body accepted by the public trigger endpoints.

    Carries the number to dial and, optionally, extra key/value pairs that
    are merged into the initial context of the workflow run.
    """

    # Destination number; forwarded to the telephony provider as given.
    phone_number: str
    # Optional caller-supplied context entries; absent/null means none.
    initial_context: Optional[dict] = None
class TriggerCallResponse(BaseModel):
    """Response body returned after a call has been initiated."""

    # Outcome marker; the handler in this module sets it to "initiated".
    status: str
    # Identifier of the workflow run created for this call.
    workflow_run_id: int
    # Generated name of that workflow run.
    workflow_run_name: str
def trigger_exists_in_workflow(workflow_definition: dict, trigger_path: str) -> bool:
    """Check whether a trigger node with the given path exists in a workflow.

    A node matches when its ``type`` is ``"trigger"`` and its
    ``data["trigger_path"]`` equals ``trigger_path``.

    Malformed definitions are treated as "no match" rather than raising: a
    missing/``None`` definition, a missing/``None`` ``nodes`` list, non-dict
    node entries and nodes whose ``data`` is missing or ``None`` are all
    tolerated.

    Args:
        workflow_definition: The workflow definition JSON.
        trigger_path: The trigger UUID to look for.

    Returns:
        True if a matching trigger node exists, False otherwise.
    """
    # ``dict.get(key, default)`` only applies the default when the key is
    # absent, so an explicit null value must be normalised with ``or``.
    nodes = (workflow_definition or {}).get("nodes") or []
    return any(
        isinstance(node, dict)
        and node.get("type") == "trigger"
        and (node.get("data") or {}).get("trigger_path") == trigger_path
        for node in nodes
    )
async def _initiate_call(
    uuid: str,
    request: TriggerCallRequest,
    x_api_key: str,
    *,
    use_draft: bool,
) -> TriggerCallResponse:
    """Shared core for the production and test trigger endpoints.

    Authenticates the API key, checks that the trigger belongs to the same
    organization and is active, verifies quota, resolves the workflow
    definition, creates a workflow run and asks the organization's default
    telephony provider to place the call.

    Args:
        uuid: Trigger path taken from the request URL.
        request: Parsed request body (phone number and optional context).
        x_api_key: Raw value of the ``X-API-Key`` header.
        use_draft: When True the latest draft definition is validated,
            falling back to the published definition if no draft exists;
            when False the published (released) definition is used.

    Returns:
        A ``TriggerCallResponse`` with status ``"initiated"`` and the id and
        name of the workflow run that was created.

    Raises:
        HTTPException: 401 for an invalid API key; 403 when the key and the
            trigger belong to different organizations; 404 when the
            trigger, workflow, published definition or trigger node is
            missing, or the trigger is not active; 402 when the quota check
            fails; 400 when telephony is not configured or the provider
            fails to place the call.
    """
    # 1. Validate API key
    api_key = await db_client.validate_api_key(x_api_key)
    if not api_key:
        raise HTTPException(status_code=401, detail="Invalid API key")

    # 2. Lookup agent trigger by UUID
    trigger = await db_client.get_agent_trigger_by_path(uuid)
    if not trigger:
        raise HTTPException(status_code=404, detail="Agent trigger not found")

    # 3. Validate organization match (API key org must match trigger org)
    if api_key.organization_id != trigger.organization_id:
        raise HTTPException(status_code=403, detail="Access denied")

    # 4. Validate trigger is active
    if trigger.state != TriggerState.ACTIVE.value:
        raise HTTPException(status_code=404, detail="Agent trigger is not active")

    # 4.5 Check Dograh quota before initiating the call (apply the trigger's
    # workflow's model_overrides so we evaluate the keys this run will use).
    quota_result = await check_dograh_quota_by_user_id(
        api_key.created_by, workflow_id=trigger.workflow_id
    )
    if not quota_result.has_quota:
        raise HTTPException(status_code=402, detail=quota_result.error_message)

    # 5. Get workflow and resolve the definition (published vs draft)
    workflow = await db_client.get_workflow_by_id(trigger.workflow_id)
    if not workflow:
        raise HTTPException(status_code=404, detail="Workflow not found")

    draft = None
    if use_draft:
        draft = await db_client.get_draft_version(trigger.workflow_id)

    if draft:
        workflow_definition = draft.workflow_json
        definition_label = "draft"
    else:
        # Either production mode, or test mode with no draft: fall back to
        # the published definition so the test URL always runs *something*,
        # typically the same agent the production URL would run.
        released = workflow.released_definition
        if released is None:
            # NOTE(review): presumably None when the agent was never
            # published - confirm. Answer 404 instead of failing with an
            # AttributeError on the attribute access below.
            raise HTTPException(
                status_code=404,
                detail="Agent has no published version",
            )
        workflow_definition = released.workflow_json
        definition_label = "published"

    # Validate trigger node still exists in the resolved definition. The
    # message names the definition that was actually inspected so a failing
    # draft is not reported as a problem with the published agent.
    if not trigger_exists_in_workflow(workflow_definition, uuid):
        raise HTTPException(
            status_code=404,
            detail=f"Trigger not found in the {definition_label} Agent",
        )

    # 6. Get telephony provider for the organization (using its default config).
    try:
        provider = await get_default_telephony_provider(trigger.organization_id)
    except ValueError as exc:
        raise HTTPException(
            status_code=400,
            detail="Telephony provider not configured for this organization",
        ) from exc

    # Validate provider is configured
    if not provider.validate_config():
        raise HTTPException(
            status_code=400,
            detail="Telephony provider not configured for this organization",
        )

    default_cfg = await db_client.get_default_telephony_configuration(
        trigger.organization_id
    )

    # 7. Determine the workflow run mode based on provider type
    workflow_run_mode = provider.PROVIDER_NAME

    # 8. Create workflow run
    trigger_mode = "test" if use_draft else "production"
    mode_label = "TEST" if use_draft else "API"
    # The random suffix only makes the run name easier to tell apart; it is
    # not an identifier and is not guaranteed to be unique.
    workflow_run_name = f"WR-{mode_label}-{random.randint(1000, 9999)}"

    workflow_run = await db_client.create_workflow_run(
        name=workflow_run_name,
        workflow_id=trigger.workflow_id,
        mode=workflow_run_mode,
        initial_context={
            "provider": provider.PROVIDER_NAME,
            "phone_number": request.phone_number,
            "agent_uuid": uuid,
            "trigger_mode": trigger_mode,
            "telephony_configuration_id": default_cfg.id if default_cfg else None,
            # Caller-supplied entries are spread last, so on a key collision
            # they override the system values above.
            # NOTE(review): confirm this precedence is intended.
            **(request.initial_context or {}),
        },
        user_id=api_key.created_by,
        use_draft=use_draft,
    )
    logger.info(
        f"Created workflow run {workflow_run.id} for API trigger {uuid} "
        f"(mode={trigger_mode}) "
        f"to phone number {request.phone_number}"
    )

    # 9. Construct webhook URL for telephony provider callback
    backend_endpoint, _ = await get_backend_endpoints()
    webhook_endpoint = provider.WEBHOOK_ENDPOINT
    webhook_url = (
        f"{backend_endpoint}/api/v1/telephony/{webhook_endpoint}"
        f"?workflow_id={trigger.workflow_id}"
        f"&user_id={api_key.created_by}"
        f"&workflow_run_id={workflow_run.id}"
        f"&organization_id={trigger.organization_id}"
    )

    # 10. Initiate call via telephony provider. workflow_id and user_id are
    # required by providers that build the media WebSocket URL at dial time
    # (e.g. Telnyx, Cloudonix); without them the URL contains "None/None" and
    # the stream connection fails.
    try:
        await provider.initiate_call(
            to_number=request.phone_number,
            webhook_url=webhook_url,
            workflow_run_id=workflow_run.id,
            workflow_id=trigger.workflow_id,
            user_id=api_key.created_by,
        )
    except Exception as exc:
        # Boundary handler: any provider failure is reported to the API
        # client as a 400, with the original exception chained for logs.
        logger.warning(
            f"Failed to initiate call for workflow run {workflow_run.id}: {exc}"
        )
        raise HTTPException(
            status_code=400,
            detail=f"Failed to initiate call: {exc}",
        ) from exc

    logger.info(
        f"Call initiated successfully for workflow run {workflow_run.id} "
        f"via trigger {uuid}"
    )
    return TriggerCallResponse(
        status="initiated",
        workflow_run_id=workflow_run.id,
        workflow_run_name=workflow_run_name,
    )
@router.post("/{uuid}", response_model=TriggerCallResponse)
async def initiate_call(
    uuid: str,
    request: TriggerCallRequest,
    x_api_key: str = Header(..., alias="X-API-Key"),
):
    """Place a phone call using the agent's published definition.

    Delegates to the shared handler with draft mode switched off, so the
    workflow's currently released definition is what gets executed.
    """
    response = await _initiate_call(
        uuid=uuid,
        request=request,
        x_api_key=x_api_key,
        use_draft=False,
    )
    return response
@router.post("/test/{uuid}", response_model=TriggerCallResponse)
async def initiate_call_test(
    uuid: str,
    request: TriggerCallRequest,
    x_api_key: str = Header(..., alias="X-API-Key"),
):
    """Place a phone call using the agent's latest draft.

    Intended for checking changes before they are published. Delegates to
    the shared handler with draft mode switched on; when no draft exists the
    published definition is used instead.
    """
    response = await _initiate_call(
        uuid=uuid,
        request=request,
        x_api_key=x_api_key,
        use_draft=True,
    )
    return response