"""Vonage telephony routes (webhooks, status callbacks, answer URLs). Mounted under ``/api/v1/telephony`` by ``api.routes.telephony`` via the provider registry — see ProviderSpec.router. """ import json from typing import Optional from fastapi import APIRouter, Request from loguru import logger from pipecat.utils.run_context import set_current_run_id from api.db import db_client from api.services.telephony.factory import get_telephony_provider_for_run from api.services.telephony.status_processor import ( StatusCallbackRequest, _process_status_update, ) router = APIRouter() @router.get("/ncco", include_in_schema=False) async def handle_ncco_webhook( workflow_id: int, user_id: int, workflow_run_id: int, organization_id: Optional[int] = None, ): """Handle NCCO (Nexmo Call Control Objects) webhook for Vonage. Returns JSON response instead of XML like TwiML. """ workflow_run = await db_client.get_workflow_run_by_id(workflow_run_id) provider = await get_telephony_provider_for_run( workflow_run, organization_id or user_id ) response_content = await provider.get_webhook_response( workflow_id, user_id, workflow_run_id ) return json.loads(response_content) @router.post("/vonage/events/{workflow_run_id}") async def handle_vonage_events( request: Request, workflow_run_id: int, ): """Handle Vonage-specific event webhooks. Vonage sends all call events to a single endpoint. Events include: started, ringing, answered, complete, failed, etc. """ set_current_run_id(workflow_run_id) # Parse the event data event_data = await request.json() logger.info(f"[run {workflow_run_id}] Received Vonage event: {event_data}") # Get workflow run for processing workflow_run = await db_client.get_workflow_run_by_id(workflow_run_id) if not workflow_run: logger.error(f"[run {workflow_run_id}] Workflow run not found") return {"status": "error", "message": "Workflow run not found"} # For a completed call that includes cost info, capture it immediately if event_data.get("status") == "completed": # Vonage sometimes includes price info in the webhook if "price" in event_data or "rate" in event_data: try: if workflow_run.cost_info: # Store immediate cost info if available cost_info = workflow_run.cost_info.copy() if "price" in event_data: cost_info["vonage_webhook_price"] = float(event_data["price"]) if "rate" in event_data: cost_info["vonage_webhook_rate"] = float(event_data["rate"]) if "duration" in event_data: cost_info["vonage_webhook_duration"] = int( event_data["duration"] ) await db_client.update_workflow_run( run_id=workflow_run_id, cost_info=cost_info ) logger.info( f"[run {workflow_run_id}] Captured Vonage cost info from webhook" ) except Exception as e: logger.error( f"[run {workflow_run_id}] Failed to capture Vonage cost from webhook: {e}" ) # Get workflow and provider workflow = await db_client.get_workflow_by_id(workflow_run.workflow_id) if not workflow: logger.error(f"[run {workflow_run_id}] Workflow not found") return {"status": "error", "message": "Workflow not found"} provider = await get_telephony_provider_for_run( workflow_run, workflow.organization_id ) # Parse the event data into generic format parsed_data = provider.parse_status_callback(event_data) # Create StatusCallbackRequest from parsed data status_update = StatusCallbackRequest( call_id=parsed_data["call_id"], status=parsed_data["status"], from_number=parsed_data.get("from_number"), to_number=parsed_data.get("to_number"), direction=parsed_data.get("direction"), duration=parsed_data.get("duration"), extra=parsed_data.get("extra", {}), ) # Process the status update await _process_status_update(workflow_run_id, status_update) # Return 204 No Content as expected by Vonage return {"status": "ok"}