mirror of
https://github.com/dograh-hq/dograh.git
synced 2026-06-10 08:05:22 +02:00
619 lines
22 KiB
Python
619 lines
22 KiB
Python
"""
|
|
Vonage (Nexmo) implementation of the TelephonyProvider interface.
|
|
"""
|
|
|
|
import json
|
|
import random
|
|
import time
|
|
from typing import TYPE_CHECKING, Any, Dict, List, Optional
|
|
|
|
import aiohttp
|
|
import jwt
|
|
from fastapi import HTTPException, Response
|
|
from loguru import logger
|
|
|
|
from api.enums import WorkflowRunMode
|
|
from api.services.telephony.base import (
|
|
CallInitiationResult,
|
|
NormalizedInboundData,
|
|
ProviderSyncResult,
|
|
TelephonyProvider,
|
|
)
|
|
from api.utils.common import get_backend_endpoints
|
|
|
|
if TYPE_CHECKING:
|
|
from fastapi import WebSocket
|
|
|
|
|
|
class VonageProvider(TelephonyProvider):
|
|
"""
|
|
Vonage implementation of TelephonyProvider.
|
|
Uses JWT authentication and NCCO for call control.
|
|
"""
|
|
|
|
PROVIDER_NAME = WorkflowRunMode.VONAGE.value
|
|
WEBHOOK_ENDPOINT = "ncco"
|
|
|
|
def __init__(self, config: Dict[str, Any]):
|
|
"""
|
|
Initialize VonageProvider with configuration.
|
|
|
|
Args:
|
|
config: Dictionary containing:
|
|
- api_key: Vonage API Key
|
|
- api_secret: Vonage API Secret
|
|
- application_id: Vonage Application ID
|
|
- private_key: Private key for JWT generation
|
|
- from_numbers: List of phone numbers to use
|
|
"""
|
|
self.api_key = config.get("api_key")
|
|
self.api_secret = config.get("api_secret")
|
|
self.application_id = config.get("application_id")
|
|
self.private_key = config.get("private_key")
|
|
self.from_numbers = config.get("from_numbers", [])
|
|
|
|
# Handle both single number (string) and multiple numbers (list)
|
|
if isinstance(self.from_numbers, str):
|
|
self.from_numbers = [self.from_numbers]
|
|
|
|
self.base_url = "https://api.nexmo.com"
|
|
|
|
def _generate_jwt(self) -> str:
|
|
"""Generate JWT token for Vonage API authentication."""
|
|
if not self.application_id or not self.private_key:
|
|
raise ValueError(
|
|
"Application ID and private key required for JWT generation"
|
|
)
|
|
|
|
claims = {
|
|
"application_id": self.application_id,
|
|
"iat": int(time.time()),
|
|
"exp": int(time.time()) + 3600,
|
|
"jti": str(time.time()),
|
|
}
|
|
|
|
return jwt.encode(claims, self.private_key, algorithm="RS256")
|
|
|
|
async def initiate_call(
|
|
self,
|
|
to_number: str,
|
|
webhook_url: str,
|
|
workflow_run_id: Optional[int] = None,
|
|
from_number: Optional[str] = None,
|
|
**kwargs: Any,
|
|
) -> CallInitiationResult:
|
|
"""
|
|
Initiate an outbound call via Vonage Voice API.
|
|
"""
|
|
if not self.validate_config():
|
|
raise ValueError("Vonage provider not properly configured")
|
|
|
|
endpoint = f"{self.base_url}/v1/calls"
|
|
|
|
# Use provided from_number or select a random one
|
|
if from_number is None:
|
|
from_number = random.choice(self.from_numbers)
|
|
# Remove '+' prefix for Vonage
|
|
from_number = from_number.replace("+", "")
|
|
to_number = to_number.replace("+", "")
|
|
|
|
logger.info(f"Selected phone number {from_number} for outbound call")
|
|
|
|
# Prepare call data
|
|
data = {
|
|
"to": [{"type": "phone", "number": to_number}],
|
|
"from": {"type": "phone", "number": from_number},
|
|
"answer_url": [webhook_url],
|
|
"answer_method": "GET",
|
|
}
|
|
|
|
# Add event webhook if workflow_run_id provided
|
|
if workflow_run_id:
|
|
backend_endpoint, _ = await get_backend_endpoints()
|
|
event_url = (
|
|
f"{backend_endpoint}/api/v1/telephony/vonage/events/{workflow_run_id}"
|
|
)
|
|
data.update({"event_url": [event_url], "event_method": "POST"})
|
|
|
|
data.update(kwargs)
|
|
|
|
# Generate JWT token
|
|
token = self._generate_jwt()
|
|
headers = {
|
|
"Authorization": f"Bearer {token}",
|
|
"Content-Type": "application/json",
|
|
}
|
|
|
|
# Make the API request
|
|
async with aiohttp.ClientSession() as session:
|
|
async with session.post(endpoint, json=data, headers=headers) as response:
|
|
response_data = await response.json()
|
|
|
|
if response.status != 201:
|
|
raise HTTPException(
|
|
status_code=response.status,
|
|
detail=f"Failed to initiate Vonage call: {response_data}",
|
|
)
|
|
|
|
return CallInitiationResult(
|
|
call_id=response_data["uuid"],
|
|
status=response_data.get("status", "started"),
|
|
caller_number=from_number,
|
|
provider_metadata={
|
|
"call_id": response_data["uuid"],
|
|
"call_uuid": response_data["uuid"],
|
|
}, # Vonage needs UUID persisted for WebSocket
|
|
raw_response=response_data,
|
|
)
|
|
|
|
async def get_call_status(self, call_id: str) -> Dict[str, Any]:
|
|
"""
|
|
Get the current status of a Vonage call.
|
|
"""
|
|
if not self.validate_config():
|
|
raise ValueError("Vonage provider not properly configured")
|
|
|
|
endpoint = f"{self.base_url}/v1/calls/{call_id}"
|
|
|
|
# Generate JWT token
|
|
token = self._generate_jwt()
|
|
headers = {"Authorization": f"Bearer {token}"}
|
|
|
|
async with aiohttp.ClientSession() as session:
|
|
async with session.get(endpoint, headers=headers) as response:
|
|
if response.status != 200:
|
|
error_data = await response.json()
|
|
raise Exception(f"Failed to get call status: {error_data}")
|
|
|
|
return await response.json()
|
|
|
|
async def get_available_phone_numbers(self) -> List[str]:
|
|
"""
|
|
Get list of available Vonage phone numbers.
|
|
"""
|
|
return self.from_numbers
|
|
|
|
def validate_config(self) -> bool:
|
|
"""
|
|
Validate Vonage configuration.
|
|
"""
|
|
return bool(self.application_id and self.private_key and self.from_numbers)
|
|
|
|
async def verify_webhook_signature(
|
|
self, url: str, params: Dict[str, Any], signature: str
|
|
) -> bool:
|
|
"""
|
|
Verify Vonage webhook signature for security.
|
|
Vonage uses JWT for webhook signatures.
|
|
"""
|
|
if not self.api_secret:
|
|
logger.error("No API secret available for webhook signature verification")
|
|
return False
|
|
|
|
try:
|
|
# Vonage sends JWT in Authorization header. Verify the JWT signature
|
|
decoded = jwt.decode(
|
|
signature,
|
|
self.api_secret,
|
|
algorithms=["HS256"],
|
|
options={"verify_signature": True},
|
|
)
|
|
return True
|
|
except jwt.InvalidTokenError:
|
|
return False
|
|
|
|
async def get_webhook_response(
|
|
self, workflow_id: int, user_id: int, workflow_run_id: int
|
|
) -> str:
|
|
"""
|
|
Generate NCCO response for starting a call session.
|
|
NCCO (Nexmo Call Control Objects) is JSON-based, unlike TwiML which is XML.
|
|
"""
|
|
_, wss_backend_endpoint = await get_backend_endpoints()
|
|
|
|
# NCCO for WebSocket connection
|
|
ncco = [
|
|
{
|
|
"action": "connect",
|
|
"endpoint": [
|
|
{
|
|
"type": "websocket",
|
|
"uri": f"{wss_backend_endpoint}/api/v1/telephony/ws/{workflow_id}/{user_id}/{workflow_run_id}",
|
|
"content-type": "audio/l16;rate=16000", # 16kHz Linear PCM
|
|
"headers": {},
|
|
}
|
|
],
|
|
}
|
|
]
|
|
|
|
return json.dumps(ncco)
|
|
|
|
def _get_auth_headers(self) -> Dict[str, str]:
|
|
"""Generate authorization headers for Vonage API."""
|
|
token = self._generate_jwt()
|
|
return {"Authorization": f"Bearer {token}", "Content-Type": "application/json"}
|
|
|
|
async def get_call_cost(self, call_id: str) -> Dict[str, Any]:
|
|
"""
|
|
Get cost information for a completed Vonage call.
|
|
|
|
Args:
|
|
call_id: The Vonage Call UUID
|
|
|
|
Returns:
|
|
Dict containing cost information
|
|
"""
|
|
headers = self._get_auth_headers()
|
|
endpoint = f"https://api.nexmo.com/v1/calls/{call_id}"
|
|
|
|
try:
|
|
async with aiohttp.ClientSession() as session:
|
|
async with session.get(endpoint, headers=headers) as response:
|
|
if response.status != 200:
|
|
error_data = await response.json()
|
|
logger.error(f"Failed to get Vonage call cost: {error_data}")
|
|
return {
|
|
"cost_usd": 0.0,
|
|
"duration": 0,
|
|
"status": "error",
|
|
"error": str(error_data),
|
|
}
|
|
|
|
call_data = await response.json()
|
|
|
|
# Vonage returns price and rate
|
|
# Price is the total cost, rate is the per-minute rate
|
|
price = float(call_data.get("price", 0))
|
|
cost_usd = price # Vonage returns positive values
|
|
|
|
# Duration is in seconds
|
|
duration = int(call_data.get("duration", 0))
|
|
|
|
# Get the call status
|
|
status = call_data.get("status", "unknown")
|
|
|
|
return {
|
|
"cost_usd": cost_usd,
|
|
"duration": duration,
|
|
"status": status,
|
|
"price_unit": "USD", # Vonage uses USD by default
|
|
"rate": call_data.get("rate", 0), # Per-minute rate
|
|
"raw_response": call_data,
|
|
}
|
|
|
|
except Exception as e:
|
|
logger.error(f"Exception fetching Vonage call cost: {e}")
|
|
return {"cost_usd": 0.0, "duration": 0, "status": "error", "error": str(e)}
|
|
|
|
def parse_status_callback(self, data: Dict[str, Any]) -> Dict[str, Any]:
|
|
"""
|
|
Parse Vonage event callback data into generic format.
|
|
"""
|
|
# Map Vonage status to common format
|
|
status_map = {
|
|
"started": "initiated",
|
|
"ringing": "ringing",
|
|
"answered": "answered",
|
|
"complete": "completed",
|
|
"failed": "failed",
|
|
"busy": "busy",
|
|
"timeout": "no-answer",
|
|
"rejected": "busy",
|
|
}
|
|
|
|
return {
|
|
"call_id": data.get("uuid", ""),
|
|
"status": status_map.get(data.get("status", ""), data.get("status", "")),
|
|
"from_number": data.get("from"),
|
|
"to_number": data.get("to"),
|
|
"direction": data.get("direction"),
|
|
"duration": data.get("duration"),
|
|
"extra": data, # Include all original data
|
|
}
|
|
|
|
async def handle_websocket(
|
|
self,
|
|
websocket: "WebSocket",
|
|
workflow_id: int,
|
|
user_id: int,
|
|
workflow_run_id: int,
|
|
) -> None:
|
|
"""
|
|
Handle Vonage-specific WebSocket connection.
|
|
|
|
Vonage can send:
|
|
1. JSON metadata first (websocket:connected event)
|
|
2. Or directly start with binary audio
|
|
"""
|
|
from api.db import db_client
|
|
from api.services.pipecat.run_pipeline import run_pipeline_telephony
|
|
|
|
try:
|
|
# Get workflow run to extract call UUID
|
|
workflow_run = await db_client.get_workflow_run(workflow_run_id)
|
|
if not workflow_run:
|
|
logger.error(f"Workflow run {workflow_run_id} not found")
|
|
await websocket.close(code=4404, reason="Workflow run not found")
|
|
return
|
|
|
|
# Get workflow for organization info
|
|
workflow = await db_client.get_workflow(workflow_id, user_id)
|
|
if not workflow:
|
|
logger.error(f"Workflow {workflow_id} not found")
|
|
await websocket.close(code=4404, reason="Workflow not found")
|
|
return
|
|
|
|
# Extract call UUID from workflow run context
|
|
call_uuid = (
|
|
workflow_run.gathered_context.get("call_uuid")
|
|
if workflow_run.gathered_context
|
|
else None
|
|
)
|
|
|
|
if not call_uuid:
|
|
logger.error(
|
|
f"No call UUID found for Vonage connection in workflow run {workflow_run_id}"
|
|
)
|
|
await websocket.close(code=4400, reason="Missing call UUID")
|
|
return
|
|
|
|
logger.info(
|
|
f"Vonage WebSocket connected for workflow_run {workflow_run_id}, call_uuid: {call_uuid}"
|
|
)
|
|
|
|
# Peek at first message to see if it's metadata or audio
|
|
first_msg = await websocket.receive()
|
|
|
|
if "text" in first_msg:
|
|
# JSON metadata - check if it's the connection event
|
|
msg = json.loads(first_msg["text"])
|
|
if msg.get("event") == "websocket:connected":
|
|
logger.debug(
|
|
f"Received Vonage connection confirmation for {workflow_run_id}"
|
|
)
|
|
# Continue to pipeline regardless of message type
|
|
elif "bytes" in first_msg:
|
|
# Binary audio - Vonage started with audio immediately
|
|
logger.debug(f"Vonage started with binary audio for {workflow_run_id}")
|
|
# The pipeline will handle this first audio chunk
|
|
|
|
await run_pipeline_telephony(
|
|
websocket,
|
|
provider_name=self.PROVIDER_NAME,
|
|
workflow_id=workflow_id,
|
|
workflow_run_id=workflow_run_id,
|
|
user_id=user_id,
|
|
call_id=call_uuid,
|
|
transport_kwargs={"call_uuid": call_uuid},
|
|
)
|
|
|
|
except Exception as e:
|
|
logger.error(f"Error in Vonage WebSocket handler: {e}")
|
|
raise
|
|
|
|
# ======== INBOUND CALL METHODS ========
|
|
|
|
@classmethod
|
|
def can_handle_webhook(
|
|
cls, webhook_data: Dict[str, Any], headers: Dict[str, str]
|
|
) -> bool:
|
|
"""
|
|
Determine if this provider can handle the incoming webhook.
|
|
"""
|
|
return False
|
|
|
|
@staticmethod
|
|
def parse_inbound_webhook(webhook_data: Dict[str, Any]) -> NormalizedInboundData:
|
|
"""
|
|
Parse Vonage-specific inbound webhook data into normalized format.
|
|
"""
|
|
return NormalizedInboundData(
|
|
provider=VonageProvider.PROVIDER_NAME,
|
|
call_id=webhook_data.get("uuid", ""),
|
|
from_number=webhook_data.get("from", ""),
|
|
to_number=webhook_data.get("to", ""),
|
|
direction=webhook_data.get("direction", ""),
|
|
call_status=webhook_data.get("status", ""),
|
|
account_id=webhook_data.get("account_id"),
|
|
from_country=None,
|
|
to_country=None,
|
|
raw_data=webhook_data,
|
|
)
|
|
|
|
@staticmethod
|
|
def validate_account_id(config_data: dict, webhook_account_id: str) -> bool:
|
|
"""Validate Vonage account_id from webhook matches configuration"""
|
|
if not webhook_account_id:
|
|
return False
|
|
|
|
stored_api_key = config_data.get("api_key")
|
|
return stored_api_key == webhook_account_id
|
|
|
|
async def verify_inbound_signature(
|
|
self,
|
|
url: str,
|
|
webhook_data: Dict[str, Any],
|
|
headers: Dict[str, str],
|
|
body: str = "",
|
|
) -> bool:
|
|
"""
|
|
Vonage inbound signature verification - minimalist implementation.
|
|
"""
|
|
return True
|
|
|
|
async def configure_inbound(
|
|
self, address: str, webhook_url: Optional[str]
|
|
) -> ProviderSyncResult:
|
|
"""Update the answer_url on Vonage's Application for ``address``.
|
|
|
|
Vonage routes inbound calls per-application: a single ``answer_url`` on
|
|
``self.application_id`` applies to every number attached to it. The
|
|
``address`` argument is informational — every call to this method
|
|
rewrites (or leaves alone) the application's webhook, regardless of
|
|
which number triggered the sync.
|
|
|
|
Vonage's PUT /v2/applications/{id} is full-replacement, so we GET the
|
|
current application, mutate ``capabilities.voice.webhooks.answer_url``,
|
|
and PUT the result back. ``api_key`` and ``api_secret`` are used for
|
|
Basic auth on the application API (the JWT auth used elsewhere is for
|
|
the Voice API, not the Application API).
|
|
|
|
Clearing (``webhook_url=None``) is a no-op on the Vonage side: the URL
|
|
is shared across all numbers on this application, so unsetting it for
|
|
one number would silently break inbound for every other number still
|
|
attached. The DB-level disconnect is sufficient — inbound calls
|
|
without a matching workflow are rejected by the backend.
|
|
"""
|
|
if webhook_url is None:
|
|
logger.info(
|
|
f"Vonage configure_inbound clear for {address}: skipping "
|
|
f"application update (answer_url is shared across all numbers "
|
|
f"on application {self.application_id})"
|
|
)
|
|
return ProviderSyncResult(ok=True)
|
|
|
|
if not self.validate_config():
|
|
return ProviderSyncResult(
|
|
ok=False, message="Vonage provider not properly configured"
|
|
)
|
|
|
|
if not (self.api_key and self.api_secret):
|
|
return ProviderSyncResult(
|
|
ok=False,
|
|
message=(
|
|
"Vonage api_key and api_secret are required to update the "
|
|
"application's answer_url"
|
|
),
|
|
)
|
|
|
|
app_endpoint = f"{self.base_url}/v2/applications/{self.application_id}"
|
|
auth = aiohttp.BasicAuth(self.api_key, self.api_secret)
|
|
|
|
try:
|
|
async with aiohttp.ClientSession() as session:
|
|
async with session.get(app_endpoint, auth=auth) as response:
|
|
if response.status != 200:
|
|
body = await response.text()
|
|
logger.error(
|
|
f"Failed to fetch Vonage application "
|
|
f"{self.application_id}: {response.status} {body}"
|
|
)
|
|
return ProviderSyncResult(
|
|
ok=False,
|
|
message=f"Vonage API {response.status}: {body}",
|
|
)
|
|
app_data = await response.json()
|
|
except Exception as e:
|
|
logger.error(f"Exception fetching Vonage application: {e}")
|
|
return ProviderSyncResult(ok=False, message=f"Vonage lookup failed: {e}")
|
|
|
|
capabilities = app_data.get("capabilities") or {}
|
|
voice = capabilities.get("voice") or {}
|
|
webhooks = voice.get("webhooks") or {}
|
|
|
|
webhooks["answer_url"] = {
|
|
"address": webhook_url,
|
|
"http_method": "POST",
|
|
}
|
|
voice["webhooks"] = webhooks
|
|
capabilities["voice"] = voice
|
|
|
|
update_body = {
|
|
"name": app_data.get("name"),
|
|
"capabilities": capabilities,
|
|
}
|
|
if "privacy" in app_data:
|
|
update_body["privacy"] = app_data["privacy"]
|
|
|
|
try:
|
|
async with aiohttp.ClientSession() as session:
|
|
async with session.put(
|
|
app_endpoint, json=update_body, auth=auth
|
|
) as response:
|
|
if response.status not in (200, 201):
|
|
body = await response.text()
|
|
logger.error(
|
|
f"Vonage application update failed for "
|
|
f"{self.application_id}: {response.status} {body}"
|
|
)
|
|
return ProviderSyncResult(
|
|
ok=False,
|
|
message=f"Vonage API {response.status}: {body}",
|
|
)
|
|
except Exception as e:
|
|
logger.error(f"Exception updating Vonage application: {e}")
|
|
return ProviderSyncResult(ok=False, message=f"Vonage update failed: {e}")
|
|
|
|
logger.info(
|
|
f"Vonage answer_url set on application {self.application_id} "
|
|
f"(triggered by address {address})"
|
|
)
|
|
return ProviderSyncResult(ok=True)
|
|
|
|
async def start_inbound_stream(
|
|
self,
|
|
*,
|
|
websocket_url: str,
|
|
workflow_run_id: int,
|
|
normalized_data,
|
|
backend_endpoint: str,
|
|
):
|
|
"""
|
|
Generate NCCO response for inbound Vonage webhook.
|
|
"""
|
|
# Minimalist NCCO response for interface compliance
|
|
ncco_response = [
|
|
{
|
|
"action": "talk",
|
|
"text": "Vonage inbound calls are not currently supported.",
|
|
},
|
|
{"action": "hangup"},
|
|
]
|
|
|
|
return Response(
|
|
content=json.dumps(ncco_response), media_type="application/json"
|
|
)
|
|
|
|
@staticmethod
|
|
def generate_error_response(error_type: str, message: str) -> tuple:
|
|
"""
|
|
Generate a Vonage-specific error response.
|
|
"""
|
|
from fastapi import Response
|
|
|
|
error_ncco = [
|
|
{
|
|
"action": "talk",
|
|
"text": f"Sorry, there was an error processing your call. {message}",
|
|
},
|
|
{"action": "hangup"},
|
|
]
|
|
|
|
return Response(content=json.dumps(error_ncco), media_type="application/json")
|
|
|
|
# ======== CALL TRANSFER METHODS ========
|
|
|
|
async def transfer_call(
|
|
self,
|
|
destination: str,
|
|
transfer_id: str,
|
|
conference_name: str,
|
|
timeout: int = 30,
|
|
**kwargs: Any,
|
|
) -> Dict[str, Any]:
|
|
"""
|
|
Vonage provider does not support call transfers.
|
|
|
|
Raises:
|
|
NotImplementedError: call transfers are yet to be implemented
|
|
"""
|
|
raise NotImplementedError("Vonage provider does not support call transfers")
|
|
|
|
def supports_transfers(self) -> bool:
|
|
"""
|
|
Vonage does not support call transfers.
|
|
|
|
Returns:
|
|
False - Vonage provider does not support call transfers
|
|
"""
|
|
return False
|