feat: added vobiz telephony (#65)

* feat: added vobiz telephony

* chore: run formatter

* chore: add migration

* Add tsclient

---------

Co-authored-by: Abhishek Kumar <abhishek@a6k.me>
This commit is contained in:
Piyush Sahoo 2025-11-28 09:36:04 +05:30 committed by GitHub
parent 749a0c557f
commit 09897cb5d8
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
16 changed files with 994 additions and 19 deletions

View file

@ -0,0 +1,42 @@
"""add vobiz mode for workflow
Revision ID: a188ff90e76f
Revises: e02f387b7538
Create Date: 2025-11-27 21:24:34.072030
"""
from typing import Sequence, Union
from alembic import op
import sqlalchemy as sa
from alembic_postgresql_enum import TableReference
# revision identifiers, used by Alembic.
revision: str = 'a188ff90e76f'
down_revision: Union[str, None] = 'e02f387b7538'
branch_labels: Union[str, Sequence[str], None] = None
depends_on: Union[str, Sequence[str], None] = None
def upgrade() -> None:
# ### commands auto generated by Alembic - please adjust! ###
op.sync_enum_values(
enum_schema='public',
enum_name='workflow_run_mode',
new_values=['twilio', 'vonage', 'vobiz', 'stasis', 'webrtc', 'smallwebrtc', 'VOICE', 'CHAT'],
affected_columns=[TableReference(table_schema='public', table_name='workflow_runs', column_name='mode')],
enum_values_to_rename=[],
)
# ### end Alembic commands ###
def downgrade() -> None:
# ### commands auto generated by Alembic - please adjust! ###
op.sync_enum_values(
enum_schema='public',
enum_name='workflow_run_mode',
new_values=['twilio', 'vonage', 'stasis', 'webrtc', 'smallwebrtc', 'VOICE', 'CHAT'],
affected_columns=[TableReference(table_schema='public', table_name='workflow_runs', column_name='mode')],
enum_values_to_rename=[],
)
# ### end Alembic commands ###

View file

@ -15,6 +15,7 @@ class Environment(Enum):
class WorkflowRunMode(Enum):
TWILIO = "twilio"
VONAGE = "vonage"
VOBIZ = "vobiz"
STASIS = "stasis"
WEBRTC = "webrtc"
SMALLWEBRTC = "smallwebrtc"

View file

@ -9,6 +9,8 @@ from api.schemas.telephony_config import (
TelephonyConfigurationResponse,
TwilioConfigurationRequest,
TwilioConfigurationResponse,
VobizConfigurationRequest,
VobizConfigurationResponse,
VonageConfigurationRequest,
VonageConfigurationResponse,
)
@ -21,6 +23,7 @@ router = APIRouter(prefix="/organizations", tags=["organizations"])
PROVIDER_MASKED_FIELDS = {
"twilio": ["account_sid", "auth_token"],
"vonage": ["private_key", "api_key", "api_secret"],
"vobiz": ["auth_id", "auth_token"],
}
@ -56,6 +59,7 @@ async def get_telephony_configuration(user: UserModel = Depends(get_user)):
from_numbers=from_numbers,
),
vonage=None,
vobiz=None,
)
elif stored_provider == "vonage":
application_id = config.value.get("application_id", "")
@ -78,6 +82,24 @@ async def get_telephony_configuration(user: UserModel = Depends(get_user)):
api_secret=mask_key(api_secret) if api_secret else None,
from_numbers=from_numbers,
),
vobiz=None,
)
elif stored_provider == "vobiz":
auth_id = config.value.get("auth_id", "")
auth_token = config.value.get("auth_token", "")
from_numbers = (
config.value.get("from_numbers", []) if auth_id and auth_token else []
)
return TelephonyConfigurationResponse(
twilio=None,
vonage=None,
vobiz=VobizConfigurationResponse(
provider="vobiz",
auth_id=mask_key(auth_id) if auth_id else "",
auth_token=mask_key(auth_token) if auth_token else "",
from_numbers=from_numbers,
),
)
else:
return TelephonyConfigurationResponse()
@ -85,7 +107,11 @@ async def get_telephony_configuration(user: UserModel = Depends(get_user)):
@router.post("/telephony-config")
async def save_telephony_configuration(
request: Union[TwilioConfigurationRequest, VonageConfigurationRequest],
request: Union[
TwilioConfigurationRequest,
VonageConfigurationRequest,
VobizConfigurationRequest,
],
user: UserModel = Depends(get_user),
):
"""Save telephony configuration for the user's organization."""
@ -115,6 +141,13 @@ async def save_telephony_configuration(
"api_secret": getattr(request, "api_secret", None),
"from_numbers": request.from_numbers,
}
elif request.provider == "vobiz":
config_value = {
"provider": "vobiz",
"auth_id": request.auth_id,
"auth_token": request.auth_token,
"from_numbers": request.from_numbers,
}
else:
raise HTTPException(
status_code=400, detail=f"Unsupported provider: {request.provider}"

View file

@ -234,7 +234,10 @@ async def websocket_endpoint(
provider_type = workflow_run.gathered_context.get("provider")
if not provider_type:
logger.error(f"No provider type found in workflow run {workflow_run_id}")
logger.error(
f"No provider type found in workflow run {workflow_run_id}. "
f"gathered_context: {workflow_run.gathered_context}, mode: {workflow_run.mode}"
)
await websocket.close(code=4400, reason="Provider type not found")
return
@ -483,3 +486,160 @@ async def handle_vonage_events(
# Return 204 No Content as expected by Vonage
return {"status": "ok"}
@router.post("/vobiz-xml", include_in_schema=False)
async def handle_vobiz_xml_webhook(
workflow_id: int, user_id: int, workflow_run_id: int, organization_id: int
):
"""
Handle initial webhook from Vobiz when call is answered.
Returns Vobiz XML response with Stream element.
Vobiz uses Plivo-compatible XML format similar to Twilio's TwiML.
"""
logger.info(
f"[run {workflow_run_id}] Vobiz XML webhook called - "
f"workflow_id={workflow_id}, user_id={user_id}, org_id={organization_id}"
)
provider = await get_telephony_provider(organization_id)
logger.debug(f"[run {workflow_run_id}] Using provider: {provider.PROVIDER_NAME}")
response_content = await provider.get_webhook_response(
workflow_id, user_id, workflow_run_id
)
logger.debug(
f"[run {workflow_run_id}] Vobiz XML response generated:\n{response_content}"
)
return HTMLResponse(content=response_content, media_type="application/xml")
@router.post("/vobiz/hangup-callback/{workflow_run_id}")
async def handle_vobiz_hangup_callback(
workflow_run_id: int,
request: Request,
):
"""Handle Vobiz hangup callback (sent when call ends).
Vobiz sends callbacks to hangup_url when the call terminates.
This includes call duration, status, and billing information.
"""
# Parse the callback data (Vobiz sends form data or JSON)
try:
callback_data = await request.json()
logger.info(
f"[run {workflow_run_id}] Received Vobiz hangup callback (JSON): "
f"{json.dumps(callback_data)}"
)
except Exception:
# Fallback to form data if JSON fails
form_data = await request.form()
callback_data = dict(form_data)
logger.info(
f"[run {workflow_run_id}] Received Vobiz hangup callback (form): "
f"{json.dumps(callback_data)}"
)
# Get workflow run for processing
workflow_run = await db_client.get_workflow_run_by_id(workflow_run_id)
if not workflow_run:
logger.warning(
f"[run {workflow_run_id}] Workflow run not found for Vobiz hangup callback"
)
return {"status": "ignored", "reason": "workflow_run_not_found"}
# Get workflow and provider
workflow = await db_client.get_workflow_by_id(workflow_run.workflow_id)
if not workflow:
logger.warning(f"[run {workflow_run_id}] Workflow not found")
return {"status": "ignored", "reason": "workflow_not_found"}
provider = await get_telephony_provider(workflow.organization_id)
logger.debug(
f"[run {workflow_run_id}] Processing Vobiz hangup with provider: {provider.PROVIDER_NAME}"
)
# Parse the callback data into generic format
parsed_data = provider.parse_status_callback(callback_data)
logger.debug(
f"[run {workflow_run_id}] Parsed Vobiz callback data: {json.dumps(parsed_data)}"
)
# Create StatusCallbackRequest from parsed data
status_update = StatusCallbackRequest(
call_id=parsed_data["call_id"],
status=parsed_data["status"],
from_number=parsed_data.get("from_number"),
to_number=parsed_data.get("to_number"),
direction=parsed_data.get("direction"),
duration=parsed_data.get("duration"),
extra=parsed_data.get("extra", {}),
)
# Process the status update
await _process_status_update(workflow_run_id, status_update, workflow_run)
logger.info(f"[run {workflow_run_id}] Vobiz hangup callback processed successfully")
return {"status": "success"}
@router.post("/vobiz/ring-callback/{workflow_run_id}")
async def handle_vobiz_ring_callback(
workflow_run_id: int,
request: Request,
):
"""Handle Vobiz ring callback (sent when call starts ringing).
Vobiz can send callbacks to ring_url when the call starts ringing.
This is optional and used for tracking ringing status.
"""
# Parse the callback data
try:
callback_data = await request.json()
logger.info(
f"[run {workflow_run_id}] Received Vobiz ring callback (JSON): "
f"{json.dumps(callback_data)}"
)
except Exception:
form_data = await request.form()
callback_data = dict(form_data)
logger.info(
f"[run {workflow_run_id}] Received Vobiz ring callback (form): "
f"{json.dumps(callback_data)}"
)
# Get workflow run for processing
workflow_run = await db_client.get_workflow_run_by_id(workflow_run_id)
if not workflow_run:
logger.warning(
f"[run {workflow_run_id}] Workflow run not found for Vobiz ring callback"
)
return {"status": "ignored", "reason": "workflow_run_not_found"}
# Log the ringing event
telephony_callback_logs = workflow_run.logs.get("telephony_status_callbacks", [])
ring_log = {
"status": "ringing",
"timestamp": datetime.now(UTC).isoformat(),
"call_id": callback_data.get("call_uuid", callback_data.get("CallUUID", "")),
"event_type": "ring",
"raw_data": callback_data,
}
telephony_callback_logs.append(ring_log)
# Update workflow run logs
await db_client.update_workflow_run(
run_id=workflow_run_id,
logs={"telephony_status_callbacks": telephony_callback_logs},
)
logger.info(f"[run {workflow_run_id}] Vobiz ring callback logged")
return {"status": "success"}

View file

@ -47,8 +47,31 @@ class VonageConfigurationResponse(BaseModel):
from_numbers: List[str]
class VobizConfigurationRequest(BaseModel):
"""Request schema for Vobiz configuration."""
provider: str = Field(default="vobiz")
auth_id: str = Field(..., description="Vobiz Account ID (e.g., MA_SYQRLN1K)")
auth_token: str = Field(..., description="Vobiz Auth Token")
from_numbers: List[str] = Field(
...,
min_length=1,
description="List of Vobiz phone numbers (E.164 without + prefix)",
)
class VobizConfigurationResponse(BaseModel):
"""Response schema for Vobiz configuration with masked sensitive fields."""
provider: str
auth_id: str # Masked (e.g., "****************L1NK")
auth_token: str # Masked (e.g., "****************KEFO")
from_numbers: List[str]
class TelephonyConfigurationResponse(BaseModel):
"""Top-level telephony configuration response."""
twilio: Optional[TwilioConfigurationResponse] = None
vonage: Optional[VonageConfigurationResponse] = None
vobiz: Optional[VobizConfigurationResponse] = None

View file

@ -85,7 +85,12 @@ def create_audio_config(transport_type: str) -> AudioConfig:
Returns:
AudioConfig instance with appropriate settings
"""
if transport_type in (WorkflowRunMode.STASIS.value, WorkflowRunMode.TWILIO.value):
if transport_type in (
WorkflowRunMode.STASIS.value,
WorkflowRunMode.TWILIO.value,
WorkflowRunMode.VOBIZ.value,
):
# Twilio, Vobiz, and Stasis use MULAW at 8kHz
return AudioConfig(
transport_in_sample_rate=8000,
transport_out_sample_rate=8000,

View file

@ -31,6 +31,7 @@ from api.services.pipecat.tracing_config import setup_pipeline_tracing
from api.services.pipecat.transport_setup import (
create_stasis_transport,
create_twilio_transport,
create_vobiz_transport,
create_vonage_transport,
create_webrtc_transport,
)
@ -174,6 +175,70 @@ async def run_pipeline_vonage(
raise
async def run_pipeline_vobiz(
websocket_client: WebSocket,
stream_id: str,
call_id: str,
workflow_id: int,
workflow_run_id: int,
user_id: int,
) -> None:
"""Run pipeline for Vobiz using Plivo-compatible WebSocket protocol."""
logger.info(
f"[run {workflow_run_id}] Starting Vobiz pipeline - "
f"stream_id={stream_id}, call_id={call_id}, workflow_id={workflow_id}"
)
set_current_run_id(workflow_run_id)
cost_info = {"call_id": call_id}
await db_client.update_workflow_run(workflow_run_id, cost_info=cost_info)
workflow = await db_client.get_workflow(workflow_id, user_id)
vad_config = None
ambient_noise_config = None
if workflow and workflow.workflow_configurations:
if "vad_configuration" in workflow.workflow_configurations:
vad_config = workflow.workflow_configurations["vad_configuration"]
if "ambient_noise_configuration" in workflow.workflow_configurations:
ambient_noise_config = workflow.workflow_configurations[
"ambient_noise_configuration"
]
try:
audio_config = create_audio_config(WorkflowRunMode.VOBIZ.value)
logger.info(
f"[run {workflow_run_id}] Vobiz audio config: "
f"sample_rate={audio_config.transport_in_sample_rate}Hz, format=MULAW"
)
transport = await create_vobiz_transport(
websocket_client,
stream_id,
call_id,
workflow_run_id,
audio_config,
workflow.organization_id,
vad_config,
ambient_noise_config,
)
logger.info(f"[run {workflow_run_id}] Starting Vobiz pipeline execution")
await _run_pipeline(
transport,
workflow_id,
workflow_run_id,
user_id,
audio_config=audio_config,
)
logger.info(f"[run {workflow_run_id}] Vobiz pipeline completed successfully")
except Exception as e:
logger.error(
f"[run {workflow_run_id}] Error in Vobiz pipeline: {e}", exc_info=True
)
raise
async def run_pipeline_smallwebrtc(
webrtc_connection: SmallWebRTCConnection,
workflow_id: int,

View file

@ -21,6 +21,7 @@ from pipecat.audio.mixers.silence_mixer import SilenceAudioMixer
from pipecat.audio.mixers.soundfile_mixer import SoundfileMixer
from pipecat.audio.turn.smart_turn.base_smart_turn import SmartTurnParams
from pipecat.audio.vad.silero import SileroVADAnalyzer, VADParams
from pipecat.serializers.plivo import PlivoFrameSerializer
from pipecat.serializers.twilio import TwilioFrameSerializer
from pipecat.serializers.vonage import VonageFrameSerializer
from pipecat.transports.base_transport import TransportParams
@ -233,6 +234,117 @@ async def create_vonage_transport(
)
async def create_vobiz_transport(
websocket_client: WebSocket,
stream_id: str,
call_id: str,
workflow_run_id: int,
audio_config: AudioConfig,
organization_id: int,
vad_config: dict | None = None,
ambient_noise_config: dict | None = None,
):
"""Create a transport for Vobiz connections.
Vobiz uses Plivo-compatible WebSocket protocol:
- MULAW audio at 8kHz (same as Twilio)
- Base64-encoded audio in JSON messages
- PlivoFrameSerializer handles the protocol
"""
from loguru import logger
logger.info(
f"[run {workflow_run_id}] Creating Vobiz transport - "
f"stream_id={stream_id}, call_id={call_id}"
)
# Load Vobiz configuration from database
from api.services.telephony.factory import load_telephony_config
config = await load_telephony_config(organization_id)
if config.get("provider") != "vobiz":
raise ValueError(f"Expected Vobiz provider, got {config.get('provider')}")
auth_id = config.get("auth_id")
auth_token = config.get("auth_token")
if not auth_id or not auth_token:
raise ValueError(
f"Incomplete Vobiz configuration for organization {organization_id}"
)
logger.debug(
f"[run {workflow_run_id}] Vobiz config loaded - auth_id={auth_id}, "
f"from_numbers={len(config.get('from_numbers', []))} numbers"
)
turn_analyzer = create_turn_analyzer(workflow_run_id, audio_config)
# Use PlivoFrameSerializer for Vobiz (Plivo-compatible protocol)
serializer = PlivoFrameSerializer(
stream_id=stream_id,
call_id=call_id,
auth_id=auth_id,
auth_token=auth_token,
params=PlivoFrameSerializer.InputParams(
plivo_sample_rate=8000, # Vobiz uses MULAW at 8kHz
sample_rate=audio_config.pipeline_sample_rate,
),
)
logger.debug(
f"[run {workflow_run_id}] PlivoFrameSerializer created for Vobiz - "
f"transport_rate=8000Hz, pipeline_rate={audio_config.pipeline_sample_rate}Hz"
)
# Create WebSocket transport (same structure as Twilio/Vonage)
transport = FastAPIWebsocketTransport(
websocket=websocket_client,
params=FastAPIWebsocketParams(
audio_in_enabled=True,
audio_out_enabled=True,
audio_in_sample_rate=audio_config.transport_in_sample_rate,
audio_out_sample_rate=audio_config.transport_out_sample_rate,
vad_analyzer=(
SileroVADAnalyzer(
params=VADParams(
confidence=vad_config.get("confidence", 0.7),
start_secs=vad_config.get("start_seconds", 0.4),
stop_secs=vad_config.get("stop_seconds", 0.8),
min_volume=vad_config.get("minimum_volume", 0.6),
)
)
if vad_config
else SileroVADAnalyzer()
),
audio_out_mixer=(
SoundfileMixer(
sound_files={
"office": APP_ROOT_DIR
/ "assets"
/ f"office-ambience-{audio_config.transport_out_sample_rate}-mono.wav"
},
default_sound="office",
volume=ambient_noise_config.get("volume", 0.3),
)
if ambient_noise_config and ambient_noise_config.get("enabled", False)
else SilenceAudioMixer()
),
turn_analyzer=turn_analyzer,
serializer=serializer,
audio_in_filter=RNNoiseFilter(library_path=librnnoise_path)
if ENABLE_RNNOISE
else None,
),
)
logger.info(
f"[run {workflow_run_id}] Vobiz transport created successfully (VAD enabled)"
)
return transport
def create_webrtc_transport(
webrtc_connection: SmallWebRTCConnection,
workflow_run_id: int,

View file

@ -12,6 +12,7 @@ from api.db import db_client
from api.enums import OrganizationConfigurationKey
from api.services.telephony.base import TelephonyProvider
from api.services.telephony.providers.twilio_provider import TwilioProvider
from api.services.telephony.providers.vobiz_provider import VobizProvider
from api.services.telephony.providers.vonage_provider import VonageProvider
@ -58,6 +59,13 @@ async def load_telephony_config(organization_id: int) -> Dict[str, Any]:
"api_secret": config.value.get("api_secret"),
"from_numbers": config.value.get("from_numbers", []),
}
elif provider == "vobiz":
return {
"provider": "vobiz",
"auth_id": config.value.get("auth_id"),
"auth_token": config.value.get("auth_token"),
"from_numbers": config.value.get("from_numbers", []),
}
else:
raise ValueError(f"Unknown provider in config: {provider}")
@ -92,5 +100,8 @@ async def get_telephony_provider(organization_id: int) -> TelephonyProvider:
elif provider_type == "vonage":
return VonageProvider(config)
elif provider_type == "vobiz":
return VobizProvider(config)
else:
raise ValueError(f"Unknown telephony provider: {provider_type}")

View file

@ -0,0 +1,321 @@
"""
Vobiz implementation of the TelephonyProvider interface.
"""
import random
from typing import TYPE_CHECKING, Any, Dict, List, Optional
import aiohttp
from loguru import logger
from api.enums import WorkflowRunMode
from api.services.telephony.base import CallInitiationResult, TelephonyProvider
from api.utils.tunnel import TunnelURLProvider
if TYPE_CHECKING:
from fastapi import WebSocket
class VobizProvider(TelephonyProvider):
"""
Vobiz implementation of TelephonyProvider.
Vobiz uses Plivo-compatible API and WebSocket protocol.
"""
PROVIDER_NAME = WorkflowRunMode.VOBIZ.value
WEBHOOK_ENDPOINT = "vobiz-xml"
def __init__(self, config: Dict[str, Any]):
"""
Initialize VobizProvider with configuration.
Args:
config: Dictionary containing:
- auth_id: Vobiz Account ID (e.g., MA_SYQRLN1K)
- auth_token: Vobiz Auth Token
- from_numbers: List of phone numbers to use (E.164 format without +)
"""
self.auth_id = config.get("auth_id")
self.auth_token = config.get("auth_token")
self.from_numbers = config.get("from_numbers", [])
# Handle both single number (string) and multiple numbers (list)
if isinstance(self.from_numbers, str):
self.from_numbers = [self.from_numbers]
self.base_url = "https://api.vobiz.ai/api"
async def initiate_call(
self,
to_number: str,
webhook_url: str,
workflow_run_id: Optional[int] = None,
**kwargs: Any,
) -> CallInitiationResult:
"""
Initiate an outbound call via Vobiz.
Vobiz API differences from Twilio:
- Uses X-Auth-ID and X-Auth-Token headers instead of Basic Auth
- Expects JSON body instead of form data
- Phone numbers in E.164 format WITHOUT + prefix (e.g., 14155551234)
- Returns "call_uuid" instead of "sid"
"""
if not self.validate_config():
raise ValueError("Vobiz provider not properly configured")
endpoint = f"{self.base_url}/v1/Account/{self.auth_id}/Call/"
# Select a random phone number
from_number = random.choice(self.from_numbers)
logger.info(f"Selected Vobiz phone number {from_number} for outbound call")
# Remove + prefix if present (Vobiz expects E.164 without +)
to_number_clean = to_number.lstrip("+")
from_number_clean = from_number.lstrip("+")
# Prepare call data (JSON format)
data = {
"from": from_number_clean,
"to": to_number_clean,
"answer_url": webhook_url,
"answer_method": "POST",
}
# Add hangup callback if workflow_run_id provided
if workflow_run_id:
backend_endpoint = await TunnelURLProvider.get_tunnel_url()
hangup_url = f"https://{backend_endpoint}/api/v1/telephony/vobiz/hangup-callback/{workflow_run_id}"
ring_url = f"https://{backend_endpoint}/api/v1/telephony/vobiz/ring-callback/{workflow_run_id}"
data.update(
{
"hangup_url": hangup_url,
"hangup_method": "POST",
"ring_url": ring_url,
"ring_method": "POST",
}
)
# Add optional parameters
data.update(kwargs)
# Make the API request
headers = {
"X-Auth-ID": self.auth_id,
"X-Auth-Token": self.auth_token,
"Content-Type": "application/json",
}
async with aiohttp.ClientSession() as session:
async with session.post(endpoint, json=data, headers=headers) as response:
if response.status != 201:
error_data = await response.text()
logger.error(f"Vobiz API error: {error_data}")
raise Exception(f"Failed to initiate Vobiz call: {error_data}")
response_data = await response.json()
logger.info(f"Vobiz API response: {response_data}")
# Extract call_uuid with multiple fallback options
call_id = (
response_data.get("call_uuid")
or response_data.get("CallUUID")
or response_data.get("request_uuid")
or response_data.get("RequestUUID")
)
if not call_id:
logger.error(
f"No call ID found in Vobiz response. Available keys: {list(response_data.keys())}"
)
raise Exception(
f"Vobiz API response missing call identifier. Response: {response_data}"
)
logger.info(f"Vobiz call initiated successfully. Call ID: {call_id}")
return CallInitiationResult(
call_id=call_id,
status="queued", # Vobiz returns "message": "call fired"
provider_metadata={},
raw_response=response_data,
)
async def get_call_status(self, call_id: str) -> Dict[str, Any]:
"""
Get the current status of a Vobiz call (CDR).
Vobiz returns:
- call_uuid, status, duration, billed_duration
- call_rate, total_cost (for billing)
"""
if not self.validate_config():
raise ValueError("Vobiz provider not properly configured")
endpoint = f"{self.base_url}/v1/Account/{self.auth_id}/Call/{call_id}/"
headers = {"X-Auth-ID": self.auth_id, "X-Auth-Token": self.auth_token}
async with aiohttp.ClientSession() as session:
async with session.get(endpoint, headers=headers) as response:
if response.status != 200:
error_data = await response.text()
logger.error(f"Failed to get Vobiz call status: {error_data}")
raise Exception(f"Failed to get call status: {error_data}")
return await response.json()
async def get_available_phone_numbers(self) -> List[str]:
"""
Get list of available Vobiz phone numbers.
"""
return self.from_numbers
def validate_config(self) -> bool:
"""
Validate Vobiz configuration.
"""
return bool(self.auth_id and self.auth_token and self.from_numbers)
async def verify_webhook_signature(
self, url: str, params: Dict[str, Any], signature: str
) -> bool:
"""
Verify Vobiz webhook signature for security.
Vobiz uses Plivo-compatible signature verification (HMAC-SHA256).
For now, returning True to allow testing.
TODO: Implement proper signature verification based on Vobiz docs.
"""
# Plivo/Vobiz signature verification would go here
# For development, we can skip signature verification
# In production, implement HMAC-SHA256 verification
logger.warning("Vobiz webhook signature verification not yet implemented")
return True
async def get_webhook_response(
self, workflow_id: int, user_id: int, workflow_run_id: int
) -> str:
"""
Generate Vobiz XML response for starting a call session.
Vobiz uses <Stream> element similar to Twilio but with Plivo-compatible attributes:
- bidirectional: Enable two-way audio
- audioTrack: Which audio to stream (inbound, outbound, both)
- contentType: audio/x-mulaw;rate=8000
"""
backend_endpoint = await TunnelURLProvider.get_tunnel_url()
vobiz_xml = f"""<?xml version="1.0" encoding="UTF-8"?>
<Response>
<Stream bidirectional="true" keepCallAlive="true" contentType="audio/x-mulaw;rate=8000">wss://{backend_endpoint}/api/v1/telephony/ws/{workflow_id}/{user_id}/{workflow_run_id}</Stream>
</Response>"""
return vobiz_xml
async def get_call_cost(self, call_id: str) -> Dict[str, Any]:
"""
Get cost information for a completed Vobiz call.
Vobiz returns cost in the same CDR endpoint:
- total_cost: Positive string (e.g., "0.04")
- call_rate: Per-minute rate (e.g., "0.02")
- billed_duration: Billable seconds (integer)
Args:
call_id: The Vobiz call_uuid
Returns:
Dict containing cost information
"""
endpoint = f"{self.base_url}/v1/Account/{self.auth_id}/Call/{call_id}/"
try:
headers = {"X-Auth-ID": self.auth_id, "X-Auth-Token": self.auth_token}
async with aiohttp.ClientSession() as session:
async with session.get(endpoint, headers=headers) as response:
if response.status != 200:
error_data = await response.text()
logger.error(f"Failed to get Vobiz call cost: {error_data}")
return {
"cost_usd": 0.0,
"duration": 0,
"status": "error",
"error": str(error_data),
}
call_data = await response.json()
# Vobiz returns cost as positive string (e.g., "0.04")
total_cost_str = call_data.get("total_cost", "0")
cost_usd = float(total_cost_str) if total_cost_str else 0.0
# Duration is billed_duration in seconds (integer)
duration = int(call_data.get("billed_duration", 0))
return {
"cost_usd": cost_usd,
"duration": duration,
"status": call_data.get("status", "unknown"),
"price_unit": "USD", # Vobiz always uses USD
"call_rate": call_data.get("call_rate", "0"),
"raw_response": call_data,
}
except Exception as e:
logger.error(f"Exception fetching Vobiz call cost: {e}")
return {"cost_usd": 0.0, "duration": 0, "status": "error", "error": str(e)}
def parse_status_callback(self, data: Dict[str, Any]) -> Dict[str, Any]:
"""
Parse Vobiz status callback data into generic format.
Vobiz sends callbacks to hangup_url and ring_url with:
- call_uuid (instead of CallSid)
- status, from, to, duration, etc.
"""
return {
"call_id": data.get("call_uuid", data.get("CallUUID", "")),
"status": data.get("status", data.get("Status", "")),
"from_number": data.get("from", data.get("From")),
"to_number": data.get("to", data.get("To")),
"direction": data.get("direction", data.get("Direction")),
"duration": data.get("duration", data.get("Duration")),
"extra": data, # Include all original data
}
async def handle_websocket(
self,
websocket: "WebSocket",
workflow_id: int,
user_id: int,
workflow_run_id: int,
) -> None:
"""
Handle Vobiz WebSocket connection using Plivo-compatible protocol.
Uses workflow_run_id as stream/call identifiers and delegates
message handling to PlivoFrameSerializer.
"""
from api.services.pipecat.run_pipeline import run_pipeline_vobiz
try:
stream_id = f"vobiz-stream-{workflow_run_id}"
call_id = f"vobiz-call-{workflow_run_id}"
logger.info(
f"[run {workflow_run_id}] Starting Vobiz WebSocket handler - "
f"stream_id: {stream_id}, call_id: {call_id}"
)
await run_pipeline_vobiz(
websocket, stream_id, call_id, workflow_id, workflow_run_id, user_id
)
logger.info(f"[run {workflow_run_id}] Vobiz pipeline completed")
except Exception as e:
logger.error(
f"[run {workflow_run_id}] Error in Vobiz WebSocket handler: {e}"
)
raise