mirror of
https://github.com/dograh-hq/dograh.git
synced 2026-06-07 07:55:16 +02:00
475 lines
No EOL
17 KiB
Python
475 lines
No EOL
17 KiB
Python
"""
|
|
Generic telephony routes that work with any telephony provider.
|
|
"""
|
|
import json
|
|
import random
|
|
from datetime import UTC, datetime
|
|
from typing import Annotated, Optional
|
|
|
|
from fastapi import APIRouter, Depends, Form, Header, HTTPException, Request, WebSocket
|
|
from loguru import logger
|
|
from pydantic import BaseModel
|
|
from starlette.responses import HTMLResponse
|
|
|
|
from api.db import db_client
|
|
from api.db.models import UserModel
|
|
from api.enums import WorkflowRunMode
|
|
from api.services.auth.depends import get_user
|
|
from api.services.campaign.call_dispatcher import campaign_call_dispatcher
|
|
from api.services.campaign.campaign_event_publisher import get_campaign_event_publisher
|
|
from api.services.telephony.factory import get_telephony_provider
|
|
from api.utils.tunnel import TunnelURLProvider
|
|
from pipecat.utils.context import set_current_run_id
|
|
|
|
router = APIRouter(prefix="/telephony")
|
|
|
|
|
|
class InitiateCallRequest(BaseModel):
|
|
workflow_id: int
|
|
workflow_run_id: int | None = None
|
|
phone_number: str | None = None
|
|
|
|
|
|
class StatusCallbackRequest(BaseModel):
|
|
"""Generic status callback that can handle different providers"""
|
|
# Common fields
|
|
call_id: str
|
|
status: str
|
|
from_number: Optional[str] = None
|
|
to_number: Optional[str] = None
|
|
direction: Optional[str] = None
|
|
duration: Optional[str] = None
|
|
|
|
# Provider-specific fields stored as extra
|
|
extra: dict = {}
|
|
|
|
@classmethod
|
|
def from_twilio(cls, data: dict):
|
|
"""Convert Twilio callback to generic format"""
|
|
return cls(
|
|
call_id=data.get("CallSid", ""),
|
|
status=data.get("CallStatus", ""),
|
|
from_number=data.get("From"),
|
|
to_number=data.get("To"),
|
|
direction=data.get("Direction"),
|
|
duration=data.get("CallDuration") or data.get("Duration"),
|
|
extra=data
|
|
)
|
|
|
|
@classmethod
|
|
def from_vonage(cls, data: dict):
|
|
"""Convert Vonage event to generic format"""
|
|
# Map Vonage status to common format
|
|
status_map = {
|
|
"started": "initiated",
|
|
"ringing": "ringing",
|
|
"answered": "answered",
|
|
"complete": "completed",
|
|
"failed": "failed",
|
|
"busy": "busy",
|
|
"timeout": "no-answer",
|
|
"rejected": "busy"
|
|
}
|
|
|
|
return cls(
|
|
call_id=data.get("uuid", ""),
|
|
status=status_map.get(data.get("status", ""), data.get("status", "")),
|
|
from_number=data.get("from"),
|
|
to_number=data.get("to"),
|
|
direction=data.get("direction"),
|
|
duration=data.get("duration"),
|
|
extra=data
|
|
)
|
|
|
|
|
|
@router.post("/initiate-call")
|
|
async def initiate_call(
|
|
request: InitiateCallRequest, user: UserModel = Depends(get_user)
|
|
):
|
|
"""Initiate a call using the configured telephony provider."""
|
|
|
|
# Get the telephony provider for the organization
|
|
provider = await get_telephony_provider(user.selected_organization_id)
|
|
|
|
# Validate provider is configured
|
|
if not provider.validate_config():
|
|
raise HTTPException(
|
|
status_code=400,
|
|
detail="telephony_not_configured",
|
|
)
|
|
|
|
# Determine the workflow run mode based on provider type
|
|
workflow_run_mode = provider.PROVIDER_NAME
|
|
|
|
user_configuration = await db_client.get_user_configurations(user.id)
|
|
|
|
phone_number = request.phone_number or user_configuration.test_phone_number
|
|
|
|
if not phone_number:
|
|
raise HTTPException(
|
|
status_code=400,
|
|
detail="Phone number must be provided in request or set in user configuration"
|
|
)
|
|
|
|
workflow_run_id = request.workflow_run_id
|
|
|
|
if not workflow_run_id:
|
|
workflow_run_name = f"WR-TEL-{random.randint(1000, 9999)}"
|
|
workflow_run = await db_client.create_workflow_run(
|
|
workflow_run_name,
|
|
request.workflow_id,
|
|
workflow_run_mode,
|
|
initial_context={
|
|
"phone_number": phone_number,
|
|
},
|
|
user_id=user.id,
|
|
)
|
|
workflow_run_id = workflow_run.id
|
|
else:
|
|
workflow_run = await db_client.get_workflow_run(workflow_run_id, user.id)
|
|
if not workflow_run:
|
|
raise HTTPException(status_code=400, detail="Workflow run not found")
|
|
workflow_run_name = workflow_run.name
|
|
|
|
# Construct webhook URL based on provider type
|
|
backend_endpoint = await TunnelURLProvider.get_tunnel_url()
|
|
|
|
webhook_endpoint = provider.WEBHOOK_ENDPOINT
|
|
|
|
webhook_url = (
|
|
f"https://{backend_endpoint}/api/v1/telephony/{webhook_endpoint}"
|
|
f"?workflow_id={request.workflow_id}"
|
|
f"&user_id={user.id}"
|
|
f"&workflow_run_id={workflow_run_id}"
|
|
f"&organization_id={user.selected_organization_id}"
|
|
)
|
|
|
|
# Initiate call via provider
|
|
result = await provider.initiate_call(
|
|
to_number=phone_number,
|
|
webhook_url=webhook_url,
|
|
workflow_run_id=workflow_run_id,
|
|
)
|
|
|
|
# Store provider type and any provider-specific metadata in workflow run context
|
|
gathered_context = {
|
|
"provider": provider.PROVIDER_NAME,
|
|
**(result.provider_metadata or {})
|
|
}
|
|
await db_client.update_workflow_run(
|
|
run_id=workflow_run_id,
|
|
gathered_context=gathered_context
|
|
)
|
|
|
|
return {
|
|
"message": f"Call initiated successfully with run name {workflow_run_name}"
|
|
}
|
|
|
|
|
|
@router.post("/twiml", include_in_schema=False)
|
|
async def handle_twiml_webhook(
|
|
workflow_id: int,
|
|
user_id: int,
|
|
workflow_run_id: int,
|
|
organization_id: int
|
|
):
|
|
"""
|
|
Handle initial webhook from telephony provider.
|
|
Returns provider-specific response (e.g., TwiML for Twilio).
|
|
"""
|
|
|
|
provider = await get_telephony_provider(organization_id)
|
|
|
|
response_content = await provider.get_webhook_response(
|
|
workflow_id, user_id, workflow_run_id
|
|
)
|
|
|
|
return HTMLResponse(content=response_content, media_type="application/xml")
|
|
|
|
|
|
@router.get("/ncco", include_in_schema=False)
|
|
async def handle_ncco_webhook(
|
|
workflow_id: int,
|
|
user_id: int,
|
|
workflow_run_id: int,
|
|
organization_id: Optional[int] = None
|
|
):
|
|
"""Handle NCCO (Nexmo Call Control Objects) webhook for Vonage.
|
|
|
|
Returns JSON response instead of XML like TwiML.
|
|
"""
|
|
|
|
provider = await get_telephony_provider(organization_id or user_id)
|
|
|
|
response_content = await provider.get_webhook_response(
|
|
workflow_id, user_id, workflow_run_id
|
|
)
|
|
|
|
return json.loads(response_content)
|
|
|
|
|
|
@router.websocket("/ws/{workflow_id}/{user_id}/{workflow_run_id}")
|
|
async def websocket_endpoint(
|
|
websocket: WebSocket, workflow_id: int, user_id: int, workflow_run_id: int
|
|
):
|
|
"""WebSocket endpoint for real-time call handling - routes to provider-specific handlers."""
|
|
await websocket.accept()
|
|
|
|
try:
|
|
# Set the run context
|
|
set_current_run_id(workflow_run_id)
|
|
|
|
# Get workflow run to determine provider type
|
|
workflow_run = await db_client.get_workflow_run(workflow_run_id)
|
|
if not workflow_run:
|
|
logger.error(f"Workflow run {workflow_run_id} not found")
|
|
await websocket.close(code=4404, reason="Workflow run not found")
|
|
return
|
|
|
|
# Get workflow for organization info
|
|
workflow = await db_client.get_workflow(workflow_id)
|
|
if not workflow:
|
|
logger.error(f"Workflow {workflow_id} not found")
|
|
await websocket.close(code=4404, reason="Workflow not found")
|
|
return
|
|
|
|
# Extract provider type from workflow run context
|
|
provider_type = None
|
|
if workflow_run.gathered_context:
|
|
provider_type = workflow_run.gathered_context.get("provider")
|
|
|
|
if not provider_type:
|
|
logger.error(f"No provider type found in workflow run {workflow_run_id}")
|
|
await websocket.close(code=4400, reason="Provider type not found")
|
|
return
|
|
|
|
logger.info(f"WebSocket connected for {provider_type} provider, workflow_run {workflow_run_id}")
|
|
|
|
# Get the telephony provider instance
|
|
provider = await get_telephony_provider(workflow.organization_id)
|
|
|
|
# Verify the provider matches what was stored
|
|
if provider.PROVIDER_NAME != provider_type:
|
|
logger.error(
|
|
f"Provider mismatch: expected {provider_type}, got {provider.PROVIDER_NAME}"
|
|
)
|
|
await websocket.close(code=4400, reason="Provider mismatch")
|
|
return
|
|
|
|
# Delegate to provider-specific handler
|
|
await provider.handle_websocket(websocket, workflow_id, user_id, workflow_run_id)
|
|
|
|
except Exception as e:
|
|
logger.error(f"Error in WebSocket connection: {e}")
|
|
await websocket.close(1011, "Internal server error")
|
|
|
|
|
|
@router.post("/twilio/status-callback/{workflow_run_id}")
|
|
async def handle_twilio_status_callback(
|
|
workflow_run_id: int,
|
|
request: Request,
|
|
x_webhook_signature: Optional[str] = Header(None),
|
|
):
|
|
"""Handle Twilio-specific status callbacks."""
|
|
|
|
# Parse form data
|
|
form_data = await request.form()
|
|
callback_data = dict(form_data)
|
|
|
|
logger.info(
|
|
f"[run {workflow_run_id}] Received status callback: {json.dumps(callback_data)}"
|
|
)
|
|
|
|
# Get workflow run to find organization
|
|
workflow_run = await db_client.get_workflow_run_by_id(workflow_run_id)
|
|
if not workflow_run:
|
|
logger.warning(f"Workflow run {workflow_run_id} not found for status callback")
|
|
return {"status": "ignored", "reason": "workflow_run_not_found"}
|
|
|
|
# Get workflow and provider
|
|
workflow = await db_client.get_workflow_by_id(workflow_run.workflow_id)
|
|
if not workflow:
|
|
logger.warning(f"Workflow {workflow_run.workflow_id} not found")
|
|
return {"status": "ignored", "reason": "workflow_not_found"}
|
|
|
|
provider = await get_telephony_provider(workflow.organization_id)
|
|
|
|
if x_webhook_signature:
|
|
backend_endpoint = await TunnelURLProvider.get_tunnel_url()
|
|
full_url = f"https://{backend_endpoint}/api/v1/telephony/twilio/status-callback/{workflow_run_id}"
|
|
|
|
is_valid = await provider.verify_webhook_signature(
|
|
full_url, callback_data, x_webhook_signature
|
|
)
|
|
|
|
if not is_valid:
|
|
logger.warning(f"Invalid webhook signature for workflow run {workflow_run_id}")
|
|
return {"status": "error", "reason": "invalid_signature"}
|
|
|
|
# Parse the callback data into generic format
|
|
parsed_data = provider.parse_status_callback(callback_data)
|
|
|
|
# Create StatusCallbackRequest from parsed data
|
|
status_update = StatusCallbackRequest(
|
|
call_id=parsed_data["call_id"],
|
|
status=parsed_data["status"],
|
|
from_number=parsed_data.get("from_number"),
|
|
to_number=parsed_data.get("to_number"),
|
|
direction=parsed_data.get("direction"),
|
|
duration=parsed_data.get("duration"),
|
|
extra=parsed_data.get("extra", {})
|
|
)
|
|
|
|
# Process the status update
|
|
await _process_status_update(workflow_run_id, status_update, workflow_run)
|
|
|
|
return {"status": "success"}
|
|
|
|
|
|
async def _process_status_update(
|
|
workflow_run_id: int,
|
|
status: StatusCallbackRequest,
|
|
workflow_run: any
|
|
):
|
|
"""Process status updates from telephony providers."""
|
|
|
|
# Log the status callback
|
|
telephony_callback_logs = workflow_run.logs.get("telephony_status_callbacks", [])
|
|
telephony_callback_log = {
|
|
"status": status.status,
|
|
"timestamp": datetime.now(UTC).isoformat(),
|
|
"call_id": status.call_id,
|
|
"duration": status.duration,
|
|
**status.extra # Include provider-specific data
|
|
}
|
|
telephony_callback_logs.append(telephony_callback_log)
|
|
|
|
# Update workflow run logs
|
|
await db_client.update_workflow_run(
|
|
run_id=workflow_run_id,
|
|
logs={"telephony_status_callbacks": telephony_callback_logs},
|
|
)
|
|
|
|
# Handle call completion
|
|
if status.status == "completed":
|
|
logger.info(
|
|
f"[run {workflow_run_id}] Call completed with duration: {status.duration}s"
|
|
)
|
|
|
|
# Release concurrent slot if this was a campaign call
|
|
if workflow_run.campaign_id:
|
|
await campaign_call_dispatcher.release_call_slot(workflow_run_id)
|
|
|
|
# Mark workflow run as completed
|
|
await db_client.update_workflow_run(
|
|
run_id=workflow_run_id, is_completed=True
|
|
)
|
|
|
|
# Publish campaign event if applicable
|
|
if workflow_run.campaign_id:
|
|
publisher = await get_campaign_event_publisher()
|
|
await publisher.publish_call_completed(
|
|
campaign_id=workflow_run.campaign_id,
|
|
workflow_run_id=workflow_run_id,
|
|
queued_run_id=workflow_run.queued_run_id,
|
|
call_duration=int(status.duration) if status.duration else 0,
|
|
)
|
|
|
|
elif status.status in ["failed", "busy", "no-answer", "canceled"]:
|
|
logger.warning(f"[run {workflow_run_id}] Call failed with status: {status.status}")
|
|
|
|
# Release concurrent slot for terminal statuses if this was a campaign call
|
|
if workflow_run.campaign_id:
|
|
await campaign_call_dispatcher.release_call_slot(workflow_run_id)
|
|
|
|
# Check if retry is needed for campaign calls (busy/no-answer)
|
|
if status.status in ["busy", "no-answer"] and workflow_run.campaign_id:
|
|
publisher = await get_campaign_event_publisher()
|
|
await publisher.publish_retry_needed(
|
|
workflow_run_id=workflow_run_id,
|
|
reason=status.status.replace("-", "_"), # Convert no-answer to no_answer
|
|
campaign_id=workflow_run.campaign_id,
|
|
queued_run_id=workflow_run.queued_run_id,
|
|
)
|
|
|
|
# Mark workflow run as completed with failure tags
|
|
call_tags = workflow_run.gathered_context.get("call_tags", []) if workflow_run.gathered_context else []
|
|
call_tags.extend(["not_connected", f"telephony_{status.status.lower()}"])
|
|
|
|
await db_client.update_workflow_run(
|
|
run_id=workflow_run_id,
|
|
is_completed=True,
|
|
gathered_context={"call_tags": call_tags}
|
|
)
|
|
|
|
|
|
@router.post("/vonage/events/{workflow_run_id}")
|
|
async def handle_vonage_events(
|
|
request: Request,
|
|
workflow_run_id: int,
|
|
):
|
|
"""Handle Vonage-specific event webhooks.
|
|
|
|
Vonage sends all call events to a single endpoint.
|
|
Events include: started, ringing, answered, complete, failed, etc.
|
|
"""
|
|
# Parse the event data
|
|
event_data = await request.json()
|
|
logger.info(f"[run {workflow_run_id}] Received Vonage event: {event_data}")
|
|
|
|
# Get workflow run for processing
|
|
workflow_run = await db_client.get_workflow_run_by_id(workflow_run_id)
|
|
if not workflow_run:
|
|
logger.error(f"[run {workflow_run_id}] Workflow run not found")
|
|
return {"status": "error", "message": "Workflow run not found"}
|
|
|
|
# For a completed call that includes cost info, capture it immediately
|
|
if event_data.get("status") == "completed":
|
|
# Vonage sometimes includes price info in the webhook
|
|
if "price" in event_data or "rate" in event_data:
|
|
try:
|
|
if workflow_run.cost_info:
|
|
# Store immediate cost info if available
|
|
cost_info = workflow_run.cost_info.copy()
|
|
if "price" in event_data:
|
|
cost_info["vonage_webhook_price"] = float(event_data["price"])
|
|
if "rate" in event_data:
|
|
cost_info["vonage_webhook_rate"] = float(event_data["rate"])
|
|
if "duration" in event_data:
|
|
cost_info["vonage_webhook_duration"] = int(event_data["duration"])
|
|
|
|
await db_client.update_workflow_run(
|
|
run_id=workflow_run_id,
|
|
cost_info=cost_info
|
|
)
|
|
logger.info(f"[run {workflow_run_id}] Captured Vonage cost info from webhook")
|
|
except Exception as e:
|
|
logger.error(f"[run {workflow_run_id}] Failed to capture Vonage cost from webhook: {e}")
|
|
|
|
# Get workflow and provider
|
|
workflow = await db_client.get_workflow_by_id(workflow_run.workflow_id)
|
|
if not workflow:
|
|
logger.error(f"[run {workflow_run_id}] Workflow not found")
|
|
return {"status": "error", "message": "Workflow not found"}
|
|
|
|
provider = await get_telephony_provider(workflow.organization_id)
|
|
|
|
# Parse the event data into generic format
|
|
parsed_data = provider.parse_status_callback(event_data)
|
|
|
|
# Create StatusCallbackRequest from parsed data
|
|
status_update = StatusCallbackRequest(
|
|
call_id=parsed_data["call_id"],
|
|
status=parsed_data["status"],
|
|
from_number=parsed_data.get("from_number"),
|
|
to_number=parsed_data.get("to_number"),
|
|
direction=parsed_data.get("direction"),
|
|
duration=parsed_data.get("duration"),
|
|
extra=parsed_data.get("extra", {})
|
|
)
|
|
|
|
# Process the status update
|
|
await _process_status_update(workflow_run_id, status_update, workflow_run)
|
|
|
|
# Return 204 No Content as expected by Vonage
|
|
return {"status": "ok"} |