fix: telephony bugs and improve code structure

This commit is contained in:
Sabiha Khan 2025-11-04 13:03:55 +05:30
parent 491e6edd36
commit f080c563b1
24 changed files with 633 additions and 793 deletions

View file

@ -189,11 +189,15 @@ class CampaignCallDispatcher:
# Create workflow run with queued_run_id tracking
workflow_run_name = f"WR-CAMPAIGN-{campaign.id}-{queued_run.id}"
# Get provider first to determine the mode
provider = await self.get_telephony_provider(campaign.organization_id)
workflow_run_mode = provider.PROVIDER_NAME
try:
workflow_run = await db_client.create_workflow_run(
name=workflow_run_name,
workflow_id=campaign.workflow_id,
mode=WorkflowRunMode.TWILIO.value,
mode=workflow_run_mode,
user_id=campaign.created_by,
initial_context=initial_context,
campaign_id=campaign.id,
@ -223,12 +227,11 @@ class CampaignCallDispatcher:
# Initiate call via telephony provider
try:
provider = await self.get_telephony_provider(campaign.organization_id)
# Construct webhook URL with parameters
backend_endpoint = await TunnelURLProvider.get_tunnel_url()
webhook_endpoint = provider.WEBHOOK_ENDPOINT
webhook_url = (
f"https://{backend_endpoint}/api/v1/telephony/twiml"
f"https://{backend_endpoint}/api/v1/telephony/{webhook_endpoint}"
f"?workflow_id={campaign.workflow_id}"
f"&user_id={campaign.created_by}"
f"&workflow_run_id={workflow_run.id}"
@ -243,7 +246,7 @@ class CampaignCallDispatcher:
)
logger.info(
f"Call initiated for workflow run {workflow_run.id}, SID: {call_result.get('sid')}"
f"Call initiated for workflow run {workflow_run.id}, Call ID: {call_result.call_id}"
)
except Exception as e:
@ -252,13 +255,13 @@ class CampaignCallDispatcher:
)
# Update workflow run as failed
twilio_callback_logs = workflow_run.logs.get("twilio_status_callbacks", [])
twilio_callback_log = {
telephony_callback_logs = workflow_run.logs.get("telephony_status_callbacks", [])
telephony_callback_log = {
"status": "failed",
"timestamp": datetime.now(UTC).isoformat(),
"data": {"error": str(e)},
}
twilio_callback_logs.append(twilio_callback_log)
telephony_callback_logs.append(telephony_callback_log)
await db_client.update_workflow_run(
run_id=workflow_run.id,
is_completed=True,
@ -266,7 +269,7 @@ class CampaignCallDispatcher:
"error": str(e),
},
logs={
"twilio_status_callbacks": twilio_callback_logs,
"telephony_status_callbacks": telephony_callback_logs,
},
)

View file

@ -102,13 +102,13 @@ class CampaignRunnerService:
}
async def _count_failed_campaign_calls(self, campaign_id: int) -> int:
"""Count failed calls by examining workflow_run Twilio callbacks"""
"""Count failed calls by examining workflow_run telephony callbacks"""
# Get all workflow runs for this campaign
workflow_runs = await db_client.get_workflow_runs_by_campaign(campaign_id)
failed_count = 0
for run in workflow_runs:
callbacks = run.logs.get("twilio_status_callbacks", [])
callbacks = run.logs.get("telephony_status_callbacks", [])
if callbacks:
# Check final status
final_status = callbacks[-1].get("status", "").lower()

View file

@ -71,8 +71,8 @@ async def run_pipeline_twilio(
)
set_current_run_id(workflow_run_id)
# Store Twilio call SID in cost_info for later cost calculation
cost_info = {"twilio_call_sid": call_sid, "provider": "twilio"}
# Store call ID in cost_info for later cost calculation (provider-agnostic)
cost_info = {"call_id": call_sid}
await db_client.update_workflow_run(workflow_run_id, cost_info=cost_info)
# Get workflow to extract all pipeline configurations
@ -126,8 +126,8 @@ async def run_pipeline_vonage(
logger.info(f"Starting Vonage pipeline for workflow run {workflow_run_id}")
set_current_run_id(workflow_run_id)
# Store Vonage call UUID in cost_info for later cost calculation
cost_info = {"vonage_call_uuid": call_uuid, "provider": "vonage"}
# Store call ID in cost_info for later cost calculation (provider-agnostic)
cost_info = {"call_id": call_uuid}
await db_client.update_workflow_run(workflow_run_id, cost_info=cost_info)
# Extract VAD and ambient noise config from workflow

View file

@ -4,7 +4,20 @@ This allows easy switching between different providers (Twilio, Vonage, etc.)
while keeping business logic decoupled from specific implementations.
"""
from abc import ABC, abstractmethod
from typing import Any, Dict, List, Optional
from dataclasses import dataclass, field
from typing import TYPE_CHECKING, Any, Dict, List, Optional
if TYPE_CHECKING:
from fastapi import WebSocket
@dataclass
class CallInitiationResult:
"""Standardized response from initiate_call across all providers."""
call_id: str # Provider's call identifier (SID for Twilio, UUID for Vonage)
status: str # Initial status (e.g., "queued", "initiated", "started")
provider_metadata: Dict[str, Any] = field(default_factory=dict) # Data that needs to be persisted
raw_response: Dict[str, Any] = field(default_factory=dict) # Full provider response for debugging
class TelephonyProvider(ABC):
@ -12,6 +25,8 @@ class TelephonyProvider(ABC):
Abstract base class for telephony providers.
All telephony providers must implement these core methods.
"""
PROVIDER_NAME = None
WEBHOOK_ENDPOINT = None
@abstractmethod
async def initiate_call(
@ -20,7 +35,7 @@ class TelephonyProvider(ABC):
webhook_url: str,
workflow_run_id: Optional[int] = None,
**kwargs: Any,
) -> Dict[str, Any]:
) -> CallInitiationResult:
"""
Initiate an outbound call.
@ -31,7 +46,7 @@ class TelephonyProvider(ABC):
**kwargs: Provider-specific additional parameters
Returns:
Dict containing call details (provider-specific format)
CallInitiationResult with standardized call details
"""
pass
@ -117,4 +132,45 @@ class TelephonyProvider(ABC):
- status: Call completion status
- raw_response: Full provider response for debugging
"""
pass
@abstractmethod
def parse_status_callback(self, data: Dict[str, Any]) -> Dict[str, Any]:
"""
Parse provider-specific status callback data into generic format.
Args:
data: Raw callback data from the provider
Returns:
Dict with standardized fields:
- call_id: Provider's call identifier
- status: Standardized status (completed, failed, busy, etc.)
- from_number: Optional caller number
- to_number: Optional recipient number
- duration: Optional call duration
- extra: Provider-specific additional data
"""
pass
@abstractmethod
async def handle_websocket(
self,
websocket: "WebSocket",
workflow_id: int,
user_id: int,
workflow_run_id: int,
) -> None:
"""
Handle provider-specific WebSocket connection for real-time call audio.
This method encapsulates all provider-specific WebSocket handshake and
message routing logic, keeping the main websocket endpoint clean.
Args:
websocket: The WebSocket connection
workflow_id: The workflow ID
user_id: The user ID
workflow_run_id: The workflow run ID
"""
pass

View file

@ -33,19 +33,11 @@ async def load_telephony_config(organization_id: int) -> Dict[str, Any]:
logger.debug(f"Loading telephony config from database for org {organization_id}")
# Try new key first
config = await db_client.get_configuration(
organization_id,
OrganizationConfigurationKey.TELEPHONY_CONFIGURATION.value,
)
# Fallback to old key for backward compatibility
if not config:
config = await db_client.get_configuration(
organization_id,
OrganizationConfigurationKey.TWILIO_CONFIGURATION.value,
)
if config and config.value:
# Simple single-provider format
provider = config.value.get("provider", "twilio")
@ -87,23 +79,18 @@ async def get_telephony_provider(
Raises:
ValueError: If provider type is unknown or configuration is invalid
"""
# Load configuration from appropriate source
# Load configuration
config = await load_telephony_config(organization_id)
provider_type = config.get("provider", "twilio")
logger.info(f"Creating {provider_type} telephony provider")
# Create provider instance with configuration
# Provider doesn't know or care if config came from env or database
if provider_type == "twilio":
return TwilioProvider(config)
elif provider_type == "vonage":
return VonageProvider(config)
# Future providers can be added here
# elif provider_type == "plivo":
# return PlivoProvider(config)
else:
raise ValueError(f"Unknown telephony provider: {provider_type}")
raise ValueError(f"Unknown telephony provider: {provider_type}")

View file

@ -1,15 +1,20 @@
"""
Twilio implementation of the TelephonyProvider interface.
"""
import json
import random
from typing import Any, Dict, List, Optional
from typing import TYPE_CHECKING, Any, Dict, List, Optional
import aiohttp
from loguru import logger
from twilio.request_validator import RequestValidator
from api.services.telephony.base import TelephonyProvider
from api.services.telephony.base import CallInitiationResult, TelephonyProvider
from api.utils.tunnel import TunnelURLProvider
from api.enums import WorkflowRunMode
if TYPE_CHECKING:
from fastapi import WebSocket
class TwilioProvider(TelephonyProvider):
@ -17,6 +22,9 @@ class TwilioProvider(TelephonyProvider):
Twilio implementation of TelephonyProvider.
Accepts configuration and works the same regardless of OSS/SaaS mode.
"""
PROVIDER_NAME = WorkflowRunMode.TWILIO.value
WEBHOOK_ENDPOINT = "twiml"
def __init__(self, config: Dict[str, Any]):
"""
@ -44,7 +52,7 @@ class TwilioProvider(TelephonyProvider):
webhook_url: str,
workflow_run_id: Optional[int] = None,
**kwargs: Any,
) -> Dict[str, Any]:
) -> CallInitiationResult:
"""
Initiate an outbound call via Twilio.
"""
@ -67,14 +75,13 @@ class TwilioProvider(TelephonyProvider):
# Add status callback if workflow_run_id provided
if workflow_run_id:
backend_endpoint = await TunnelURLProvider.get_tunnel_url()
callback_url = f"https://{backend_endpoint}/api/v1/telephony/status-callback/{workflow_run_id}"
callback_url = f"https://{backend_endpoint}/api/v1/telephony/twilio/status-callback/{workflow_run_id}"
data.update({
"StatusCallback": callback_url,
"StatusCallbackEvent": ["initiated", "ringing", "answered", "completed"],
"StatusCallbackMethod": "POST"
})
# Add any additional kwargs
data.update(kwargs)
# Make the API request
@ -85,7 +92,14 @@ class TwilioProvider(TelephonyProvider):
error_data = await response.json()
raise Exception(f"Failed to initiate call: {error_data}")
return await response.json()
response_data = await response.json()
return CallInitiationResult(
call_id=response_data["sid"],
status=response_data.get("status", "queued"),
provider_metadata={}, # Twilio doesn't need to persist extra data
raw_response=response_data
)
async def get_call_status(self, call_id: str) -> Dict[str, Any]:
"""
@ -201,4 +215,75 @@ class TwilioProvider(TelephonyProvider):
"duration": 0,
"status": "error",
"error": str(e)
}
}
def parse_status_callback(self, data: Dict[str, Any]) -> Dict[str, Any]:
"""
Parse Twilio status callback data into generic format.
"""
return {
"call_id": data.get("CallSid", ""),
"status": data.get("CallStatus", ""),
"from_number": data.get("From"),
"to_number": data.get("To"),
"direction": data.get("Direction"),
"duration": data.get("CallDuration") or data.get("Duration"),
"extra": data # Include all original data
}
async def handle_websocket(
self,
websocket: "WebSocket",
workflow_id: int,
user_id: int,
workflow_run_id: int,
) -> None:
"""
Handle Twilio-specific WebSocket connection.
Twilio sends:
1. "connected" event first
2. "start" event with streamSid and callSid
3. Then audio messages
"""
from api.services.pipecat.run_pipeline import run_pipeline_twilio
try:
# Wait for "connected" event
first_msg = await websocket.receive_text()
msg = json.loads(first_msg)
if msg.get("event") != "connected":
logger.error(f"Expected 'connected' event, got: {msg.get('event')}")
await websocket.close(code=4400, reason="Expected connected event")
return
logger.debug(f"Twilio WebSocket connected for workflow_run {workflow_run_id}")
# Wait for "start" event with stream details
start_msg = await websocket.receive_text()
logger.debug(f"Received start message: {start_msg}")
start_msg = json.loads(start_msg)
if start_msg.get("event") != "start":
logger.error("Expected 'start' event second")
await websocket.close(code=4400, reason="Expected start event")
return
# Extract Twilio-specific identifiers
try:
stream_sid = start_msg["start"]["streamSid"]
call_sid = start_msg["start"]["callSid"]
except KeyError:
logger.error("Missing streamSid or callSid in start message")
await websocket.close(code=4400, reason="Missing stream identifiers")
return
# Run the Twilio pipeline
await run_pipeline_twilio(
websocket, stream_sid, call_sid, workflow_id, workflow_run_id, user_id
)
except Exception as e:
logger.error(f"Error in Twilio WebSocket handler: {e}")
raise

View file

@ -4,14 +4,18 @@ Vonage (Nexmo) implementation of the TelephonyProvider interface.
import json
import random
import time
from typing import Any, Dict, List, Optional
from typing import TYPE_CHECKING, Any, Dict, List, Optional
import aiohttp
import jwt
from loguru import logger
from api.services.telephony.base import TelephonyProvider
from api.services.telephony.base import CallInitiationResult, TelephonyProvider
from api.utils.tunnel import TunnelURLProvider
from api.enums import WorkflowRunMode
if TYPE_CHECKING:
from fastapi import WebSocket
class VonageProvider(TelephonyProvider):
@ -19,7 +23,10 @@ class VonageProvider(TelephonyProvider):
Vonage implementation of TelephonyProvider.
Uses JWT authentication and NCCO for call control.
"""
PROVIDER_NAME = WorkflowRunMode.VONAGE.value
WEBHOOK_ENDPOINT = "ncco"
def __init__(self, config: Dict[str, Any]):
"""
Initialize VonageProvider with configuration.
@ -52,8 +59,8 @@ class VonageProvider(TelephonyProvider):
claims = {
"application_id": self.application_id,
"iat": int(time.time()),
"exp": int(time.time()) + 3600, # 1 hour expiry
"jti": str(time.time()) # Unique token ID
"exp": int(time.time()) + 3600,
"jti": str(time.time())
}
return jwt.encode(claims, self.private_key, algorithm="RS256")
@ -64,7 +71,7 @@ class VonageProvider(TelephonyProvider):
webhook_url: str,
workflow_run_id: Optional[int] = None,
**kwargs: Any,
) -> Dict[str, Any]:
) -> CallInitiationResult:
"""
Initiate an outbound call via Vonage Voice API.
"""
@ -75,7 +82,7 @@ class VonageProvider(TelephonyProvider):
# Select a random phone number
from_number = random.choice(self.from_numbers)
# Remove + prefix for Vonage
# Remove '+' prefix for Vonage
from_number = from_number.replace("+", "")
to_number = to_number.replace("+", "")
@ -98,13 +105,12 @@ class VonageProvider(TelephonyProvider):
# Add event webhook if workflow_run_id provided
if workflow_run_id:
backend_endpoint = await TunnelURLProvider.get_tunnel_url()
event_url = f"https://{backend_endpoint}/api/v1/telephony/events/{workflow_run_id}"
event_url = f"https://{backend_endpoint}/api/v1/telephony/vonage/events/{workflow_run_id}"
data.update({
"event_url": [event_url],
"event_method": "POST"
})
# Add any additional kwargs
data.update(kwargs)
# Generate JWT token
@ -118,7 +124,7 @@ class VonageProvider(TelephonyProvider):
async with aiohttp.ClientSession() as session:
async with session.post(
endpoint,
json=data, # Use json parameter for proper encoding
json=data,
headers=headers
) as response:
response_data = await response.json()
@ -126,7 +132,14 @@ class VonageProvider(TelephonyProvider):
if response.status != 201:
raise Exception(f"Failed to initiate call: {response_data}")
return response_data
return CallInitiationResult(
call_id=response_data["uuid"],
status=response_data.get("status", "started"),
provider_metadata={
"call_uuid": response_data["uuid"] # Vonage needs UUID persisted for WebSocket
},
raw_response=response_data
)
async def get_call_status(self, call_id: str) -> Dict[str, Any]:
"""
@ -179,8 +192,7 @@ class VonageProvider(TelephonyProvider):
return False
try:
# Vonage sends JWT in Authorization header
# Verify the JWT signature
# Vonage sends JWT in Authorization header. Verify the JWT signature
decoded = jwt.decode(
signature,
self.api_secret,
@ -213,9 +225,16 @@ class VonageProvider(TelephonyProvider):
}
]
# Return JSON instead of XML
return json.dumps(ncco)
def _get_auth_headers(self) -> Dict[str, str]:
"""Generate authorization headers for Vonage API."""
token = self._generate_jwt()
return {
"Authorization": f"Bearer {token}",
"Content-Type": "application/json"
}
async def get_call_cost(self, call_id: str) -> Dict[str, Any]:
"""
Get cost information for a completed Vonage call.
@ -271,4 +290,101 @@ class VonageProvider(TelephonyProvider):
"duration": 0,
"status": "error",
"error": str(e)
}
}
def parse_status_callback(self, data: Dict[str, Any]) -> Dict[str, Any]:
"""
Parse Vonage event callback data into generic format.
"""
# Map Vonage status to common format
status_map = {
"started": "initiated",
"ringing": "ringing",
"answered": "answered",
"complete": "completed",
"failed": "failed",
"busy": "busy",
"timeout": "no-answer",
"rejected": "busy"
}
return {
"call_id": data.get("uuid", ""),
"status": status_map.get(data.get("status", ""), data.get("status", "")),
"from_number": data.get("from"),
"to_number": data.get("to"),
"direction": data.get("direction"),
"duration": data.get("duration"),
"extra": data # Include all original data
}
async def handle_websocket(
self,
websocket: "WebSocket",
workflow_id: int,
user_id: int,
workflow_run_id: int,
) -> None:
"""
Handle Vonage-specific WebSocket connection.
Vonage can send:
1. JSON metadata first (websocket:connected event)
2. Or directly start with binary audio
"""
from api.db import db_client
from api.services.pipecat.run_pipeline import run_pipeline_vonage
try:
# Get workflow run to extract call UUID
workflow_run = await db_client.get_workflow_run(workflow_run_id)
if not workflow_run:
logger.error(f"Workflow run {workflow_run_id} not found")
await websocket.close(code=4404, reason="Workflow run not found")
return
# Get workflow for organization info
workflow = await db_client.get_workflow(workflow_id, user_id)
if not workflow:
logger.error(f"Workflow {workflow_id} not found")
await websocket.close(code=4404, reason="Workflow not found")
return
# Extract call UUID from workflow run context
call_uuid = workflow_run.gathered_context.get("call_uuid") if workflow_run.gathered_context else None
if not call_uuid:
logger.error(f"No call UUID found for Vonage connection in workflow run {workflow_run_id}")
await websocket.close(code=4400, reason="Missing call UUID")
return
logger.info(f"Vonage WebSocket connected for workflow_run {workflow_run_id}, call_uuid: {call_uuid}")
# Peek at first message to see if it's metadata or audio
first_msg = await websocket.receive()
if "text" in first_msg:
# JSON metadata - check if it's the connection event
msg = json.loads(first_msg["text"])
if msg.get("event") == "websocket:connected":
logger.debug(f"Received Vonage connection confirmation for {workflow_run_id}")
# Continue to pipeline regardless of message type
elif "bytes" in first_msg:
# Binary audio - Vonage started with audio immediately
logger.debug(f"Vonage started with binary audio for {workflow_run_id}")
# The pipeline will handle this first audio chunk
# Run the Vonage pipeline
await run_pipeline_vonage(
websocket,
call_uuid,
workflow,
workflow.organization_id,
workflow_id,
workflow_run_id,
user_id
)
except Exception as e:
logger.error(f"Error in Vonage WebSocket handler: {e}")
raise