From 09897cb5d8d04e196f209176ce239ecbcd938ef6 Mon Sep 17 00:00:00 2001 From: Piyush Sahoo <67003964+Piyush-sahoo@users.noreply.github.com> Date: Fri, 28 Nov 2025 09:36:04 +0530 Subject: [PATCH] feat: added vobiz telephony (#65) * feat: added vobiz telephony * chore: run formatter * chore: add migration * Add tsclient --------- Co-authored-by: Abhishek Kumar --- ...188ff90e76f_add_vobiz_mode_for_workflow.py | 42 +++ api/enums.py | 1 + api/routes/organization.py | 35 +- api/routes/telephony.py | 162 ++++++++- api/schemas/telephony_config.py | 23 ++ api/services/pipecat/audio_config.py | 7 +- api/services/pipecat/run_pipeline.py | 65 ++++ api/services/pipecat/transport_setup.py | 112 ++++++ api/services/telephony/factory.py | 11 + .../telephony/providers/vobiz_provider.py | 321 ++++++++++++++++++ ui/src/app/configure-telephony/page.tsx | 92 ++++- .../components/WorkflowHeader.tsx | 4 +- ui/src/client/client.gen.ts | 7 +- ui/src/client/index.ts | 2 +- ui/src/client/sdk.gen.ts | 35 +- ui/src/client/types.gen.ts | 94 ++++- 16 files changed, 994 insertions(+), 19 deletions(-) create mode 100644 api/alembic/versions/a188ff90e76f_add_vobiz_mode_for_workflow.py create mode 100644 api/services/telephony/providers/vobiz_provider.py diff --git a/api/alembic/versions/a188ff90e76f_add_vobiz_mode_for_workflow.py b/api/alembic/versions/a188ff90e76f_add_vobiz_mode_for_workflow.py new file mode 100644 index 0000000..54a19ef --- /dev/null +++ b/api/alembic/versions/a188ff90e76f_add_vobiz_mode_for_workflow.py @@ -0,0 +1,42 @@ +"""add vobiz mode for workflow + +Revision ID: a188ff90e76f +Revises: e02f387b7538 +Create Date: 2025-11-27 21:24:34.072030 + +""" +from typing import Sequence, Union + +from alembic import op +import sqlalchemy as sa +from alembic_postgresql_enum import TableReference + +# revision identifiers, used by Alembic. +revision: str = 'a188ff90e76f' +down_revision: Union[str, None] = 'e02f387b7538' +branch_labels: Union[str, Sequence[str], None] = None +depends_on: Union[str, Sequence[str], None] = None + + +def upgrade() -> None: + # ### commands auto generated by Alembic - please adjust! ### + op.sync_enum_values( + enum_schema='public', + enum_name='workflow_run_mode', + new_values=['twilio', 'vonage', 'vobiz', 'stasis', 'webrtc', 'smallwebrtc', 'VOICE', 'CHAT'], + affected_columns=[TableReference(table_schema='public', table_name='workflow_runs', column_name='mode')], + enum_values_to_rename=[], + ) + # ### end Alembic commands ### + + +def downgrade() -> None: + # ### commands auto generated by Alembic - please adjust! ### + op.sync_enum_values( + enum_schema='public', + enum_name='workflow_run_mode', + new_values=['twilio', 'vonage', 'stasis', 'webrtc', 'smallwebrtc', 'VOICE', 'CHAT'], + affected_columns=[TableReference(table_schema='public', table_name='workflow_runs', column_name='mode')], + enum_values_to_rename=[], + ) + # ### end Alembic commands ### diff --git a/api/enums.py b/api/enums.py index 4462696..a9e9465 100644 --- a/api/enums.py +++ b/api/enums.py @@ -15,6 +15,7 @@ class Environment(Enum): class WorkflowRunMode(Enum): TWILIO = "twilio" VONAGE = "vonage" + VOBIZ = "vobiz" STASIS = "stasis" WEBRTC = "webrtc" SMALLWEBRTC = "smallwebrtc" diff --git a/api/routes/organization.py b/api/routes/organization.py index a47966b..4fc7abc 100644 --- a/api/routes/organization.py +++ b/api/routes/organization.py @@ -9,6 +9,8 @@ from api.schemas.telephony_config import ( TelephonyConfigurationResponse, TwilioConfigurationRequest, TwilioConfigurationResponse, + VobizConfigurationRequest, + VobizConfigurationResponse, VonageConfigurationRequest, VonageConfigurationResponse, ) @@ -21,6 +23,7 @@ router = APIRouter(prefix="/organizations", tags=["organizations"]) PROVIDER_MASKED_FIELDS = { "twilio": ["account_sid", "auth_token"], "vonage": ["private_key", "api_key", "api_secret"], + "vobiz": ["auth_id", "auth_token"], } @@ -56,6 +59,7 @@ async def get_telephony_configuration(user: UserModel = Depends(get_user)): from_numbers=from_numbers, ), vonage=None, + vobiz=None, ) elif stored_provider == "vonage": application_id = config.value.get("application_id", "") @@ -78,6 +82,24 @@ async def get_telephony_configuration(user: UserModel = Depends(get_user)): api_secret=mask_key(api_secret) if api_secret else None, from_numbers=from_numbers, ), + vobiz=None, + ) + elif stored_provider == "vobiz": + auth_id = config.value.get("auth_id", "") + auth_token = config.value.get("auth_token", "") + from_numbers = ( + config.value.get("from_numbers", []) if auth_id and auth_token else [] + ) + + return TelephonyConfigurationResponse( + twilio=None, + vonage=None, + vobiz=VobizConfigurationResponse( + provider="vobiz", + auth_id=mask_key(auth_id) if auth_id else "", + auth_token=mask_key(auth_token) if auth_token else "", + from_numbers=from_numbers, + ), ) else: return TelephonyConfigurationResponse() @@ -85,7 +107,11 @@ async def get_telephony_configuration(user: UserModel = Depends(get_user)): @router.post("/telephony-config") async def save_telephony_configuration( - request: Union[TwilioConfigurationRequest, VonageConfigurationRequest], + request: Union[ + TwilioConfigurationRequest, + VonageConfigurationRequest, + VobizConfigurationRequest, + ], user: UserModel = Depends(get_user), ): """Save telephony configuration for the user's organization.""" @@ -115,6 +141,13 @@ async def save_telephony_configuration( "api_secret": getattr(request, "api_secret", None), "from_numbers": request.from_numbers, } + elif request.provider == "vobiz": + config_value = { + "provider": "vobiz", + "auth_id": request.auth_id, + "auth_token": request.auth_token, + "from_numbers": request.from_numbers, + } else: raise HTTPException( status_code=400, detail=f"Unsupported provider: {request.provider}" diff --git a/api/routes/telephony.py b/api/routes/telephony.py index bcc3155..a9ce0ba 100644 --- a/api/routes/telephony.py +++ b/api/routes/telephony.py @@ -234,7 +234,10 @@ async def websocket_endpoint( provider_type = workflow_run.gathered_context.get("provider") if not provider_type: - logger.error(f"No provider type found in workflow run {workflow_run_id}") + logger.error( + f"No provider type found in workflow run {workflow_run_id}. " + f"gathered_context: {workflow_run.gathered_context}, mode: {workflow_run.mode}" + ) await websocket.close(code=4400, reason="Provider type not found") return @@ -483,3 +486,160 @@ async def handle_vonage_events( # Return 204 No Content as expected by Vonage return {"status": "ok"} + + +@router.post("/vobiz-xml", include_in_schema=False) +async def handle_vobiz_xml_webhook( + workflow_id: int, user_id: int, workflow_run_id: int, organization_id: int +): + """ + Handle initial webhook from Vobiz when call is answered. + Returns Vobiz XML response with Stream element. + + Vobiz uses Plivo-compatible XML format similar to Twilio's TwiML. + """ + logger.info( + f"[run {workflow_run_id}] Vobiz XML webhook called - " + f"workflow_id={workflow_id}, user_id={user_id}, org_id={organization_id}" + ) + + provider = await get_telephony_provider(organization_id) + + logger.debug(f"[run {workflow_run_id}] Using provider: {provider.PROVIDER_NAME}") + + response_content = await provider.get_webhook_response( + workflow_id, user_id, workflow_run_id + ) + + logger.debug( + f"[run {workflow_run_id}] Vobiz XML response generated:\n{response_content}" + ) + + return HTMLResponse(content=response_content, media_type="application/xml") + + +@router.post("/vobiz/hangup-callback/{workflow_run_id}") +async def handle_vobiz_hangup_callback( + workflow_run_id: int, + request: Request, +): + """Handle Vobiz hangup callback (sent when call ends). + + Vobiz sends callbacks to hangup_url when the call terminates. + This includes call duration, status, and billing information. + """ + # Parse the callback data (Vobiz sends form data or JSON) + try: + callback_data = await request.json() + logger.info( + f"[run {workflow_run_id}] Received Vobiz hangup callback (JSON): " + f"{json.dumps(callback_data)}" + ) + except Exception: + # Fallback to form data if JSON fails + form_data = await request.form() + callback_data = dict(form_data) + logger.info( + f"[run {workflow_run_id}] Received Vobiz hangup callback (form): " + f"{json.dumps(callback_data)}" + ) + + # Get workflow run for processing + workflow_run = await db_client.get_workflow_run_by_id(workflow_run_id) + if not workflow_run: + logger.warning( + f"[run {workflow_run_id}] Workflow run not found for Vobiz hangup callback" + ) + return {"status": "ignored", "reason": "workflow_run_not_found"} + + # Get workflow and provider + workflow = await db_client.get_workflow_by_id(workflow_run.workflow_id) + if not workflow: + logger.warning(f"[run {workflow_run_id}] Workflow not found") + return {"status": "ignored", "reason": "workflow_not_found"} + + provider = await get_telephony_provider(workflow.organization_id) + + logger.debug( + f"[run {workflow_run_id}] Processing Vobiz hangup with provider: {provider.PROVIDER_NAME}" + ) + + # Parse the callback data into generic format + parsed_data = provider.parse_status_callback(callback_data) + + logger.debug( + f"[run {workflow_run_id}] Parsed Vobiz callback data: {json.dumps(parsed_data)}" + ) + + # Create StatusCallbackRequest from parsed data + status_update = StatusCallbackRequest( + call_id=parsed_data["call_id"], + status=parsed_data["status"], + from_number=parsed_data.get("from_number"), + to_number=parsed_data.get("to_number"), + direction=parsed_data.get("direction"), + duration=parsed_data.get("duration"), + extra=parsed_data.get("extra", {}), + ) + + # Process the status update + await _process_status_update(workflow_run_id, status_update, workflow_run) + + logger.info(f"[run {workflow_run_id}] Vobiz hangup callback processed successfully") + + return {"status": "success"} + + +@router.post("/vobiz/ring-callback/{workflow_run_id}") +async def handle_vobiz_ring_callback( + workflow_run_id: int, + request: Request, +): + """Handle Vobiz ring callback (sent when call starts ringing). + + Vobiz can send callbacks to ring_url when the call starts ringing. + This is optional and used for tracking ringing status. + """ + # Parse the callback data + try: + callback_data = await request.json() + logger.info( + f"[run {workflow_run_id}] Received Vobiz ring callback (JSON): " + f"{json.dumps(callback_data)}" + ) + except Exception: + form_data = await request.form() + callback_data = dict(form_data) + logger.info( + f"[run {workflow_run_id}] Received Vobiz ring callback (form): " + f"{json.dumps(callback_data)}" + ) + + # Get workflow run for processing + workflow_run = await db_client.get_workflow_run_by_id(workflow_run_id) + if not workflow_run: + logger.warning( + f"[run {workflow_run_id}] Workflow run not found for Vobiz ring callback" + ) + return {"status": "ignored", "reason": "workflow_run_not_found"} + + # Log the ringing event + telephony_callback_logs = workflow_run.logs.get("telephony_status_callbacks", []) + ring_log = { + "status": "ringing", + "timestamp": datetime.now(UTC).isoformat(), + "call_id": callback_data.get("call_uuid", callback_data.get("CallUUID", "")), + "event_type": "ring", + "raw_data": callback_data, + } + telephony_callback_logs.append(ring_log) + + # Update workflow run logs + await db_client.update_workflow_run( + run_id=workflow_run_id, + logs={"telephony_status_callbacks": telephony_callback_logs}, + ) + + logger.info(f"[run {workflow_run_id}] Vobiz ring callback logged") + + return {"status": "success"} diff --git a/api/schemas/telephony_config.py b/api/schemas/telephony_config.py index 5f65bbb..d783cc2 100644 --- a/api/schemas/telephony_config.py +++ b/api/schemas/telephony_config.py @@ -47,8 +47,31 @@ class VonageConfigurationResponse(BaseModel): from_numbers: List[str] +class VobizConfigurationRequest(BaseModel): + """Request schema for Vobiz configuration.""" + + provider: str = Field(default="vobiz") + auth_id: str = Field(..., description="Vobiz Account ID (e.g., MA_SYQRLN1K)") + auth_token: str = Field(..., description="Vobiz Auth Token") + from_numbers: List[str] = Field( + ..., + min_length=1, + description="List of Vobiz phone numbers (E.164 without + prefix)", + ) + + +class VobizConfigurationResponse(BaseModel): + """Response schema for Vobiz configuration with masked sensitive fields.""" + + provider: str + auth_id: str # Masked (e.g., "****************L1NK") + auth_token: str # Masked (e.g., "****************KEFO") + from_numbers: List[str] + + class TelephonyConfigurationResponse(BaseModel): """Top-level telephony configuration response.""" twilio: Optional[TwilioConfigurationResponse] = None vonage: Optional[VonageConfigurationResponse] = None + vobiz: Optional[VobizConfigurationResponse] = None diff --git a/api/services/pipecat/audio_config.py b/api/services/pipecat/audio_config.py index 42880ba..3c5aa80 100644 --- a/api/services/pipecat/audio_config.py +++ b/api/services/pipecat/audio_config.py @@ -85,7 +85,12 @@ def create_audio_config(transport_type: str) -> AudioConfig: Returns: AudioConfig instance with appropriate settings """ - if transport_type in (WorkflowRunMode.STASIS.value, WorkflowRunMode.TWILIO.value): + if transport_type in ( + WorkflowRunMode.STASIS.value, + WorkflowRunMode.TWILIO.value, + WorkflowRunMode.VOBIZ.value, + ): + # Twilio, Vobiz, and Stasis use MULAW at 8kHz return AudioConfig( transport_in_sample_rate=8000, transport_out_sample_rate=8000, diff --git a/api/services/pipecat/run_pipeline.py b/api/services/pipecat/run_pipeline.py index bf41477..90c82db 100644 --- a/api/services/pipecat/run_pipeline.py +++ b/api/services/pipecat/run_pipeline.py @@ -31,6 +31,7 @@ from api.services.pipecat.tracing_config import setup_pipeline_tracing from api.services.pipecat.transport_setup import ( create_stasis_transport, create_twilio_transport, + create_vobiz_transport, create_vonage_transport, create_webrtc_transport, ) @@ -174,6 +175,70 @@ async def run_pipeline_vonage( raise +async def run_pipeline_vobiz( + websocket_client: WebSocket, + stream_id: str, + call_id: str, + workflow_id: int, + workflow_run_id: int, + user_id: int, +) -> None: + """Run pipeline for Vobiz using Plivo-compatible WebSocket protocol.""" + logger.info( + f"[run {workflow_run_id}] Starting Vobiz pipeline - " + f"stream_id={stream_id}, call_id={call_id}, workflow_id={workflow_id}" + ) + set_current_run_id(workflow_run_id) + + cost_info = {"call_id": call_id} + await db_client.update_workflow_run(workflow_run_id, cost_info=cost_info) + + workflow = await db_client.get_workflow(workflow_id, user_id) + vad_config = None + ambient_noise_config = None + if workflow and workflow.workflow_configurations: + if "vad_configuration" in workflow.workflow_configurations: + vad_config = workflow.workflow_configurations["vad_configuration"] + if "ambient_noise_configuration" in workflow.workflow_configurations: + ambient_noise_config = workflow.workflow_configurations[ + "ambient_noise_configuration" + ] + + try: + audio_config = create_audio_config(WorkflowRunMode.VOBIZ.value) + logger.info( + f"[run {workflow_run_id}] Vobiz audio config: " + f"sample_rate={audio_config.transport_in_sample_rate}Hz, format=MULAW" + ) + + transport = await create_vobiz_transport( + websocket_client, + stream_id, + call_id, + workflow_run_id, + audio_config, + workflow.organization_id, + vad_config, + ambient_noise_config, + ) + + logger.info(f"[run {workflow_run_id}] Starting Vobiz pipeline execution") + await _run_pipeline( + transport, + workflow_id, + workflow_run_id, + user_id, + audio_config=audio_config, + ) + logger.info(f"[run {workflow_run_id}] Vobiz pipeline completed successfully") + + except Exception as e: + logger.error( + f"[run {workflow_run_id}] Error in Vobiz pipeline: {e}", exc_info=True + ) + raise + + async def run_pipeline_smallwebrtc( webrtc_connection: SmallWebRTCConnection, workflow_id: int, diff --git a/api/services/pipecat/transport_setup.py b/api/services/pipecat/transport_setup.py index a674ce4..574ddc3 100644 --- a/api/services/pipecat/transport_setup.py +++ b/api/services/pipecat/transport_setup.py @@ -21,6 +21,7 @@ from pipecat.audio.mixers.silence_mixer import SilenceAudioMixer from pipecat.audio.mixers.soundfile_mixer import SoundfileMixer from pipecat.audio.turn.smart_turn.base_smart_turn import SmartTurnParams from pipecat.audio.vad.silero import SileroVADAnalyzer, VADParams +from pipecat.serializers.plivo import PlivoFrameSerializer from pipecat.serializers.twilio import TwilioFrameSerializer from pipecat.serializers.vonage import VonageFrameSerializer from pipecat.transports.base_transport import TransportParams @@ -233,6 +234,117 @@ async def create_vonage_transport( ) +async def create_vobiz_transport( + websocket_client: WebSocket, + stream_id: str, + call_id: str, + workflow_run_id: int, + audio_config: AudioConfig, + organization_id: int, + vad_config: dict | None = None, + ambient_noise_config: dict | None = None, +): + """Create a transport for Vobiz connections. + + Vobiz uses Plivo-compatible WebSocket protocol: + - MULAW audio at 8kHz (same as Twilio) + - Base64-encoded audio in JSON messages + - PlivoFrameSerializer handles the protocol + """ + from loguru import logger + + logger.info( + f"[run {workflow_run_id}] Creating Vobiz transport - " + f"stream_id={stream_id}, call_id={call_id}" + ) + + # Load Vobiz configuration from database + from api.services.telephony.factory import load_telephony_config + + config = await load_telephony_config(organization_id) + + if config.get("provider") != "vobiz": + raise ValueError(f"Expected Vobiz provider, got {config.get('provider')}") + + auth_id = config.get("auth_id") + auth_token = config.get("auth_token") + + if not auth_id or not auth_token: + raise ValueError( + f"Incomplete Vobiz configuration for organization {organization_id}" + ) + + logger.debug( + f"[run {workflow_run_id}] Vobiz config loaded - auth_id={auth_id}, " + f"from_numbers={len(config.get('from_numbers', []))} numbers" + ) + + turn_analyzer = create_turn_analyzer(workflow_run_id, audio_config) + + # Use PlivoFrameSerializer for Vobiz (Plivo-compatible protocol) + serializer = PlivoFrameSerializer( + stream_id=stream_id, + call_id=call_id, + auth_id=auth_id, + auth_token=auth_token, + params=PlivoFrameSerializer.InputParams( + plivo_sample_rate=8000, # Vobiz uses MULAW at 8kHz + sample_rate=audio_config.pipeline_sample_rate, + ), + ) + + logger.debug( + f"[run {workflow_run_id}] PlivoFrameSerializer created for Vobiz - " + f"transport_rate=8000Hz, pipeline_rate={audio_config.pipeline_sample_rate}Hz" + ) + + # Create WebSocket transport (same structure as Twilio/Vonage) + transport = FastAPIWebsocketTransport( + websocket=websocket_client, + params=FastAPIWebsocketParams( + audio_in_enabled=True, + audio_out_enabled=True, + audio_in_sample_rate=audio_config.transport_in_sample_rate, + audio_out_sample_rate=audio_config.transport_out_sample_rate, + vad_analyzer=( + SileroVADAnalyzer( + params=VADParams( + confidence=vad_config.get("confidence", 0.7), + start_secs=vad_config.get("start_seconds", 0.4), + stop_secs=vad_config.get("stop_seconds", 0.8), + min_volume=vad_config.get("minimum_volume", 0.6), + ) + ) + if vad_config + else SileroVADAnalyzer() + ), + audio_out_mixer=( + SoundfileMixer( + sound_files={ + "office": APP_ROOT_DIR + / "assets" + / f"office-ambience-{audio_config.transport_out_sample_rate}-mono.wav" + }, + default_sound="office", + volume=ambient_noise_config.get("volume", 0.3), + ) + if ambient_noise_config and ambient_noise_config.get("enabled", False) + else SilenceAudioMixer() + ), + turn_analyzer=turn_analyzer, + serializer=serializer, + audio_in_filter=RNNoiseFilter(library_path=librnnoise_path) + if ENABLE_RNNOISE + else None, + ), + ) + + logger.info( + f"[run {workflow_run_id}] Vobiz transport created successfully (VAD enabled)" + ) + return transport + + def create_webrtc_transport( webrtc_connection: SmallWebRTCConnection, workflow_run_id: int, diff --git a/api/services/telephony/factory.py b/api/services/telephony/factory.py index f89f8b5..9da13ce 100644 --- a/api/services/telephony/factory.py +++ b/api/services/telephony/factory.py @@ -12,6 +12,7 @@ from api.db import db_client from api.enums import OrganizationConfigurationKey from api.services.telephony.base import TelephonyProvider from api.services.telephony.providers.twilio_provider import TwilioProvider +from api.services.telephony.providers.vobiz_provider import VobizProvider from api.services.telephony.providers.vonage_provider import VonageProvider @@ -58,6 +59,13 @@ async def load_telephony_config(organization_id: int) -> Dict[str, Any]: "api_secret": config.value.get("api_secret"), "from_numbers": config.value.get("from_numbers", []), } + elif provider == "vobiz": + return { + "provider": "vobiz", + "auth_id": config.value.get("auth_id"), + "auth_token": config.value.get("auth_token"), + "from_numbers": config.value.get("from_numbers", []), + } else: raise ValueError(f"Unknown provider in config: {provider}") @@ -92,5 +100,8 @@ async def get_telephony_provider(organization_id: int) -> TelephonyProvider: elif provider_type == "vonage": return VonageProvider(config) + elif provider_type == "vobiz": + return VobizProvider(config) + else: raise ValueError(f"Unknown telephony provider: {provider_type}") diff --git a/api/services/telephony/providers/vobiz_provider.py b/api/services/telephony/providers/vobiz_provider.py new file mode 100644 index 0000000..0ee1ee8 --- /dev/null +++ b/api/services/telephony/providers/vobiz_provider.py @@ -0,0 +1,321 @@ +""" +Vobiz implementation of the TelephonyProvider interface. +""" + +import random +from typing import TYPE_CHECKING, Any, Dict, List, Optional + +import aiohttp +from loguru import logger + +from api.enums import WorkflowRunMode +from api.services.telephony.base import CallInitiationResult, TelephonyProvider +from api.utils.tunnel import TunnelURLProvider + +if TYPE_CHECKING: + from fastapi import WebSocket + + +class VobizProvider(TelephonyProvider): + """ + Vobiz implementation of TelephonyProvider. + Vobiz uses Plivo-compatible API and WebSocket protocol. + """ + + PROVIDER_NAME = WorkflowRunMode.VOBIZ.value + WEBHOOK_ENDPOINT = "vobiz-xml" + + def __init__(self, config: Dict[str, Any]): + """ + Initialize VobizProvider with configuration. + + Args: + config: Dictionary containing: + - auth_id: Vobiz Account ID (e.g., MA_SYQRLN1K) + - auth_token: Vobiz Auth Token + - from_numbers: List of phone numbers to use (E.164 format without +) + """ + self.auth_id = config.get("auth_id") + self.auth_token = config.get("auth_token") + self.from_numbers = config.get("from_numbers", []) + + # Handle both single number (string) and multiple numbers (list) + if isinstance(self.from_numbers, str): + self.from_numbers = [self.from_numbers] + + self.base_url = "https://api.vobiz.ai/api" + + async def initiate_call( + self, + to_number: str, + webhook_url: str, + workflow_run_id: Optional[int] = None, + **kwargs: Any, + ) -> CallInitiationResult: + """ + Initiate an outbound call via Vobiz. + + Vobiz API differences from Twilio: + - Uses X-Auth-ID and X-Auth-Token headers instead of Basic Auth + - Expects JSON body instead of form data + - Phone numbers in E.164 format WITHOUT + prefix (e.g., 14155551234) + - Returns "call_uuid" instead of "sid" + """ + if not self.validate_config(): + raise ValueError("Vobiz provider not properly configured") + + endpoint = f"{self.base_url}/v1/Account/{self.auth_id}/Call/" + + # Select a random phone number + from_number = random.choice(self.from_numbers) + logger.info(f"Selected Vobiz phone number {from_number} for outbound call") + + # Remove + prefix if present (Vobiz expects E.164 without +) + to_number_clean = to_number.lstrip("+") + from_number_clean = from_number.lstrip("+") + + # Prepare call data (JSON format) + data = { + "from": from_number_clean, + "to": to_number_clean, + "answer_url": webhook_url, + "answer_method": "POST", + } + + # Add hangup callback if workflow_run_id provided + if workflow_run_id: + backend_endpoint = await TunnelURLProvider.get_tunnel_url() + hangup_url = f"https://{backend_endpoint}/api/v1/telephony/vobiz/hangup-callback/{workflow_run_id}" + ring_url = f"https://{backend_endpoint}/api/v1/telephony/vobiz/ring-callback/{workflow_run_id}" + data.update( + { + "hangup_url": hangup_url, + "hangup_method": "POST", + "ring_url": ring_url, + "ring_method": "POST", + } + ) + + # Add optional parameters + data.update(kwargs) + + # Make the API request + headers = { + "X-Auth-ID": self.auth_id, + "X-Auth-Token": self.auth_token, + "Content-Type": "application/json", + } + + async with aiohttp.ClientSession() as session: + async with session.post(endpoint, json=data, headers=headers) as response: + if response.status != 201: + error_data = await response.text() + logger.error(f"Vobiz API error: {error_data}") + raise Exception(f"Failed to initiate Vobiz call: {error_data}") + + response_data = await response.json() + logger.info(f"Vobiz API response: {response_data}") + + # Extract call_uuid with multiple fallback options + call_id = ( + response_data.get("call_uuid") + or response_data.get("CallUUID") + or response_data.get("request_uuid") + or response_data.get("RequestUUID") + ) + + if not call_id: + logger.error( + f"No call ID found in Vobiz response. Available keys: {list(response_data.keys())}" + ) + raise Exception( + f"Vobiz API response missing call identifier. Response: {response_data}" + ) + + logger.info(f"Vobiz call initiated successfully. Call ID: {call_id}") + + return CallInitiationResult( + call_id=call_id, + status="queued", # Vobiz returns "message": "call fired" + provider_metadata={}, + raw_response=response_data, + ) + + async def get_call_status(self, call_id: str) -> Dict[str, Any]: + """ + Get the current status of a Vobiz call (CDR). + + Vobiz returns: + - call_uuid, status, duration, billed_duration + - call_rate, total_cost (for billing) + """ + if not self.validate_config(): + raise ValueError("Vobiz provider not properly configured") + + endpoint = f"{self.base_url}/v1/Account/{self.auth_id}/Call/{call_id}/" + + headers = {"X-Auth-ID": self.auth_id, "X-Auth-Token": self.auth_token} + + async with aiohttp.ClientSession() as session: + async with session.get(endpoint, headers=headers) as response: + if response.status != 200: + error_data = await response.text() + logger.error(f"Failed to get Vobiz call status: {error_data}") + raise Exception(f"Failed to get call status: {error_data}") + + return await response.json() + + async def get_available_phone_numbers(self) -> List[str]: + """ + Get list of available Vobiz phone numbers. + """ + return self.from_numbers + + def validate_config(self) -> bool: + """ + Validate Vobiz configuration. + """ + return bool(self.auth_id and self.auth_token and self.from_numbers) + + async def verify_webhook_signature( + self, url: str, params: Dict[str, Any], signature: str + ) -> bool: + """ + Verify Vobiz webhook signature for security. + + Vobiz uses Plivo-compatible signature verification (HMAC-SHA256). + For now, returning True to allow testing. + TODO: Implement proper signature verification based on Vobiz docs. + """ + # Plivo/Vobiz signature verification would go here + # For development, we can skip signature verification + # In production, implement HMAC-SHA256 verification + logger.warning("Vobiz webhook signature verification not yet implemented") + return True + + async def get_webhook_response( + self, workflow_id: int, user_id: int, workflow_run_id: int + ) -> str: + """ + Generate Vobiz XML response for starting a call session. + + Vobiz uses element similar to Twilio but with Plivo-compatible attributes: + - bidirectional: Enable two-way audio + - audioTrack: Which audio to stream (inbound, outbound, both) + - contentType: audio/x-mulaw;rate=8000 + """ + backend_endpoint = await TunnelURLProvider.get_tunnel_url() + + vobiz_xml = f""" + + wss://{backend_endpoint}/api/v1/telephony/ws/{workflow_id}/{user_id}/{workflow_run_id} +""" + return vobiz_xml + + async def get_call_cost(self, call_id: str) -> Dict[str, Any]: + """ + Get cost information for a completed Vobiz call. + + Vobiz returns cost in the same CDR endpoint: + - total_cost: Positive string (e.g., "0.04") + - call_rate: Per-minute rate (e.g., "0.02") + - billed_duration: Billable seconds (integer) + + Args: + call_id: The Vobiz call_uuid + + Returns: + Dict containing cost information + """ + endpoint = f"{self.base_url}/v1/Account/{self.auth_id}/Call/{call_id}/" + + try: + headers = {"X-Auth-ID": self.auth_id, "X-Auth-Token": self.auth_token} + + async with aiohttp.ClientSession() as session: + async with session.get(endpoint, headers=headers) as response: + if response.status != 200: + error_data = await response.text() + logger.error(f"Failed to get Vobiz call cost: {error_data}") + return { + "cost_usd": 0.0, + "duration": 0, + "status": "error", + "error": str(error_data), + } + + call_data = await response.json() + + # Vobiz returns cost as positive string (e.g., "0.04") + total_cost_str = call_data.get("total_cost", "0") + cost_usd = float(total_cost_str) if total_cost_str else 0.0 + + # Duration is billed_duration in seconds (integer) + duration = int(call_data.get("billed_duration", 0)) + + return { + "cost_usd": cost_usd, + "duration": duration, + "status": call_data.get("status", "unknown"), + "price_unit": "USD", # Vobiz always uses USD + "call_rate": call_data.get("call_rate", "0"), + "raw_response": call_data, + } + + except Exception as e: + logger.error(f"Exception fetching Vobiz call cost: {e}") + return {"cost_usd": 0.0, "duration": 0, "status": "error", "error": str(e)} + + def parse_status_callback(self, data: Dict[str, Any]) -> Dict[str, Any]: + """ + Parse Vobiz status callback data into generic format. + + Vobiz sends callbacks to hangup_url and ring_url with: + - call_uuid (instead of CallSid) + - status, from, to, duration, etc. + """ + return { + "call_id": data.get("call_uuid", data.get("CallUUID", "")), + "status": data.get("status", data.get("Status", "")), + "from_number": data.get("from", data.get("From")), + "to_number": data.get("to", data.get("To")), + "direction": data.get("direction", data.get("Direction")), + "duration": data.get("duration", data.get("Duration")), + "extra": data, # Include all original data + } + + async def handle_websocket( + self, + websocket: "WebSocket", + workflow_id: int, + user_id: int, + workflow_run_id: int, + ) -> None: + """ + Handle Vobiz WebSocket connection using Plivo-compatible protocol. + + Uses workflow_run_id as stream/call identifiers and delegates + message handling to PlivoFrameSerializer. + """ + from api.services.pipecat.run_pipeline import run_pipeline_vobiz + + try: + stream_id = f"vobiz-stream-{workflow_run_id}" + call_id = f"vobiz-call-{workflow_run_id}" + + logger.info( + f"[run {workflow_run_id}] Starting Vobiz WebSocket handler - " + f"stream_id: {stream_id}, call_id: {call_id}" + ) + + await run_pipeline_vobiz( + websocket, stream_id, call_id, workflow_id, workflow_run_id, user_id + ) + + logger.info(f"[run {workflow_run_id}] Vobiz pipeline completed") + + except Exception as e: + logger.error( + f"[run {workflow_run_id}] Error in Vobiz WebSocket handler: {e}" + ) + raise diff --git a/ui/src/app/configure-telephony/page.tsx b/ui/src/app/configure-telephony/page.tsx index 01d97b9..5754178 100644 --- a/ui/src/app/configure-telephony/page.tsx +++ b/ui/src/app/configure-telephony/page.tsx @@ -6,7 +6,7 @@ import { useForm } from "react-hook-form"; import { toast } from "sonner"; import { getTelephonyConfigurationApiV1OrganizationsTelephonyConfigGet, saveTelephonyConfigurationApiV1OrganizationsTelephonyConfigPost } from "@/client/sdk.gen"; -import type { TwilioConfigurationRequest, VonageConfigurationRequest } from "@/client/types.gen"; +import type { TwilioConfigurationRequest, VobizConfigurationRequest,VonageConfigurationRequest } from "@/client/types.gen"; import { Button } from "@/components/ui/button"; import { Card, @@ -37,6 +37,9 @@ interface TelephonyConfigForm { private_key?: string; api_key?: string; api_secret?: string; + // Vobiz fields + auth_id?: string; + vobiz_auth_token?: string; // Common field from_number: string; } @@ -99,6 +102,14 @@ export default function ConfigureTelephonyPage() { if (response.data.vonage.from_numbers?.length > 0) { setValue("from_number", response.data.vonage.from_numbers[0]); } + } else if (response.data?.vobiz) { + setHasExistingConfig(true); + setValue("provider", "vobiz"); + setValue("auth_id", response.data.vobiz.auth_id); + setValue("vobiz_auth_token", response.data.vobiz.auth_token); + if (response.data.vobiz.from_numbers?.length > 0) { + setValue("from_number", response.data.vobiz.from_numbers[0]); + } } } } catch (error) { @@ -116,7 +127,7 @@ export default function ConfigureTelephonyPage() { const accessToken = await getAccessToken(); // Build the request body based on provider - let requestBody: TwilioConfigurationRequest | VonageConfigurationRequest; + let requestBody: TwilioConfigurationRequest | VonageConfigurationRequest | VobizConfigurationRequest; if (data.provider === "twilio") { requestBody = { @@ -125,7 +136,7 @@ export default function ConfigureTelephonyPage() { account_sid: data.account_sid, auth_token: data.auth_token, } as TwilioConfigurationRequest; - } else { + } else if (data.provider === "vonage") { requestBody = { provider: data.provider, from_numbers: [data.from_number], @@ -134,6 +145,13 @@ export default function ConfigureTelephonyPage() { api_key: data.api_key || undefined, api_secret: data.api_secret || undefined, } as VonageConfigurationRequest; + } else { + requestBody = { + provider: data.provider, + from_numbers: [data.from_number], + auth_id: data.auth_id, + auth_token: data.vobiz_auth_token, + } as VobizConfigurationRequest; } const response = await saveTelephonyConfigurationApiV1OrganizationsTelephonyConfigPost({ @@ -223,6 +241,7 @@ export default function ConfigureTelephonyPage() { Twilio Vonage + Vobiz {hasExistingConfig && ( @@ -381,6 +400,73 @@ export default function ConfigureTelephonyPage() { )} + {/* Vobiz-specific fields */} + {selectedProvider === "vobiz" && ( + <> +
+ + + {errors.auth_id && ( +

+ {errors.auth_id.message} +

+ )} +
+ +
+ + + {errors.vobiz_auth_token && ( +

+ {errors.vobiz_auth_token.message} +

+ )} +
+ +
+ + + {errors.from_number && ( +

+ {errors.from_number.message} +

+ )} +
+ + )} +