dograh/api/services/telephony/providers/vobiz/provider.py

663 lines
24 KiB
Python
Raw Permalink Normal View History

"""
Vobiz implementation of the TelephonyProvider interface.
"""
import base64
import hashlib
import hmac
import json
import random
from typing import TYPE_CHECKING, Any, Dict, List, Optional
from urllib.parse import urlparse, urlunparse
import aiohttp
from fastapi import HTTPException
from loguru import logger
from api.enums import WorkflowRunMode
from api.services.telephony.base import (
CallInitiationResult,
NormalizedInboundData,
ProviderSyncResult,
TelephonyProvider,
)
from api.utils.common import get_backend_endpoints
from api.utils.telephony_address import normalize_telephony_address
if TYPE_CHECKING:
from fastapi import WebSocket
class VobizProvider(TelephonyProvider):
"""
Vobiz implementation of TelephonyProvider.
Vobiz uses Plivo-compatible API and WebSocket protocol.
"""
PROVIDER_NAME = WorkflowRunMode.VOBIZ.value
WEBHOOK_ENDPOINT = "vobiz-xml"
def __init__(self, config: Dict[str, Any]):
"""
Initialize VobizProvider with configuration.
Args:
config: Dictionary containing:
- auth_id: Vobiz Account ID (e.g., MA_SYQRLN1K)
- auth_token: Vobiz Auth Token
- application_id: Vobiz Application ID whose answer_url is
updated by ``configure_inbound``
- from_numbers: List of phone numbers to use (E.164 format without +)
"""
self.auth_id = config.get("auth_id")
self.auth_token = config.get("auth_token")
self.application_id = config.get("application_id")
self.from_numbers = config.get("from_numbers", [])
# Handle both single number (string) and multiple numbers (list)
if isinstance(self.from_numbers, str):
self.from_numbers = [self.from_numbers]
self.base_url = "https://api.vobiz.ai/api"
async def initiate_call(
self,
to_number: str,
webhook_url: str,
workflow_run_id: Optional[int] = None,
from_number: Optional[str] = None,
**kwargs: Any,
) -> CallInitiationResult:
"""
Initiate an outbound call via Vobiz.
Vobiz API differences from Twilio:
- Uses X-Auth-ID and X-Auth-Token headers instead of Basic Auth
- Expects JSON body instead of form data
- Phone numbers in E.164 format WITHOUT + prefix (e.g., 14155551234)
- Returns "call_uuid" instead of "sid"
"""
if not self.validate_config():
raise ValueError("Vobiz provider not properly configured")
endpoint = f"{self.base_url}/v1/Account/{self.auth_id}/Call/"
# Use provided from_number or select a random one
if from_number is None:
from_number = random.choice(self.from_numbers)
logger.info(f"Selected Vobiz phone number {from_number} for outbound call")
# Remove + prefix if present (Vobiz expects E.164 without +)
to_number_clean = to_number.lstrip("+")
from_number_clean = from_number.lstrip("+")
# Prepare call data (JSON format)
data = {
"from": from_number_clean,
"to": to_number_clean,
"answer_url": webhook_url,
"answer_method": "POST",
}
# Add hangup callback if workflow_run_id provided
if workflow_run_id:
backend_endpoint, _ = await get_backend_endpoints()
hangup_url = f"{backend_endpoint}/api/v1/telephony/vobiz/hangup-callback/{workflow_run_id}"
ring_url = f"{backend_endpoint}/api/v1/telephony/vobiz/ring-callback/{workflow_run_id}"
data.update(
{
"hangup_url": hangup_url,
"hangup_method": "POST",
"ring_url": ring_url,
"ring_method": "POST",
}
)
# Add optional parameters
data.update(kwargs)
# Make the API request
headers = {
"X-Auth-ID": self.auth_id,
"X-Auth-Token": self.auth_token,
"Content-Type": "application/json",
}
async with aiohttp.ClientSession() as session:
async with session.post(endpoint, json=data, headers=headers) as response:
if response.status != 201:
error_data = await response.text()
logger.error(f"Vobiz API error: {error_data}")
raise HTTPException(
status_code=response.status,
detail=f"Failed to initiate Vobiz call: {error_data}",
)
response_data = await response.json()
logger.info(f"Vobiz API response: {response_data}")
# Extract call_uuid with multiple fallback options
call_id = (
response_data.get("call_uuid")
or response_data.get("CallUUID")
or response_data.get("request_uuid")
or response_data.get("RequestUUID")
)
if not call_id:
logger.error(
f"No call ID found in Vobiz response. Available keys: {list(response_data.keys())}"
)
raise HTTPException(
status_code=response.status,
detail=f"Vobiz API response missing call identifier. Response: {response_data}"
f"Vobiz API response missing call identifier. Response: {response_data}",
)
logger.info(f"Vobiz call initiated successfully. Call ID: {call_id}")
return CallInitiationResult(
call_id=call_id,
status="queued", # Vobiz returns "message": "call fired"
2026-04-02 14:48:13 +05:30
caller_number=from_number,
2026-02-13 11:44:10 +05:30
provider_metadata={"call_id": call_id},
raw_response=response_data,
)
async def get_call_status(self, call_id: str) -> Dict[str, Any]:
"""
Get the current status of a Vobiz call (CDR).
Vobiz returns:
- call_uuid, status, duration, billed_duration
- call_rate, total_cost (for billing)
"""
if not self.validate_config():
raise ValueError("Vobiz provider not properly configured")
endpoint = f"{self.base_url}/v1/Account/{self.auth_id}/Call/{call_id}/"
headers = {"X-Auth-ID": self.auth_id, "X-Auth-Token": self.auth_token}
async with aiohttp.ClientSession() as session:
async with session.get(endpoint, headers=headers) as response:
if response.status != 200:
error_data = await response.text()
logger.error(f"Failed to get Vobiz call status: {error_data}")
raise Exception(f"Failed to get call status: {error_data}")
return await response.json()
async def get_available_phone_numbers(self) -> List[str]:
"""
Get list of available Vobiz phone numbers.
"""
return self.from_numbers
def validate_config(self) -> bool:
"""
Validate Vobiz configuration.
"""
return bool(self.auth_id and self.auth_token and self.from_numbers)
async def verify_webhook_signature(
self,
url: str,
params: Dict[str, Any],
signature: str,
nonce: str = None,
body: str = "",
signature_version: str = "v3",
) -> bool:
"""
Verify Vobiz webhook signature for security.
Vobiz signs the callback base URL (query parameters stripped) with
the account auth token and a request nonce:
- V2: base64(HMAC-SHA256(auth_token, baseURL + nonce))
- V3: base64(HMAC-SHA256(auth_token, baseURL + "." + nonce))
"""
if not signature or not nonce:
logger.warning("Missing signature or nonce headers for Vobiz webhook")
return False
if not self.auth_token:
logger.error(
"No auth_token available for Vobiz webhook signature verification"
)
return False
version = signature_version.lower()
if version not in {"v2", "v3"}:
logger.warning(f"Unsupported Vobiz signature version: {signature_version}")
return False
parsed_url = urlparse(url)
base_url = urlunparse(
(parsed_url.scheme, parsed_url.netloc, parsed_url.path, "", "", "")
)
signed_payload = base_url + (f".{nonce}" if version == "v3" else nonce)
expected_signature = base64.b64encode(
hmac.new(
self.auth_token.encode("utf-8"),
signed_payload.encode("utf-8"),
hashlib.sha256,
).digest()
).decode("ascii")
is_valid = hmac.compare_digest(expected_signature, signature)
if not is_valid:
logger.warning(
f"Vobiz webhook signature mismatch. Expected: {expected_signature[:8]}..., Got: {signature[:8]}..."
)
return is_valid
async def get_webhook_response(
self, workflow_id: int, user_id: int, workflow_run_id: int
) -> str:
"""
Generate Vobiz XML response for starting a call session.
Vobiz uses <Stream> element similar to Twilio but with Plivo-compatible attributes:
- bidirectional: Enable two-way audio
- audioTrack: Which audio to stream (inbound, outbound, both)
- contentType: audio/x-mulaw;rate=8000
"""
_, wss_backend_endpoint = await get_backend_endpoints()
vobiz_xml = f"""<?xml version="1.0" encoding="UTF-8"?>
<Response>
<Stream bidirectional="true" keepCallAlive="true" contentType="audio/x-mulaw;rate=8000">{wss_backend_endpoint}/api/v1/telephony/ws/{workflow_id}/{user_id}/{workflow_run_id}</Stream>
</Response>"""
return vobiz_xml
async def get_call_cost(self, call_id: str) -> Dict[str, Any]:
"""
Get cost information for a completed Vobiz call.
Vobiz returns cost in the same CDR endpoint:
- total_cost: Positive string (e.g., "0.04")
- call_rate: Per-minute rate (e.g., "0.02")
- billed_duration: Billable seconds (integer)
Args:
call_id: The Vobiz call_uuid
Returns:
Dict containing cost information
"""
endpoint = f"{self.base_url}/v1/Account/{self.auth_id}/Call/{call_id}/"
try:
headers = {"X-Auth-ID": self.auth_id, "X-Auth-Token": self.auth_token}
async with aiohttp.ClientSession() as session:
async with session.get(endpoint, headers=headers) as response:
if response.status != 200:
error_data = await response.text()
logger.error(f"Failed to get Vobiz call cost: {error_data}")
return {
"cost_usd": 0.0,
"duration": 0,
"status": "error",
"error": str(error_data),
}
call_data = await response.json()
# Vobiz returns cost as positive string (e.g., "0.04")
total_cost_str = call_data.get("total_cost", "0")
cost_usd = float(total_cost_str) if total_cost_str else 0.0
# Duration is billed_duration in seconds (integer)
duration = int(call_data.get("billed_duration", 0))
return {
"cost_usd": cost_usd,
"duration": duration,
"status": call_data.get("status", "unknown"),
"price_unit": "USD", # Vobiz always uses USD
"call_rate": call_data.get("call_rate", "0"),
"raw_response": call_data,
}
except Exception as e:
logger.error(f"Exception fetching Vobiz call cost: {e}")
return {"cost_usd": 0.0, "duration": 0, "status": "error", "error": str(e)}
def parse_status_callback(self, data: Dict[str, Any]) -> Dict[str, Any]:
"""
Parse Vobiz status callback data into generic format.
Vobiz sends callbacks to hangup_url and ring_url with:
- call_uuid (instead of CallSid)
- status, from, to, duration, etc.
"""
return {
"call_id": data.get("CallUUID", ""),
"status": data.get("CallStatus", ""),
"from_number": data.get("From"),
"to_number": data.get("To"),
"direction": data.get("Direction"),
"duration": data.get("Duration"),
"extra": data,
}
async def handle_websocket(
self,
websocket: "WebSocket",
workflow_id: int,
user_id: int,
workflow_run_id: int,
) -> None:
"""
Handle Vobiz WebSocket connection using Vobiz WebSocket protocol.
Extracts stream_id and call_id from the start event and delegates
message handling to VobizFrameSerializer.
"""
from api.services.pipecat.run_pipeline import run_pipeline_telephony
first_msg = await websocket.receive_text()
start_msg = json.loads(first_msg)
logger.debug(f"Received the first message: {start_msg}")
# Validate that this is a start event
if start_msg.get("event") != "start":
logger.error(f"Expected 'start' event, got: {start_msg.get('event')}")
await websocket.close(code=4400, reason="Expected start event")
return
logger.debug(f"Vobiz WebSocket connected for workflow_run {workflow_run_id}")
try:
# Extract stream_id and call_id from the start event
start_data = start_msg.get("start", {})
stream_id = start_data.get("streamId")
call_id = start_data.get("callId")
if not stream_id or not call_id:
logger.error(f"Missing streamId or callId in start event: {start_data}")
await websocket.close(code=4400, reason="Missing streamId or callId")
return
logger.info(
f"[run {workflow_run_id}] Starting Vobiz WebSocket handler - "
f"stream_id: {stream_id}, call_id: {call_id}"
)
await run_pipeline_telephony(
websocket,
provider_name=self.PROVIDER_NAME,
workflow_id=workflow_id,
workflow_run_id=workflow_run_id,
user_id=user_id,
call_id=call_id,
transport_kwargs={"stream_id": stream_id, "call_id": call_id},
)
logger.info(f"[run {workflow_run_id}] Vobiz pipeline completed")
except Exception as e:
logger.error(
f"[run {workflow_run_id}] Error in Vobiz WebSocket handler: {e}"
)
raise
# ======== INBOUND CALL METHODS ========
@classmethod
def can_handle_webhook(
cls, webhook_data: Dict[str, Any], headers: Dict[str, str]
) -> bool:
"""
Determine if this provider can handle the incoming webhook.
Vobiz webhooks contain CallUUID field.
"""
return "vobiz" in headers.get("user-agent", "").lower()
@staticmethod
def parse_inbound_webhook(webhook_data: Dict[str, Any]) -> NormalizedInboundData:
"""
Parse Vobiz-specific inbound webhook data into normalized format.
"""
# Vobiz webhooks don't carry country info, and our deployment is
# India-only today — hardcode "IN" so leading-0 trunk-prefix numbers
# (e.g. "02271264296") normalize to the right E.164 ("+912271264296").
# Revisit if/when we onboard a non-Indian Vobiz customer.
country = "IN"
from_raw = webhook_data.get("From", "")
to_raw = webhook_data.get("To", "")
return NormalizedInboundData(
provider=VobizProvider.PROVIDER_NAME,
call_id=webhook_data.get("CallUUID", ""),
from_number=normalize_telephony_address(
from_raw, country_hint=country
).canonical
if from_raw
else "",
to_number=normalize_telephony_address(
to_raw, country_hint=country
).canonical
if to_raw
else "",
direction=webhook_data.get("Direction", ""),
call_status=webhook_data.get("CallStatus", ""),
account_id=webhook_data.get("ParentAuthID"),
from_country=country,
to_country=country,
raw_data=webhook_data,
)
@staticmethod
def validate_account_id(config_data: dict, webhook_account_id: str) -> bool:
"""Validate Vobiz auth_id from webhook matches configuration"""
if not webhook_account_id:
return False
stored_auth_id = config_data.get("auth_id")
return stored_auth_id == webhook_account_id
async def verify_inbound_signature(
self,
url: str,
webhook_data: Dict[str, Any],
headers: Dict[str, str],
body: str = "",
) -> bool:
"""
Verify the signature of an inbound Vobiz webhook for security.
Uses Vobiz's documented V3/V2 HMAC-SHA256 callback signatures.
"""
normalized_headers = {key.lower(): value for key, value in headers.items()}
signature = normalized_headers.get(
"x-vobiz-signature-v3"
) or normalized_headers.get("x-vobiz-signature-ma-v3", "")
nonce = normalized_headers.get("x-vobiz-signature-v3-nonce")
signature_version = "v3"
if not signature:
signature = normalized_headers.get(
"x-vobiz-signature-v2"
) or normalized_headers.get("x-vobiz-signature-ma-v2", "")
nonce = normalized_headers.get("x-vobiz-signature-v2-nonce")
signature_version = "v2"
if not signature:
logger.warning("Inbound Vobiz webhook missing X-Vobiz-Signature-V3/V2")
return False
return await self.verify_webhook_signature(
url,
webhook_data,
signature,
nonce,
body,
signature_version=signature_version,
)
async def configure_inbound(
self, address: str, webhook_url: Optional[str]
) -> ProviderSyncResult:
"""Update answer_url on the Vobiz Application (Plivo-compatible model).
Vobiz's update is partial so we POST only ``answer_url`` and
``answer_method`` ``app_name``, ``hangup_url``, etc. stay as the
user set them. The URL is shared across every number on the
application clearing is a no-op to avoid silently breaking
inbound for sibling numbers.
"""
if webhook_url is None:
logger.info(
f"Vobiz configure_inbound clear for {address}: skipping "
f"application update (answer_url is shared across all numbers "
f"on application {self.application_id})"
)
return ProviderSyncResult(ok=True)
if not self.validate_config():
return ProviderSyncResult(
ok=False, message="Vobiz provider not properly configured"
)
if not self.application_id:
return ProviderSyncResult(
ok=False,
message=(
"Vobiz application_id is not configured. Set it in the "
"telephony configuration so inbound webhooks can be "
"synced to the right Application."
),
)
app_endpoint = (
f"{self.base_url}/v1/Account/{self.auth_id}/Application/"
f"{self.application_id}/"
)
data = {
"answer_url": webhook_url,
"answer_method": "POST",
}
headers = {
"X-Auth-ID": self.auth_id,
"X-Auth-Token": self.auth_token,
"Content-Type": "application/json",
}
try:
async with aiohttp.ClientSession() as session:
async with session.post(
app_endpoint, json=data, headers=headers
) as response:
if response.status not in (200, 202):
body = await response.text()
logger.error(
f"Vobiz application update failed for "
f"{self.application_id}: {response.status} {body}"
)
return ProviderSyncResult(
ok=False,
message=f"Vobiz API {response.status}: {body}",
)
except Exception as e:
logger.error(
f"Exception updating Vobiz application {self.application_id}: {e}"
)
return ProviderSyncResult(ok=False, message=f"Vobiz update failed: {e}")
logger.info(
f"Vobiz answer_url set on application {self.application_id} "
f"(triggered by address {address})"
)
return ProviderSyncResult(ok=True)
async def start_inbound_stream(
self,
*,
websocket_url: str,
workflow_run_id: int,
normalized_data,
backend_endpoint: str,
):
"""
Generate Vobiz XML response for an inbound webhook.
Note: For hangup callbacks, configure the hangup_url manually in Vobiz dashboard
to point to: /api/v1/telephony/vobiz/hangup-callback/workflow/{workflow_id}
"""
from fastapi import Response
vobiz_xml = f"""<?xml version="1.0" encoding="UTF-8"?>
<Response>
<Stream bidirectional="true" keepCallAlive="true" contentType="audio/x-mulaw;rate=8000">{websocket_url}</Stream>
</Response>"""
return Response(content=vobiz_xml, media_type="application/xml")
@staticmethod
def generate_error_response(error_type: str, message: str) -> tuple:
"""
Generate a Vobiz-specific error response.
"""
from fastapi import Response
# Vobiz error responses should be valid XML like Plivo
vobiz_xml = f"""<?xml version="1.0" encoding="UTF-8"?>
<Response>
<Speak voice="WOMAN">Sorry, there was an error processing your call. {message}</Speak>
<Hangup/>
</Response>"""
return Response(content=vobiz_xml, media_type="application/xml")
@staticmethod
def generate_validation_error_response(error_type) -> tuple:
"""
Generate Vobiz-specific error response for validation failures with organizational debugging info.
"""
from fastapi import Response
from api.errors.telephony_errors import TELEPHONY_ERROR_MESSAGES, TelephonyError
message = TELEPHONY_ERROR_MESSAGES.get(
error_type, TELEPHONY_ERROR_MESSAGES[TelephonyError.GENERAL_AUTH_FAILED]
)
vobiz_xml_content = f"""<?xml version="1.0" encoding="UTF-8"?>
<Response>
<Speak voice="WOMAN">{message}</Speak>
<Hangup/>
</Response>"""
return Response(content=vobiz_xml_content, media_type="application/xml")
# ======== CALL TRANSFER METHODS ========
async def transfer_call(
self,
destination: str,
transfer_id: str,
conference_name: str,
timeout: int = 30,
**kwargs: Any,
) -> Dict[str, Any]:
"""
Vobiz provider does not support call transfers.
Raises:
NotImplementedError: Vobiz call transfers are yet to be implemented
"""
raise NotImplementedError("Vobiz provider does not support call transfers")
def supports_transfers(self) -> bool:
"""
Vobiz does not support call transfers.
Returns:
False - Vobiz provider does not support call transfers
"""
return False