mirror of
https://github.com/dograh-hq/dograh.git
synced 2026-06-07 07:55:16 +02:00
408 lines
13 KiB
Python
408 lines
13 KiB
Python
"""Public API endpoints for public agent execution.

These endpoints are accessible with API key authentication and allow
external systems to programmatically trigger phone calls.
"""
|
|
|
|
import random
|
|
from dataclasses import dataclass
|
|
from typing import Awaitable, Callable, Optional
|
|
|
|
from fastapi import APIRouter, Header, HTTPException
|
|
from loguru import logger
|
|
from pydantic import BaseModel
|
|
|
|
from api.db import db_client
|
|
from api.enums import TriggerState, WorkflowStatus
|
|
from api.services.quota_service import check_dograh_quota_by_user_id
|
|
from api.services.telephony.factory import (
|
|
get_default_telephony_provider,
|
|
get_telephony_provider_by_id,
|
|
)
|
|
from api.utils.common import get_backend_endpoints
|
|
|
|
# Router for the public agent endpoints; every route registered on it is
# served under the "/public/agent" prefix.
router = APIRouter(prefix="/public/agent")
|
|
|
|
|
|
class TriggerCallRequest(BaseModel):
    """Request model for triggering a call via API"""

    # Number to dial; passed to the telephony provider as the call destination.
    phone_number: str
    # Optional extra key/value pairs merged into the workflow run's initial
    # context.
    initial_context: dict | None = None
    # Optional telephony configuration to dial with. When omitted, the
    # organization's default telephony provider is used instead.
    telephony_configuration_id: int | None = None
|
|
|
|
|
|
class TriggerCallResponse(BaseModel):
    """Response model for successful call initiation"""

    # Outcome label; the call flow in this module returns "initiated".
    status: str
    # ID of the workflow run created for this call.
    workflow_run_id: int
    # Generated run name of the form "WR-API-<4 digits>" or "WR-TEST-<4 digits>".
    workflow_run_name: str
|
|
|
|
|
|
@dataclass
class ResolvedAgentTarget:
    """Workflow selected for a public agent request, plus how it was addressed."""

    # Workflow record returned by the database client (loosely typed as object).
    workflow: object
    # Organization of the API key that made the request.
    organization_id: int
    # Kind of identifier the caller used: "trigger_path" or "workflow_uuid".
    identifier_type: str
    # The identifier value itself (the trigger path or the workflow UUID).
    identifier_value: str
|
|
|
|
|
|
def trigger_exists_in_workflow(workflow_definition: dict, trigger_path: str) -> bool:
    """Check if a trigger node with the given path exists in a workflow definition.

    Args:
        workflow_definition: The workflow definition JSON. A null definition,
            or a missing or null ``nodes`` entry, is treated as containing no
            trigger nodes.
        trigger_path: The trigger UUID to look for.

    Returns:
        True if a node of type ``"trigger"`` whose ``data.trigger_path``
        equals ``trigger_path`` exists, False otherwise.
    """
    # ``or`` instead of a ``.get`` default: a default only covers a *missing*
    # key, so an explicit JSON null ("nodes": null / "data": null) would
    # otherwise raise instead of simply not matching.
    nodes = (workflow_definition or {}).get("nodes") or []
    for node in nodes:
        # Skip malformed entries rather than failing the whole lookup.
        if not isinstance(node, dict) or node.get("type") != "trigger":
            continue
        data = node.get("data") or {}
        if isinstance(data, dict) and data.get("trigger_path") == trigger_path:
            return True
    return False
|
|
|
|
|
|
async def _validate_api_key(x_api_key: str):
    """Look up the supplied org API key and reject the request if it is unknown.

    Returns:
        Whatever record ``db_client.validate_api_key`` yields for the key.

    Raises:
        HTTPException: 401 when the lookup returns a falsy result.
    """
    key_record = await db_client.validate_api_key(x_api_key)
    if key_record:
        return key_record
    raise HTTPException(status_code=401, detail="Invalid API key")
|
|
|
|
|
|
def _ensure_workflow_is_active(workflow) -> None:
    """Reject the request unless the workflow's status is ACTIVE.

    Raises:
        HTTPException: 404 when ``workflow.status`` differs from
            ``WorkflowStatus.ACTIVE.value``.
    """
    is_active = workflow.status == WorkflowStatus.ACTIVE.value
    if not is_active:
        raise HTTPException(status_code=404, detail="Workflow is not active")
|
|
|
|
|
|
def _get_execution_user_id(workflow) -> int:
    """Return the ID of the user the workflow executes as.

    Raises:
        HTTPException: 409 when the workflow's ``user_id`` is None.
    """
    owner_id = workflow.user_id
    if owner_id is not None:
        return owner_id
    raise HTTPException(
        status_code=409,
        detail="Workflow has no execution owner",
    )
|
|
|
|
|
|
async def _get_workflow_definition_for_execution(workflow, *, use_draft: bool) -> dict:
    """Pick the workflow JSON that a public agent request would run.

    With ``use_draft`` set, the draft version is preferred when one exists;
    otherwise (or when there is no draft) the released definition is used.

    Raises:
        HTTPException: 404 when the released definition is needed but the
            workflow has none.
    """
    # The draft lookup is only performed for draft (test) requests.
    draft_version = (
        await db_client.get_draft_version(workflow.id) if use_draft else None
    )
    if draft_version:
        return draft_version.workflow_json

    released = workflow.released_definition
    if released is not None:
        return released.workflow_json

    raise HTTPException(
        status_code=404, detail="Workflow has no published definition"
    )
|
|
|
|
|
|
async def _resolve_trigger_target(
    trigger_path: str,
    organization_id: int,
    *,
    use_draft: bool,
) -> ResolvedAgentTarget:
    """Map a trigger UUID to its workflow, restricted to the API key's org.

    Checks run in this order, each failing with the status shown: trigger
    exists (404), trigger belongs to the org (403), trigger is active (404),
    workflow exists in the org (404), workflow is active (404), a definition
    is available (404), and that definition contains the trigger (404).
    """
    agent_trigger = await db_client.get_agent_trigger_by_path(trigger_path)
    if not agent_trigger:
        raise HTTPException(status_code=404, detail="Agent trigger not found")

    belongs_to_org = agent_trigger.organization_id == organization_id
    if not belongs_to_org:
        raise HTTPException(status_code=403, detail="Access denied")

    if agent_trigger.state != TriggerState.ACTIVE.value:
        raise HTTPException(status_code=404, detail="Agent trigger is not active")

    workflow = await db_client.get_workflow(
        agent_trigger.workflow_id, organization_id=organization_id
    )
    if not workflow:
        raise HTTPException(status_code=404, detail="Workflow not found")

    _ensure_workflow_is_active(workflow)

    # Check the trigger against the definition that would actually run
    # (draft or released), not merely against the trigger record.
    definition = await _get_workflow_definition_for_execution(
        workflow, use_draft=use_draft
    )
    if trigger_exists_in_workflow(definition, trigger_path):
        return ResolvedAgentTarget(
            workflow=workflow,
            organization_id=organization_id,
            identifier_type="trigger_path",
            identifier_value=trigger_path,
        )

    raise HTTPException(
        status_code=404,
        detail="Trigger not found in the selected Agent",
    )
|
|
|
|
|
|
async def _resolve_workflow_uuid_target(
    workflow_uuid: str,
    organization_id: int,
    *,
    use_draft: bool,
) -> ResolvedAgentTarget:
    """Look a workflow up by its UUID within the API key's organization.

    Raises:
        HTTPException: 404 when the workflow is not found for the org, is not
            active, or has no definition available to execute.
    """
    found = await db_client.get_workflow_by_uuid(workflow_uuid, organization_id)
    if not found:
        raise HTTPException(status_code=404, detail="Workflow not found")

    _ensure_workflow_is_active(found)

    # Result intentionally discarded: the call is made only so that a
    # workflow without a runnable definition is rejected with a 404 here.
    await _get_workflow_definition_for_execution(found, use_draft=use_draft)

    resolved = ResolvedAgentTarget(
        workflow=found,
        organization_id=organization_id,
        identifier_type="workflow_uuid",
        identifier_value=workflow_uuid,
    )
    return resolved
|
|
|
|
|
|
async def _resolve_telephony_provider(
    request: TriggerCallRequest,
    organization_id: int,
):
    """Return ``(provider, telephony_configuration_id)`` to dial with.

    Uses the caller-specified configuration (looked up within the given
    organization) when the request names one, otherwise the organization's
    default provider. The returned configuration ID is None when the default
    provider is used and no default configuration record is found.

    Raises:
        HTTPException: 404 when the requested configuration is not found for
            the org; 400 when the provider factory raises ``ValueError``.
    """
    requested_cfg_id = request.telephony_configuration_id
    if requested_cfg_id is not None:
        cfg = await db_client.get_telephony_configuration_for_org(
            requested_cfg_id,
            organization_id,
        )
        if not cfg:
            raise HTTPException(
                status_code=404, detail="Telephony configuration not found"
            )
        try:
            provider = await get_telephony_provider_by_id(cfg.id, organization_id)
        except ValueError as exc:
            raise HTTPException(
                status_code=400,
                detail="Telephony provider not configured for this configuration",
            ) from exc
        return provider, cfg.id

    try:
        provider = await get_default_telephony_provider(organization_id)
    except ValueError as exc:
        raise HTTPException(
            status_code=400,
            detail="Telephony provider not configured for this organization",
        ) from exc
    default_cfg = await db_client.get_default_telephony_configuration(
        organization_id
    )
    return provider, (default_cfg.id if default_cfg else None)


def _build_initial_context(
    target: ResolvedAgentTarget,
    request: TriggerCallRequest,
    provider_name: str,
    *,
    use_draft: bool,
    telephony_configuration_id: int | None,
    api_key_id: int | None,
    api_key_created_by: int | None,
) -> dict:
    """Assemble the initial context stored on the workflow run."""
    context = {
        "provider": provider_name,
        "phone_number": request.phone_number,
        "trigger_mode": "test" if use_draft else "production",
        "telephony_configuration_id": telephony_configuration_id,
        "agent_identifier": target.identifier_value,
        "agent_identifier_type": target.identifier_type,
        "workflow_uuid": target.workflow.workflow_uuid,
    }
    if target.identifier_type == "trigger_path":
        context["agent_uuid"] = target.identifier_value
    if api_key_id is not None:
        context["api_key_id"] = api_key_id
    if api_key_created_by is not None:
        context["api_key_created_by"] = api_key_created_by
    # Caller-supplied keys are applied last, so on a name clash they replace
    # the values set above.
    # NOTE(review): this lets a caller overwrite keys such as "api_key_id" or
    # "provider" — confirm that is intended.
    context.update(request.initial_context or {})
    return context


async def _execute_resolved_target(
    target: ResolvedAgentTarget,
    request: TriggerCallRequest,
    *,
    use_draft: bool,
    api_key_id: int | None,
    api_key_created_by: int | None,
) -> TriggerCallResponse:
    """Shared execution path once the target workflow has been resolved.

    Args:
        target: The resolved workflow and the identifier it was addressed by.
        request: The call request (phone number, optional context and
            telephony configuration).
        use_draft: True for test runs; labels the run "TEST"/"test" and is
            forwarded to workflow run creation.
        api_key_id: ID of the API key used, recorded in the run context
            when not None.
        api_key_created_by: Creator of the API key, recorded in the run
            context when not None.

    Returns:
        A response with status "initiated" and the created run's ID and name.

    Raises:
        HTTPException: 409 when the workflow has no owner; 402 when the quota
            check fails; 404 when the requested telephony configuration is
            not found; 400 when the provider is not configured, fails
            validation, or the call cannot be initiated.
    """
    execution_user_id = _get_execution_user_id(target.workflow)

    # 1. Check Dograh quota using the workflow owner's config and model overrides.
    quota_result = await check_dograh_quota_by_user_id(
        execution_user_id,
        workflow_id=target.workflow.id,
    )
    if not quota_result.has_quota:
        raise HTTPException(status_code=402, detail=quota_result.error_message)

    # 2. Get telephony provider — either the caller-specified config (validated
    # against the workflow's org) or the org's default config.
    provider, resolved_cfg_id = await _resolve_telephony_provider(
        request, target.organization_id
    )

    # 3. Validate provider is configured
    if not provider.validate_config():
        raise HTTPException(
            status_code=400,
            detail="Telephony provider not configured for this organization",
        )

    # 4. Determine the workflow run mode based on provider type
    workflow_run_mode = provider.PROVIDER_NAME

    # 5. Create workflow run
    mode_label = "TEST" if use_draft else "API"
    workflow_run_name = f"WR-{mode_label}-{random.randint(1000, 9999)}"
    initial_context = _build_initial_context(
        target,
        request,
        provider.PROVIDER_NAME,
        use_draft=use_draft,
        telephony_configuration_id=resolved_cfg_id,
        api_key_id=api_key_id,
        api_key_created_by=api_key_created_by,
    )

    workflow_run = await db_client.create_workflow_run(
        name=workflow_run_name,
        workflow_id=target.workflow.id,
        mode=workflow_run_mode,
        initial_context=initial_context,
        user_id=execution_user_id,
        use_draft=use_draft,
        organization_id=target.organization_id,
    )

    logger.info(
        f"Created workflow run {workflow_run.id} for public agent "
        f"{target.identifier_type}={target.identifier_value} "
        f"(mode={'test' if use_draft else 'production'}) "
        f"to phone number {request.phone_number}"
    )

    # 6. Construct webhook URL for telephony provider callback
    backend_endpoint, _ = await get_backend_endpoints()
    webhook_endpoint = provider.WEBHOOK_ENDPOINT

    webhook_url = (
        f"{backend_endpoint}/api/v1/telephony/{webhook_endpoint}"
        f"?workflow_id={target.workflow.id}"
        f"&user_id={execution_user_id}"
        f"&workflow_run_id={workflow_run.id}"
        f"&organization_id={target.organization_id}"
    )

    # 7. Initiate call via telephony provider. workflow_id and user_id are
    # required by providers that build the media WebSocket URL at dial time
    # (e.g. Telnyx, Cloudonix); without them the URL contains "None/None" and
    # the stream connection fails.
    try:
        await provider.initiate_call(
            to_number=request.phone_number,
            webhook_url=webhook_url,
            workflow_run_id=workflow_run.id,
            workflow_id=target.workflow.id,
            user_id=execution_user_id,
        )
    except Exception as e:
        # NOTE(review): the workflow run created above is left in place when
        # dialing fails — confirm whether it should be marked as failed.
        logger.warning(
            f"Failed to initiate call for workflow run {workflow_run.id}: {e}"
        )
        raise HTTPException(
            status_code=400,
            detail=f"Failed to initiate call: {e}",
        ) from e

    logger.info(
        f"Call initiated successfully for workflow run {workflow_run.id} "
        f"via {target.identifier_type}={target.identifier_value}"
    )

    return TriggerCallResponse(
        status="initiated",
        workflow_run_id=workflow_run.id,
        workflow_run_name=workflow_run_name,
    )
|
|
|
|
|
|
async def _initiate_call(
    identifier: str,
    request: TriggerCallRequest,
    x_api_key: str,
    *,
    use_draft: bool,
    target_resolver: Callable[..., Awaitable[ResolvedAgentTarget]],
) -> TriggerCallResponse:
    """Authenticate, resolve the requested target, then run the shared call flow.

    ``target_resolver`` is awaited with ``(identifier, organization_id,
    use_draft=...)``, where the organization comes from the validated API key,
    so resolution is always scoped to the caller's organization.
    """
    key_record = await _validate_api_key(x_api_key)
    resolved_target = await target_resolver(
        identifier, key_record.organization_id, use_draft=use_draft
    )
    response = await _execute_resolved_target(
        resolved_target,
        request,
        use_draft=use_draft,
        api_key_id=key_record.id,
        api_key_created_by=key_record.created_by,
    )
    return response
|
|
|
|
|
|
@router.post("/{uuid}", response_model=TriggerCallResponse)
async def initiate_call(
    uuid: str,
    request: TriggerCallRequest,
    x_api_key: str = Header(..., alias="X-API-Key"),
):
    """Initiate a phone call against the published agent.

    Executes the workflow's currently released definition.
    """
    # ``uuid`` is resolved as a trigger path; use_draft=False selects the
    # released definition.
    return await _initiate_call(
        identifier=uuid,
        request=request,
        x_api_key=x_api_key,
        use_draft=False,
        target_resolver=_resolve_trigger_target,
    )
|
|
|
|
|
|
@router.post("/test/{uuid}", response_model=TriggerCallResponse)
async def initiate_call_test(
    uuid: str,
    request: TriggerCallRequest,
    x_api_key: str = Header(..., alias="X-API-Key"),
):
    """Initiate a phone call against the latest draft of the agent.

    Useful for verifying changes before publishing. Falls back to the
    published definition when no draft exists.
    """
    # ``uuid`` is resolved as a trigger path; use_draft=True prefers the draft
    # definition.
    return await _initiate_call(
        identifier=uuid,
        request=request,
        x_api_key=x_api_key,
        use_draft=True,
        target_resolver=_resolve_trigger_target,
    )
|
|
|
|
|
|
@router.post("/workflow/{workflow_uuid}", response_model=TriggerCallResponse)
async def initiate_call_by_workflow_uuid(
    workflow_uuid: str,
    request: TriggerCallRequest,
    x_api_key: str = Header(..., alias="X-API-Key"),
):
    """Initiate a phone call against the published workflow identified by UUID."""
    # The workflow is addressed directly by UUID rather than through a
    # trigger; use_draft=False selects the released definition.
    return await _initiate_call(
        identifier=workflow_uuid,
        request=request,
        x_api_key=x_api_key,
        use_draft=False,
        target_resolver=_resolve_workflow_uuid_target,
    )
|
|
|
|
|
|
@router.post("/test/workflow/{workflow_uuid}", response_model=TriggerCallResponse)
async def initiate_call_test_by_workflow_uuid(
    workflow_uuid: str,
    request: TriggerCallRequest,
    x_api_key: str = Header(..., alias="X-API-Key"),
):
    """Initiate a phone call against the latest draft of the workflow by UUID."""
    # The workflow is addressed directly by UUID rather than through a
    # trigger; use_draft=True prefers the draft definition.
    return await _initiate_call(
        identifier=workflow_uuid,
        request=request,
        x_api_key=x_api_key,
        use_draft=True,
        target_resolver=_resolve_workflow_uuid_target,
    )
|