mirror of
https://github.com/dograh-hq/dograh.git
synced 2026-06-10 08:05:22 +02:00
* feat: add deployment configuration options * Simplify EmbedDialog * Add options for inline vs floating embedding of agent
380 lines
13 KiB
Python
380 lines
13 KiB
Python
"""
|
|
Vonage (Nexmo) implementation of the TelephonyProvider interface.
|
|
"""
|
|
|
|
import json
|
|
import random
|
|
import time
|
|
from typing import TYPE_CHECKING, Any, Dict, List, Optional
|
|
|
|
import aiohttp
|
|
import jwt
|
|
from loguru import logger
|
|
|
|
from api.enums import WorkflowRunMode
|
|
from api.services.telephony.base import CallInitiationResult, TelephonyProvider
|
|
from api.utils.tunnel import TunnelURLProvider
|
|
|
|
if TYPE_CHECKING:
|
|
from fastapi import WebSocket
|
|
|
|
|
|
class VonageProvider(TelephonyProvider):
    """
    Vonage implementation of TelephonyProvider.

    Uses JWT authentication (RS256, signed with the application's private key)
    for Vonage Voice API requests and NCCO (Nexmo Call Control Objects) for
    call control.
    """

    PROVIDER_NAME = WorkflowRunMode.VONAGE.value
    # Name of the webhook endpoint for this provider; presumably used by the
    # telephony routing layer to build the answer URL — verify against caller.
    WEBHOOK_ENDPOINT = "ncco"

    def __init__(self, config: Dict[str, Any]):
        """
        Initialize VonageProvider with configuration.

        Args:
            config: Dictionary containing:
                - api_key: Vonage API Key
                - api_secret: Vonage API Secret (also the fallback secret for
                  webhook signature verification)
                - signature_secret: Optional Vonage signature secret used to
                  verify signed webhooks; when absent, api_secret is used
                - application_id: Vonage Application ID
                - private_key: Private key for JWT generation
                - from_numbers: A single phone number (str) or a list of phone
                  numbers to use as caller ID
        """
        self.api_key = config.get("api_key")
        self.api_secret = config.get("api_secret")
        self.signature_secret = config.get("signature_secret")
        self.application_id = config.get("application_id")
        self.private_key = config.get("private_key")

        # Handle both single number (string) and multiple numbers (list).
        # A missing/None/empty value becomes an empty list so that
        # get_available_phone_numbers always returns a list and
        # validate_config rejects it.
        from_numbers = config.get("from_numbers") or []
        if isinstance(from_numbers, str):
            from_numbers = [from_numbers]
        # Copy so later mutation of the caller's config does not affect us.
        self.from_numbers: List[str] = list(from_numbers)

        self.base_url = "https://api.nexmo.com"

    def _generate_jwt(self) -> str:
        """
        Generate a JWT for Vonage API authentication.

        The token is signed with RS256 using the configured private key and
        expires 3600 seconds after it is issued.

        Raises:
            ValueError: If the application ID or private key is missing.
        """
        import uuid

        if not self.application_id or not self.private_key:
            raise ValueError(
                "Application ID and private key required for JWT generation"
            )

        # Read the clock once so iat and exp are consistent with each other.
        now = int(time.time())
        claims = {
            "application_id": self.application_id,
            "iat": now,
            "exp": now + 3600,
            # jti must be unique per token; a timestamp string can collide when
            # two tokens are minted within the same clock tick.
            "jti": str(uuid.uuid4()),
        }

        return jwt.encode(claims, self.private_key, algorithm="RS256")

    def _get_auth_headers(self) -> Dict[str, str]:
        """Generate authorization headers (Bearer JWT + JSON content type)."""
        token = self._generate_jwt()
        return {"Authorization": f"Bearer {token}", "Content-Type": "application/json"}

    @staticmethod
    async def _read_error_body(response: aiohttp.ClientResponse) -> Any:
        """
        Best-effort decode of an error response body.

        Returns the parsed JSON when the body is valid JSON (whatever its
        declared Content-Type), otherwise the raw text, so that a non-JSON
        error page does not mask the original HTTP failure.
        """
        try:
            return await response.json(content_type=None)
        except ValueError:
            # json.JSONDecodeError and UnicodeDecodeError are ValueErrors;
            # the body is cached by aiohttp, so it can be re-read as text.
            return await response.text(errors="replace")

    async def initiate_call(
        self,
        to_number: str,
        webhook_url: str,
        workflow_run_id: Optional[int] = None,
        **kwargs: Any,
    ) -> CallInitiationResult:
        """
        Initiate an outbound call via Vonage Voice API.

        Args:
            to_number: Destination number; any '+' characters are removed.
            webhook_url: Answer URL that Vonage fetches (GET) for the NCCO.
            workflow_run_id: When provided, an event webhook for this workflow
                run is registered on the call (POST).
            **kwargs: Extra request-body fields; they override fields set here.

        Returns:
            CallInitiationResult whose call_id is the Vonage call UUID.

        Raises:
            ValueError: If the provider is not properly configured.
            Exception: If Vonage does not respond with HTTP 201.
        """
        if not self.validate_config():
            raise ValueError("Vonage provider not properly configured")

        endpoint = f"{self.base_url}/v1/calls"

        # Select a random caller ID and remove '+' prefixes for Vonage.
        from_number = random.choice(self.from_numbers).replace("+", "")
        to_number = to_number.replace("+", "")

        logger.info(f"Selected phone number {from_number} for outbound call")

        data: Dict[str, Any] = {
            "to": [{"type": "phone", "number": to_number}],
            "from": {"type": "phone", "number": from_number},
            "answer_url": [webhook_url],
            "answer_method": "GET",
        }

        # Add event webhook if workflow_run_id provided (0 is a valid id).
        if workflow_run_id is not None:
            backend_endpoint = await TunnelURLProvider.get_tunnel_url()
            event_url = f"https://{backend_endpoint}/api/v1/telephony/vonage/events/{workflow_run_id}"
            data.update({"event_url": [event_url], "event_method": "POST"})

        data.update(kwargs)

        headers = self._get_auth_headers()

        async with aiohttp.ClientSession() as session:
            async with session.post(endpoint, json=data, headers=headers) as response:
                if response.status != 201:
                    error_data = await self._read_error_body(response)
                    raise Exception(f"Failed to initiate call: {error_data}")
                response_data = await response.json()

        call_uuid = response_data["uuid"]
        return CallInitiationResult(
            call_id=call_uuid,
            status=response_data.get("status", "started"),
            # Vonage needs UUID persisted for WebSocket
            provider_metadata={"call_uuid": call_uuid},
            raw_response=response_data,
        )

    async def get_call_status(self, call_id: str) -> Dict[str, Any]:
        """
        Get the current status of a Vonage call.

        Args:
            call_id: The Vonage call UUID.

        Returns:
            The JSON body returned by GET /v1/calls/{call_id}.

        Raises:
            ValueError: If the provider is not properly configured.
            Exception: If Vonage does not respond with HTTP 200.
        """
        if not self.validate_config():
            raise ValueError("Vonage provider not properly configured")

        endpoint = f"{self.base_url}/v1/calls/{call_id}"
        headers = self._get_auth_headers()

        async with aiohttp.ClientSession() as session:
            async with session.get(endpoint, headers=headers) as response:
                if response.status != 200:
                    error_data = await self._read_error_body(response)
                    raise Exception(f"Failed to get call status: {error_data}")

                return await response.json()

    async def get_available_phone_numbers(self) -> List[str]:
        """Return the configured Vonage caller-ID phone numbers."""
        return self.from_numbers

    def validate_config(self) -> bool:
        """
        Validate Vonage configuration.

        Returns True only if an application ID, a private key, and at least
        one caller-ID number are configured.
        """
        return bool(self.application_id and self.private_key and self.from_numbers)

    async def verify_webhook_signature(
        self, url: str, params: Dict[str, Any], signature: str
    ) -> bool:
        """
        Verify Vonage webhook signature for security.

        Vonage sends an HS256-signed JWT in the Authorization header. The token
        is verified against ``signature_secret`` when configured, otherwise
        against ``api_secret``.

        Args:
            url: Request URL (unused; kept for interface compatibility).
            params: Request parameters (unused; see NOTE below).
            signature: The JWT, optionally still prefixed with "Bearer ".

        Returns:
            True if the token validates (signature, plus any standard claims
            PyJWT checks by default such as exp), False otherwise.
        """
        secret = self.signature_secret or self.api_secret
        if not secret:
            logger.error("No API secret available for webhook signature verification")
            return False

        if not signature:
            return False

        # Accept the raw Authorization header value as well as a bare token.
        token = signature.strip()
        if token.lower().startswith("bearer "):
            token = token[len("bearer "):].strip()

        # NOTE(review): Vonage signed webhooks also carry a `payload_hash` claim
        # (SHA-256 of the raw request body). It is not checked here because only
        # parsed params are available — TODO verify it where the raw body is.
        try:
            jwt.decode(
                token,
                secret,
                algorithms=["HS256"],
                options={"verify_signature": True},
            )
        except jwt.InvalidTokenError:
            return False
        return True

    async def get_webhook_response(
        self, workflow_id: int, user_id: int, workflow_run_id: int
    ) -> str:
        """
        Generate NCCO response for starting a call session.

        NCCO (Nexmo Call Control Objects) is JSON-based, unlike TwiML which is
        XML. The returned NCCO connects the call to this backend's telephony
        WebSocket endpoint, streaming 16 kHz linear PCM audio.

        Returns:
            The NCCO serialized as a JSON string.
        """
        backend_endpoint = await TunnelURLProvider.get_tunnel_url()

        # NCCO for WebSocket connection
        ncco = [
            {
                "action": "connect",
                "endpoint": [
                    {
                        "type": "websocket",
                        "uri": f"wss://{backend_endpoint}/api/v1/telephony/ws/{workflow_id}/{user_id}/{workflow_run_id}",
                        "content-type": "audio/l16;rate=16000",  # 16kHz Linear PCM
                        "headers": {},
                    }
                ],
            }
        ]

        return json.dumps(ncco)

    async def get_call_cost(self, call_id: str) -> Dict[str, Any]:
        """
        Get cost information for a completed Vonage call.

        This is best-effort: every failure (including missing credentials) is
        logged and reported as a dict with status "error" instead of raising.

        Args:
            call_id: The Vonage Call UUID

        Returns:
            Dict containing cost_usd, duration (seconds), status, and on
            success price_unit, rate, and the raw call record; on failure an
            "error" string instead.
        """
        endpoint = f"{self.base_url}/v1/calls/{call_id}"

        try:
            headers = self._get_auth_headers()
            async with aiohttp.ClientSession() as session:
                async with session.get(endpoint, headers=headers) as response:
                    if response.status != 200:
                        error_data = await self._read_error_body(response)
                        logger.error(f"Failed to get Vonage call cost: {error_data}")
                        return {
                            "cost_usd": 0.0,
                            "duration": 0,
                            "status": "error",
                            "error": str(error_data),
                        }

                    call_data = await response.json()

            # Vonage returns price (total cost) and rate (per-minute rate).
            # Either may be absent or null, e.g. before the call has ended.
            price = float(call_data.get("price") or 0)
            cost_usd = price  # Vonage returns positive values

            # Duration is in seconds
            duration = int(call_data.get("duration") or 0)

            status = call_data.get("status", "unknown")

            return {
                "cost_usd": cost_usd,
                "duration": duration,
                "status": status,
                # NOTE(review): hard-coded; Vonage prices are in the account's
                # currency — confirm the account is billed in USD.
                "price_unit": "USD",
                "rate": call_data.get("rate", 0),  # Per-minute rate
                "raw_response": call_data,
            }

        except Exception as e:
            logger.error(f"Exception fetching Vonage call cost: {e}")
            return {"cost_usd": 0.0, "duration": 0, "status": "error", "error": str(e)}

    def parse_status_callback(self, data: Dict[str, Any]) -> Dict[str, Any]:
        """
        Parse Vonage event callback data into generic format.

        Known Vonage statuses are mapped onto the common vocabulary; unknown
        statuses are passed through unchanged. The original payload is kept
        under "extra".
        """
        # Map Vonage status to common format
        status_map = {
            "started": "initiated",
            "ringing": "ringing",
            "answered": "answered",
            "completed": "completed",  # Vonage's terminal status
            "complete": "completed",  # kept for backward compatibility
            "failed": "failed",
            "busy": "busy",
            "timeout": "no-answer",
            "unanswered": "no-answer",
            "rejected": "busy",
        }

        raw_status = data.get("status", "")
        return {
            "call_id": data.get("uuid", ""),
            "status": status_map.get(raw_status, raw_status),
            "from_number": data.get("from"),
            "to_number": data.get("to"),
            "direction": data.get("direction"),
            "duration": data.get("duration"),
            "extra": data,  # Include all original data
        }

    async def handle_websocket(
        self,
        websocket: "WebSocket",
        workflow_id: int,
        user_id: int,
        workflow_run_id: int,
    ) -> None:
        """
        Handle Vonage-specific WebSocket connection.

        Loads the workflow run and workflow, reads the call UUID from the run's
        gathered_context["call_uuid"] (presumably persisted from initiate_call's
        provider_metadata — verify), consumes the first WebSocket message, and
        then runs the Vonage pipeline on the socket.

        Vonage can send:
        1. JSON metadata first (websocket:connected event)
        2. Or directly start with binary audio

        The socket is closed with 4404 if the run or workflow is missing and
        with 4400 if the call UUID is missing. Other exceptions are logged and
        re-raised.
        """
        from api.db import db_client
        from api.services.pipecat.run_pipeline import run_pipeline_vonage

        try:
            # Get workflow run to extract call UUID
            workflow_run = await db_client.get_workflow_run(workflow_run_id)
            if not workflow_run:
                logger.error(f"Workflow run {workflow_run_id} not found")
                await websocket.close(code=4404, reason="Workflow run not found")
                return

            # Get workflow for organization info
            workflow = await db_client.get_workflow(workflow_id, user_id)
            if not workflow:
                logger.error(f"Workflow {workflow_id} not found")
                await websocket.close(code=4404, reason="Workflow not found")
                return

            # Extract call UUID from workflow run context
            call_uuid = (
                workflow_run.gathered_context.get("call_uuid")
                if workflow_run.gathered_context
                else None
            )

            if not call_uuid:
                logger.error(
                    f"No call UUID found for Vonage connection in workflow run {workflow_run_id}"
                )
                await websocket.close(code=4400, reason="Missing call UUID")
                return

            logger.info(
                f"Vonage WebSocket connected for workflow_run {workflow_run_id}, call_uuid: {call_uuid}"
            )

            # Peek at first message to see if it's metadata or audio
            first_msg = await websocket.receive()

            # The peer may hang up before sending anything; running the
            # pipeline on a disconnected socket would only fail later.
            if first_msg.get("type") == "websocket.disconnect":
                logger.info(
                    f"Vonage WebSocket disconnected before pipeline start for {workflow_run_id}"
                )
                return

            # Per the ASGI spec a receive event may contain both "text" and
            # "bytes" keys with one of them None, so test values, not keys.
            text = first_msg.get("text")
            if text is not None:
                # JSON metadata - check if it's the connection event
                try:
                    msg = json.loads(text)
                except json.JSONDecodeError:
                    logger.warning(
                        f"Ignoring non-JSON first text message from Vonage for {workflow_run_id}"
                    )
                else:
                    if isinstance(msg, dict) and msg.get("event") == "websocket:connected":
                        logger.debug(
                            f"Received Vonage connection confirmation for {workflow_run_id}"
                        )
                # Continue to pipeline regardless of message type
            elif first_msg.get("bytes") is not None:
                # Binary audio - Vonage started with audio immediately
                logger.debug(f"Vonage started with binary audio for {workflow_run_id}")
                # NOTE: this first audio chunk was consumed by receive() above
                # and is not forwarded to the pipeline.

            # Run the Vonage pipeline
            await run_pipeline_vonage(
                websocket,
                call_uuid,
                workflow,
                workflow.organization_id,
                workflow_id,
                workflow_run_id,
                user_id,
            )

        except Exception as e:
            logger.error(f"Error in Vonage WebSocket handler: {e}")
            raise