""" Vonage (Nexmo) implementation of the TelephonyProvider interface. """ import json import random import time from typing import TYPE_CHECKING, Any, Dict, List, Optional import aiohttp import jwt from fastapi import HTTPException, Response from loguru import logger from api.enums import WorkflowRunMode from api.services.telephony.base import ( CallInitiationResult, NormalizedInboundData, ProviderSyncResult, TelephonyProvider, ) from api.utils.common import get_backend_endpoints if TYPE_CHECKING: from fastapi import WebSocket class VonageProvider(TelephonyProvider): """ Vonage implementation of TelephonyProvider. Uses JWT authentication and NCCO for call control. """ PROVIDER_NAME = WorkflowRunMode.VONAGE.value WEBHOOK_ENDPOINT = "ncco" def __init__(self, config: Dict[str, Any]): """ Initialize VonageProvider with configuration. Args: config: Dictionary containing: - api_key: Vonage API Key - api_secret: Vonage API Secret - application_id: Vonage Application ID - private_key: Private key for JWT generation - from_numbers: List of phone numbers to use """ self.api_key = config.get("api_key") self.api_secret = config.get("api_secret") self.application_id = config.get("application_id") self.private_key = config.get("private_key") self.from_numbers = config.get("from_numbers", []) # Handle both single number (string) and multiple numbers (list) if isinstance(self.from_numbers, str): self.from_numbers = [self.from_numbers] self.base_url = "https://api.nexmo.com" def _generate_jwt(self) -> str: """Generate JWT token for Vonage API authentication.""" if not self.application_id or not self.private_key: raise ValueError( "Application ID and private key required for JWT generation" ) claims = { "application_id": self.application_id, "iat": int(time.time()), "exp": int(time.time()) + 3600, "jti": str(time.time()), } return jwt.encode(claims, self.private_key, algorithm="RS256") async def initiate_call( self, to_number: str, webhook_url: str, workflow_run_id: Optional[int] = None, from_number: Optional[str] = None, **kwargs: Any, ) -> CallInitiationResult: """ Initiate an outbound call via Vonage Voice API. """ if not self.validate_config(): raise ValueError("Vonage provider not properly configured") endpoint = f"{self.base_url}/v1/calls" # Use provided from_number or select a random one if from_number is None: from_number = random.choice(self.from_numbers) # Remove '+' prefix for Vonage from_number = from_number.replace("+", "") to_number = to_number.replace("+", "") logger.info(f"Selected phone number {from_number} for outbound call") # Prepare call data data = { "to": [{"type": "phone", "number": to_number}], "from": {"type": "phone", "number": from_number}, "answer_url": [webhook_url], "answer_method": "GET", } # Add event webhook if workflow_run_id provided if workflow_run_id: backend_endpoint, _ = await get_backend_endpoints() event_url = ( f"{backend_endpoint}/api/v1/telephony/vonage/events/{workflow_run_id}" ) data.update({"event_url": [event_url], "event_method": "POST"}) data.update(kwargs) # Generate JWT token token = self._generate_jwt() headers = { "Authorization": f"Bearer {token}", "Content-Type": "application/json", } # Make the API request async with aiohttp.ClientSession() as session: async with session.post(endpoint, json=data, headers=headers) as response: response_data = await response.json() if response.status != 201: raise HTTPException( status_code=response.status, detail=f"Failed to initiate Vonage call: {response_data}", ) return CallInitiationResult( call_id=response_data["uuid"], status=response_data.get("status", "started"), caller_number=from_number, provider_metadata={ "call_id": response_data["uuid"], "call_uuid": response_data["uuid"], }, # Vonage needs UUID persisted for WebSocket raw_response=response_data, ) async def get_call_status(self, call_id: str) -> Dict[str, Any]: """ Get the current status of a Vonage call. """ if not self.validate_config(): raise ValueError("Vonage provider not properly configured") endpoint = f"{self.base_url}/v1/calls/{call_id}" # Generate JWT token token = self._generate_jwt() headers = {"Authorization": f"Bearer {token}"} async with aiohttp.ClientSession() as session: async with session.get(endpoint, headers=headers) as response: if response.status != 200: error_data = await response.json() raise Exception(f"Failed to get call status: {error_data}") return await response.json() async def get_available_phone_numbers(self) -> List[str]: """ Get list of available Vonage phone numbers. """ return self.from_numbers def validate_config(self) -> bool: """ Validate Vonage configuration. """ return bool(self.application_id and self.private_key and self.from_numbers) async def verify_webhook_signature( self, url: str, params: Dict[str, Any], signature: str ) -> bool: """ Verify Vonage webhook signature for security. Vonage uses JWT for webhook signatures. """ if not self.api_secret: logger.error("No API secret available for webhook signature verification") return False try: # Vonage sends JWT in Authorization header. Verify the JWT signature decoded = jwt.decode( signature, self.api_secret, algorithms=["HS256"], options={"verify_signature": True}, ) return True except jwt.InvalidTokenError: return False async def get_webhook_response( self, workflow_id: int, user_id: int, workflow_run_id: int ) -> str: """ Generate NCCO response for starting a call session. NCCO (Nexmo Call Control Objects) is JSON-based, unlike TwiML which is XML. """ _, wss_backend_endpoint = await get_backend_endpoints() # NCCO for WebSocket connection ncco = [ { "action": "connect", "endpoint": [ { "type": "websocket", "uri": f"{wss_backend_endpoint}/api/v1/telephony/ws/{workflow_id}/{user_id}/{workflow_run_id}", "content-type": "audio/l16;rate=16000", # 16kHz Linear PCM "headers": {}, } ], } ] return json.dumps(ncco) def _get_auth_headers(self) -> Dict[str, str]: """Generate authorization headers for Vonage API.""" token = self._generate_jwt() return {"Authorization": f"Bearer {token}", "Content-Type": "application/json"} async def get_call_cost(self, call_id: str) -> Dict[str, Any]: """ Get cost information for a completed Vonage call. Args: call_id: The Vonage Call UUID Returns: Dict containing cost information """ headers = self._get_auth_headers() endpoint = f"https://api.nexmo.com/v1/calls/{call_id}" try: async with aiohttp.ClientSession() as session: async with session.get(endpoint, headers=headers) as response: if response.status != 200: error_data = await response.json() logger.error(f"Failed to get Vonage call cost: {error_data}") return { "cost_usd": 0.0, "duration": 0, "status": "error", "error": str(error_data), } call_data = await response.json() # Vonage returns price and rate # Price is the total cost, rate is the per-minute rate price = float(call_data.get("price", 0)) cost_usd = price # Vonage returns positive values # Duration is in seconds duration = int(call_data.get("duration", 0)) # Get the call status status = call_data.get("status", "unknown") return { "cost_usd": cost_usd, "duration": duration, "status": status, "price_unit": "USD", # Vonage uses USD by default "rate": call_data.get("rate", 0), # Per-minute rate "raw_response": call_data, } except Exception as e: logger.error(f"Exception fetching Vonage call cost: {e}") return {"cost_usd": 0.0, "duration": 0, "status": "error", "error": str(e)} def parse_status_callback(self, data: Dict[str, Any]) -> Dict[str, Any]: """ Parse Vonage event callback data into generic format. """ # Map Vonage status to common format status_map = { "started": "initiated", "ringing": "ringing", "answered": "answered", "complete": "completed", "failed": "failed", "busy": "busy", "timeout": "no-answer", "rejected": "busy", } return { "call_id": data.get("uuid", ""), "status": status_map.get(data.get("status", ""), data.get("status", "")), "from_number": data.get("from"), "to_number": data.get("to"), "direction": data.get("direction"), "duration": data.get("duration"), "extra": data, # Include all original data } async def handle_websocket( self, websocket: "WebSocket", workflow_id: int, user_id: int, workflow_run_id: int, ) -> None: """ Handle Vonage-specific WebSocket connection. Vonage can send: 1. JSON metadata first (websocket:connected event) 2. Or directly start with binary audio """ from api.db import db_client from api.services.pipecat.run_pipeline import run_pipeline_telephony try: # Get workflow run to extract call UUID workflow_run = await db_client.get_workflow_run(workflow_run_id) if not workflow_run: logger.error(f"Workflow run {workflow_run_id} not found") await websocket.close(code=4404, reason="Workflow run not found") return # Get workflow for organization info workflow = await db_client.get_workflow(workflow_id, user_id) if not workflow: logger.error(f"Workflow {workflow_id} not found") await websocket.close(code=4404, reason="Workflow not found") return # Extract call UUID from workflow run context call_uuid = ( workflow_run.gathered_context.get("call_uuid") if workflow_run.gathered_context else None ) if not call_uuid: logger.error( f"No call UUID found for Vonage connection in workflow run {workflow_run_id}" ) await websocket.close(code=4400, reason="Missing call UUID") return logger.info( f"Vonage WebSocket connected for workflow_run {workflow_run_id}, call_uuid: {call_uuid}" ) # Peek at first message to see if it's metadata or audio first_msg = await websocket.receive() if "text" in first_msg: # JSON metadata - check if it's the connection event msg = json.loads(first_msg["text"]) if msg.get("event") == "websocket:connected": logger.debug( f"Received Vonage connection confirmation for {workflow_run_id}" ) # Continue to pipeline regardless of message type elif "bytes" in first_msg: # Binary audio - Vonage started with audio immediately logger.debug(f"Vonage started with binary audio for {workflow_run_id}") # The pipeline will handle this first audio chunk await run_pipeline_telephony( websocket, provider_name=self.PROVIDER_NAME, workflow_id=workflow_id, workflow_run_id=workflow_run_id, user_id=user_id, call_id=call_uuid, transport_kwargs={"call_uuid": call_uuid}, ) except Exception as e: logger.error(f"Error in Vonage WebSocket handler: {e}") raise # ======== INBOUND CALL METHODS ======== @classmethod def can_handle_webhook( cls, webhook_data: Dict[str, Any], headers: Dict[str, str] ) -> bool: """ Determine if this provider can handle the incoming webhook. """ return False @staticmethod def parse_inbound_webhook(webhook_data: Dict[str, Any]) -> NormalizedInboundData: """ Parse Vonage-specific inbound webhook data into normalized format. """ return NormalizedInboundData( provider=VonageProvider.PROVIDER_NAME, call_id=webhook_data.get("uuid", ""), from_number=webhook_data.get("from", ""), to_number=webhook_data.get("to", ""), direction=webhook_data.get("direction", ""), call_status=webhook_data.get("status", ""), account_id=webhook_data.get("account_id"), from_country=None, to_country=None, raw_data=webhook_data, ) @staticmethod def validate_account_id(config_data: dict, webhook_account_id: str) -> bool: """Validate Vonage account_id from webhook matches configuration""" if not webhook_account_id: return False stored_api_key = config_data.get("api_key") return stored_api_key == webhook_account_id async def verify_inbound_signature( self, url: str, webhook_data: Dict[str, Any], headers: Dict[str, str], body: str = "", ) -> bool: """ Vonage inbound signature verification - minimalist implementation. """ return True async def configure_inbound( self, address: str, webhook_url: Optional[str] ) -> ProviderSyncResult: """Update the answer_url on Vonage's Application for ``address``. Vonage routes inbound calls per-application: a single ``answer_url`` on ``self.application_id`` applies to every number attached to it. The ``address`` argument is informational — every call to this method rewrites (or leaves alone) the application's webhook, regardless of which number triggered the sync. Vonage's PUT /v2/applications/{id} is full-replacement, so we GET the current application, mutate ``capabilities.voice.webhooks.answer_url``, and PUT the result back. ``api_key`` and ``api_secret`` are used for Basic auth on the application API (the JWT auth used elsewhere is for the Voice API, not the Application API). Clearing (``webhook_url=None``) is a no-op on the Vonage side: the URL is shared across all numbers on this application, so unsetting it for one number would silently break inbound for every other number still attached. The DB-level disconnect is sufficient — inbound calls without a matching workflow are rejected by the backend. """ if webhook_url is None: logger.info( f"Vonage configure_inbound clear for {address}: skipping " f"application update (answer_url is shared across all numbers " f"on application {self.application_id})" ) return ProviderSyncResult(ok=True) if not self.validate_config(): return ProviderSyncResult( ok=False, message="Vonage provider not properly configured" ) if not (self.api_key and self.api_secret): return ProviderSyncResult( ok=False, message=( "Vonage api_key and api_secret are required to update the " "application's answer_url" ), ) app_endpoint = f"{self.base_url}/v2/applications/{self.application_id}" auth = aiohttp.BasicAuth(self.api_key, self.api_secret) try: async with aiohttp.ClientSession() as session: async with session.get(app_endpoint, auth=auth) as response: if response.status != 200: body = await response.text() logger.error( f"Failed to fetch Vonage application " f"{self.application_id}: {response.status} {body}" ) return ProviderSyncResult( ok=False, message=f"Vonage API {response.status}: {body}", ) app_data = await response.json() except Exception as e: logger.error(f"Exception fetching Vonage application: {e}") return ProviderSyncResult(ok=False, message=f"Vonage lookup failed: {e}") capabilities = app_data.get("capabilities") or {} voice = capabilities.get("voice") or {} webhooks = voice.get("webhooks") or {} webhooks["answer_url"] = { "address": webhook_url, "http_method": "POST", } voice["webhooks"] = webhooks capabilities["voice"] = voice update_body = { "name": app_data.get("name"), "capabilities": capabilities, } if "privacy" in app_data: update_body["privacy"] = app_data["privacy"] try: async with aiohttp.ClientSession() as session: async with session.put( app_endpoint, json=update_body, auth=auth ) as response: if response.status not in (200, 201): body = await response.text() logger.error( f"Vonage application update failed for " f"{self.application_id}: {response.status} {body}" ) return ProviderSyncResult( ok=False, message=f"Vonage API {response.status}: {body}", ) except Exception as e: logger.error(f"Exception updating Vonage application: {e}") return ProviderSyncResult(ok=False, message=f"Vonage update failed: {e}") logger.info( f"Vonage answer_url set on application {self.application_id} " f"(triggered by address {address})" ) return ProviderSyncResult(ok=True) async def start_inbound_stream( self, *, websocket_url: str, workflow_run_id: int, normalized_data, backend_endpoint: str, ): """ Generate NCCO response for inbound Vonage webhook. """ # Minimalist NCCO response for interface compliance ncco_response = [ { "action": "talk", "text": "Vonage inbound calls are not currently supported.", }, {"action": "hangup"}, ] return Response( content=json.dumps(ncco_response), media_type="application/json" ) @staticmethod def generate_error_response(error_type: str, message: str) -> tuple: """ Generate a Vonage-specific error response. """ from fastapi import Response error_ncco = [ { "action": "talk", "text": f"Sorry, there was an error processing your call. {message}", }, {"action": "hangup"}, ] return Response(content=json.dumps(error_ncco), media_type="application/json") # ======== CALL TRANSFER METHODS ======== async def transfer_call( self, destination: str, transfer_id: str, conference_name: str, timeout: int = 30, **kwargs: Any, ) -> Dict[str, Any]: """ Vonage provider does not support call transfers. Raises: NotImplementedError: call transfers are yet to be implemented """ raise NotImplementedError("Vonage provider does not support call transfers") def supports_transfers(self) -> bool: """ Vonage does not support call transfers. Returns: False - Vonage provider does not support call transfers """ return False