diff --git a/api/alembic/versions/f2e1d0c9b8a7_add_plivo_mode.py b/api/alembic/versions/f2e1d0c9b8a7_add_plivo_mode.py new file mode 100644 index 0000000..6a59d54 --- /dev/null +++ b/api/alembic/versions/f2e1d0c9b8a7_add_plivo_mode.py @@ -0,0 +1,74 @@ +"""add plivo mode + +Revision ID: f2e1d0c9b8a7 +Revises: a1b2c3d4e5f6, 67a5cf3e09d0 +Create Date: 2026-04-13 16:35:00.000000 + +""" + +from typing import Sequence, Union + +from alembic import op +from alembic_postgresql_enum import TableReference + +# revision identifiers, used by Alembic. +revision: str = "f2e1d0c9b8a7" +down_revision: Union[str, Sequence[str], None] = ( + "a1b2c3d4e5f6", + "67a5cf3e09d0", +) +branch_labels: Union[str, Sequence[str], None] = None +depends_on: Union[str, Sequence[str], None] = None + + +def upgrade() -> None: + op.sync_enum_values( + enum_schema="public", + enum_name="workflow_run_mode", + new_values=[ + "ari", + "plivo", + "twilio", + "vonage", + "vobiz", + "cloudonix", + "telnyx", + "webrtc", + "smallwebrtc", + "stasis", + "VOICE", + "CHAT", + ], + affected_columns=[ + TableReference( + table_schema="public", table_name="workflow_runs", column_name="mode" + ) + ], + enum_values_to_rename=[], + ) + + +def downgrade() -> None: + op.sync_enum_values( + enum_schema="public", + enum_name="workflow_run_mode", + new_values=[ + "ari", + "twilio", + "vonage", + "vobiz", + "cloudonix", + "telnyx", + "webrtc", + "smallwebrtc", + "stasis", + "VOICE", + "CHAT", + ], + affected_columns=[ + TableReference( + table_schema="public", table_name="workflow_runs", column_name="mode" + ) + ], + enum_values_to_rename=[], + ) diff --git a/api/enums.py b/api/enums.py index c07879a..b7655b1 100644 --- a/api/enums.py +++ b/api/enums.py @@ -19,6 +19,7 @@ class CallType(Enum): class WorkflowRunMode(Enum): ARI = "ari" + PLIVO = "plivo" TWILIO = "twilio" VONAGE = "vonage" VOBIZ = "vobiz" diff --git a/api/routes/organization.py b/api/routes/organization.py index f35ef0a..9db9b8a 100644 --- a/api/routes/organization.py +++ b/api/routes/organization.py @@ -12,6 +12,8 @@ from api.schemas.telephony_config import ( ARIConfigurationResponse, CloudonixConfigurationRequest, CloudonixConfigurationResponse, + PlivoConfigurationRequest, + PlivoConfigurationResponse, TelephonyConfigurationResponse, TelnyxConfigurationRequest, TelnyxConfigurationResponse, @@ -33,6 +35,7 @@ router = APIRouter(prefix="/organizations", tags=["organizations"]) # Provider configuration constants PROVIDER_MASKED_FIELDS = { "twilio": ["account_sid", "auth_token"], + "plivo": ["auth_id", "auth_token"], "vonage": ["private_key", "api_key", "api_secret"], "vobiz": ["auth_id", "auth_token"], "cloudonix": ["bearer_token"], @@ -72,6 +75,26 @@ async def get_telephony_configuration(user: UserModel = Depends(get_user)): auth_token=mask_key(auth_token) if auth_token else "", from_numbers=from_numbers, ), + plivo=None, + vonage=None, + vobiz=None, + cloudonix=None, + ) + elif stored_provider == "plivo": + auth_id = config.value.get("auth_id", "") + auth_token = config.value.get("auth_token", "") + from_numbers = ( + config.value.get("from_numbers", []) if auth_id and auth_token else [] + ) + + return TelephonyConfigurationResponse( + twilio=None, + plivo=PlivoConfigurationResponse( + provider="plivo", + auth_id=mask_key(auth_id) if auth_id else "", + auth_token=mask_key(auth_token) if auth_token else "", + from_numbers=from_numbers, + ), vonage=None, vobiz=None, cloudonix=None, @@ -89,6 +112,7 @@ async def get_telephony_configuration(user: UserModel = Depends(get_user)): return TelephonyConfigurationResponse( twilio=None, + plivo=None, vonage=VonageConfigurationResponse( provider="vonage", application_id=application_id, @@ -109,6 +133,7 @@ async def get_telephony_configuration(user: UserModel = Depends(get_user)): return TelephonyConfigurationResponse( twilio=None, + plivo=None, vonage=None, vobiz=VobizConfigurationResponse( provider="vobiz", @@ -125,6 +150,7 @@ async def get_telephony_configuration(user: UserModel = Depends(get_user)): return TelephonyConfigurationResponse( twilio=None, + plivo=None, vonage=None, cloudonix=CloudonixConfigurationResponse( provider="cloudonix", @@ -175,6 +201,7 @@ async def get_telephony_configuration(user: UserModel = Depends(get_user)): async def save_telephony_configuration( request: Union[ TwilioConfigurationRequest, + PlivoConfigurationRequest, VonageConfigurationRequest, VobizConfigurationRequest, CloudonixConfigurationRequest, @@ -201,6 +228,13 @@ async def save_telephony_configuration( "auth_token": request.auth_token, "from_numbers": request.from_numbers, } + elif request.provider == "plivo": + config_value = { + "provider": "plivo", + "auth_id": request.auth_id, + "auth_token": request.auth_token, + "from_numbers": request.from_numbers, + } elif request.provider == "vonage": config_value = { "provider": "vonage", diff --git a/api/routes/telephony.py b/api/routes/telephony.py index 645c6a5..b9951f2 100644 --- a/api/routes/telephony.py +++ b/api/routes/telephony.py @@ -89,6 +89,33 @@ class StatusCallbackRequest(BaseModel): extra=data, ) + @classmethod + def from_plivo(cls, data: dict): + """Convert Plivo callback to generic format""" + status_map = { + "in-progress": "answered", + "ringing": "ringing", + "ring": "ringing", + "completed": "completed", + "hangup": "completed", + "stopstream": "completed", + "busy": "busy", + "no-answer": "no-answer", + "cancel": "canceled", + "cancelled": "canceled", + "timeout": "no-answer", + } + call_status = (data.get("CallStatus") or data.get("Event") or "").lower() + return cls( + call_id=data.get("CallUUID", "") or data.get("RequestUUID", ""), + status=status_map.get(call_status, call_status), + from_number=data.get("From"), + to_number=data.get("To"), + direction=data.get("Direction"), + duration=data.get("Duration"), + extra=data, + ) + @classmethod def from_vonage(cls, data: dict): """Convert Vonage event to generic format""" @@ -340,6 +367,9 @@ async def _validate_inbound_request( webhook_data: dict, webhook_body: str = "", x_twilio_signature: str = None, + x_plivo_signature: str = None, + x_plivo_signature_ma: str = None, + x_plivo_signature_nonce: str = None, x_vobiz_signature: str = None, x_vobiz_timestamp: str = None, x_cx_apikey: str = None, @@ -377,7 +407,14 @@ async def _validate_inbound_request( # Verify webhook signature/API key if provided provider_instance = None - if x_twilio_signature or x_vobiz_signature or x_cx_apikey or telnyx_signature: + if ( + x_twilio_signature + or x_plivo_signature + or x_plivo_signature_ma + or x_vobiz_signature + or x_cx_apikey + or telnyx_signature + ): backend_endpoint, _ = await get_backend_endpoints() webhook_url = f"{backend_endpoint}/api/v1/telephony/inbound/{workflow_id}" @@ -389,6 +426,16 @@ async def _validate_inbound_request( signature_valid = await provider_instance.verify_inbound_signature( webhook_url, webhook_data, x_twilio_signature ) + elif provider_class.PROVIDER_NAME == "plivo" and ( + x_plivo_signature or x_plivo_signature_ma + ): + logger.info(f"Verifying Plivo signature for URL: {webhook_url}") + signature_valid = await provider_instance.verify_inbound_signature( + webhook_url, + webhook_data, + x_plivo_signature or x_plivo_signature_ma, + x_plivo_signature_nonce, + ) elif provider_class.PROVIDER_NAME == "vobiz" and x_vobiz_signature: logger.info(f"Verifying Vobiz signature for URL: {webhook_url}") signature_valid = await provider_instance.verify_inbound_signature( @@ -478,12 +525,6 @@ async def _validate_organization_provider_config( organization_id: int, provider_class, account_id: str ) -> TelephonyError: """Validate provider and account_id, returning specific error type""" - if not account_id: - logger.warning( - f"No account_id provided for provider {provider_class.PROVIDER_NAME}" - ) - return TelephonyError.ACCOUNT_VALIDATION_FAILED - try: config = await db_client.get_configuration( organization_id, @@ -1015,6 +1056,160 @@ async def handle_vobiz_xml_webhook( return HTMLResponse(content=response_content, media_type="application/xml") +@router.post("/plivo-xml", include_in_schema=False) +async def handle_plivo_xml_webhook( + workflow_id: int, + user_id: int, + workflow_run_id: int, + organization_id: int, + request: Request, + x_plivo_signature_v3: Optional[str] = Header(None), + x_plivo_signature_ma_v3: Optional[str] = Header(None), + x_plivo_signature_v3_nonce: Optional[str] = Header(None), +): + """ + Handle initial webhook from Plivo when an outbound call is answered. + Returns Plivo XML response with Stream element. + """ + set_current_run_id(workflow_run_id) + provider = await get_telephony_provider(organization_id) + + form_data = await request.form() + callback_data = dict(form_data) + + signature = x_plivo_signature_v3 or x_plivo_signature_ma_v3 + if signature: + backend_endpoint, _ = await get_backend_endpoints() + full_url = ( + f"{backend_endpoint}/api/v1/telephony/plivo-xml" + f"?workflow_id={workflow_id}" + f"&user_id={user_id}" + f"&workflow_run_id={workflow_run_id}" + f"&organization_id={organization_id}" + ) + is_valid = await provider.verify_inbound_signature( + full_url, callback_data, signature, x_plivo_signature_v3_nonce + ) + if not is_valid: + logger.warning( + f"[run {workflow_run_id}] Invalid Plivo signature on answer webhook" + ) + return provider.generate_error_response( + "invalid_signature", "Invalid webhook signature." + ) + + call_id = callback_data.get("CallUUID") or callback_data.get("RequestUUID") + if call_id: + workflow_run = await db_client.get_workflow_run_by_id(workflow_run_id) + gathered_context = dict(workflow_run.gathered_context or {}) + gathered_context["call_id"] = call_id + await db_client.update_workflow_run( + run_id=workflow_run_id, gathered_context=gathered_context + ) + + response_content = await provider.get_webhook_response( + workflow_id, user_id, workflow_run_id + ) + return HTMLResponse(content=response_content, media_type="application/xml") + + +async def _handle_plivo_status_callback( + workflow_run_id: int, + request: Request, + x_plivo_signature_v3: Optional[str], + x_plivo_signature_ma_v3: Optional[str], + x_plivo_signature_v3_nonce: Optional[str], +): + set_current_run_id(workflow_run_id) + + form_data = await request.form() + callback_data = dict(form_data) + logger.info( + f"[run {workflow_run_id}] Received Plivo callback: {json.dumps(callback_data)}" + ) + + workflow_run = await db_client.get_workflow_run_by_id(workflow_run_id) + if not workflow_run: + logger.warning(f"Workflow run {workflow_run_id} not found for Plivo callback") + return {"status": "ignored", "reason": "workflow_run_not_found"} + + workflow = await db_client.get_workflow_by_id(workflow_run.workflow_id) + if not workflow: + logger.warning(f"Workflow {workflow_run.workflow_id} not found") + return {"status": "ignored", "reason": "workflow_not_found"} + + provider = await get_telephony_provider(workflow.organization_id) + + signature = x_plivo_signature_v3 or x_plivo_signature_ma_v3 + if signature: + backend_endpoint, _ = await get_backend_endpoints() + callback_kind = request.url.path.split("/")[-2] + full_url = ( + f"{backend_endpoint}/api/v1/telephony/plivo/{callback_kind}/{workflow_run_id}" + ) + is_valid = await provider.verify_inbound_signature( + full_url, + callback_data, + signature, + x_plivo_signature_v3_nonce, + ) + if not is_valid: + logger.warning( + f"[run {workflow_run_id}] Invalid Plivo webhook signature" + ) + return {"status": "error", "reason": "invalid_signature"} + + parsed_data = provider.parse_status_callback(callback_data) + status_update = StatusCallbackRequest( + call_id=parsed_data["call_id"], + status=parsed_data["status"], + from_number=parsed_data.get("from_number"), + to_number=parsed_data.get("to_number"), + direction=parsed_data.get("direction"), + duration=parsed_data.get("duration"), + extra=parsed_data.get("extra", {}), + ) + + await _process_status_update(workflow_run_id, status_update) + return {"status": "success"} + + +@router.post("/plivo/hangup-callback/{workflow_run_id}") +async def handle_plivo_hangup_callback( + workflow_run_id: int, + request: Request, + x_plivo_signature_v3: Optional[str] = Header(None), + x_plivo_signature_ma_v3: Optional[str] = Header(None), + x_plivo_signature_v3_nonce: Optional[str] = Header(None), +): + """Handle Plivo hangup callbacks.""" + return await _handle_plivo_status_callback( + workflow_run_id, + request, + x_plivo_signature_v3, + x_plivo_signature_ma_v3, + x_plivo_signature_v3_nonce, + ) + + +@router.post("/plivo/ring-callback/{workflow_run_id}") +async def handle_plivo_ring_callback( + workflow_run_id: int, + request: Request, + x_plivo_signature_v3: Optional[str] = Header(None), + x_plivo_signature_ma_v3: Optional[str] = Header(None), + x_plivo_signature_v3_nonce: Optional[str] = Header(None), +): + """Handle Plivo ring callbacks.""" + return await _handle_plivo_status_callback( + workflow_run_id, + request, + x_plivo_signature_v3, + x_plivo_signature_ma_v3, + x_plivo_signature_v3_nonce, + ) + + @router.post("/vobiz/hangup-callback/{workflow_run_id}") async def handle_vobiz_hangup_callback( workflow_run_id: int, @@ -1440,6 +1635,9 @@ async def handle_inbound_telephony( workflow_id: int, request: Request, x_twilio_signature: Optional[str] = Header(None), + x_plivo_signature_v3: Optional[str] = Header(None), + x_plivo_signature_ma_v3: Optional[str] = Header(None), + x_plivo_signature_v3_nonce: Optional[str] = Header(None), x_vobiz_signature: Optional[str] = Header(None), x_vobiz_timestamp: Optional[str] = Header(None), x_cx_apikey: Optional[str] = Header(None), @@ -1495,6 +1693,9 @@ async def handle_inbound_telephony( webhook_data, webhook_body, x_twilio_signature, + x_plivo_signature_v3, + x_plivo_signature_ma_v3, + x_plivo_signature_v3_nonce, x_vobiz_signature, x_vobiz_timestamp, x_cx_apikey, diff --git a/api/schemas/telephony_config.py b/api/schemas/telephony_config.py index e1c3525..805060d 100644 --- a/api/schemas/telephony_config.py +++ b/api/schemas/telephony_config.py @@ -23,6 +23,26 @@ class TwilioConfigurationResponse(BaseModel): from_numbers: List[str] +class PlivoConfigurationRequest(BaseModel): + """Request schema for Plivo configuration.""" + + provider: str = Field(default="plivo") + auth_id: str = Field(..., description="Plivo Auth ID") + auth_token: str = Field(..., description="Plivo Auth Token") + from_numbers: List[str] = Field( + ..., min_length=1, description="List of Plivo phone numbers" + ) + + +class PlivoConfigurationResponse(BaseModel): + """Response schema for Plivo configuration with masked sensitive fields.""" + + provider: str + auth_id: str # Masked + auth_token: str # Masked + from_numbers: List[str] + + class VonageConfigurationRequest(BaseModel): """Request schema for Vonage configuration.""" @@ -151,6 +171,7 @@ class TelephonyConfigurationResponse(BaseModel): """Top-level telephony configuration response.""" twilio: Optional[TwilioConfigurationResponse] = None + plivo: Optional[PlivoConfigurationResponse] = None vonage: Optional[VonageConfigurationResponse] = None vobiz: Optional[VobizConfigurationResponse] = None cloudonix: Optional[CloudonixConfigurationResponse] = None diff --git a/api/services/pipecat/audio_config.py b/api/services/pipecat/audio_config.py index 6f59de2..c0ba9e8 100644 --- a/api/services/pipecat/audio_config.py +++ b/api/services/pipecat/audio_config.py @@ -87,19 +87,20 @@ def create_audio_config(transport_type: str) -> AudioConfig: """Create audio configuration based on transport type. Args: - transport_type: Type of transport ("webrtc", "twilio", "vonage", "vobiz", "cloudonix") + transport_type: Type of transport ("webrtc", "twilio", "plivo", "vonage", "vobiz", "cloudonix") Returns: AudioConfig instance with appropriate settings """ if transport_type in ( WorkflowRunMode.TWILIO.value, + WorkflowRunMode.PLIVO.value, WorkflowRunMode.VOBIZ.value, WorkflowRunMode.CLOUDONIX.value, WorkflowRunMode.ARI.value, WorkflowRunMode.TELNYX.value, ): - # Twilio, Cloudonix, Vobiz, Telnyx, and ARI use MULAW at 8kHz + # Twilio, Plivo, Cloudonix, Vobiz, Telnyx, and ARI use MULAW at 8kHz return AudioConfig( transport_in_sample_rate=8000, transport_out_sample_rate=8000, diff --git a/api/services/pipecat/run_pipeline.py b/api/services/pipecat/run_pipeline.py index 6584433..88234c6 100644 --- a/api/services/pipecat/run_pipeline.py +++ b/api/services/pipecat/run_pipeline.py @@ -47,6 +47,7 @@ from api.services.pipecat.tracing_config import ( from api.services.pipecat.transport_setup import ( create_ari_transport, create_cloudonix_transport, + create_plivo_transport, create_telnyx_transport, create_twilio_transport, create_vobiz_transport, @@ -151,6 +152,69 @@ async def run_pipeline_twilio( ) +async def run_pipeline_plivo( + websocket_client: WebSocket, + stream_id: str, + call_id: str, + workflow_id: int, + workflow_run_id: int, + user_id: int, +) -> None: + """Run pipeline for Plivo WebSocket connections.""" + logger.info( + f"[run {workflow_run_id}] Starting Plivo pipeline - " + f"stream_id={stream_id}, call_id={call_id}, workflow_id={workflow_id}" + ) + set_current_run_id(workflow_run_id) + + cost_info = {"call_id": call_id} + await db_client.update_workflow_run(workflow_run_id, cost_info=cost_info) + + workflow = await db_client.get_workflow(workflow_id, user_id) + + if workflow: + set_current_org_id(workflow.organization_id) + + vad_config = None + ambient_noise_config = None + if workflow and workflow.workflow_configurations: + if "vad_configuration" in workflow.workflow_configurations: + vad_config = workflow.workflow_configurations["vad_configuration"] + if "ambient_noise_configuration" in workflow.workflow_configurations: + ambient_noise_config = workflow.workflow_configurations[ + "ambient_noise_configuration" + ] + + try: + audio_config = create_audio_config(WorkflowRunMode.PLIVO.value) + + transport = await create_plivo_transport( + websocket_client, + stream_id, + call_id, + workflow_run_id, + audio_config, + workflow.organization_id, + vad_config, + ambient_noise_config, + ) + + await _run_pipeline( + transport, + workflow_id, + workflow_run_id, + user_id, + audio_config=audio_config, + ) + logger.info(f"[run {workflow_run_id}] Plivo pipeline completed successfully") + + except Exception as e: + logger.error( + f"[run {workflow_run_id}] Error in Plivo pipeline: {e}", exc_info=True + ) + raise + + async def run_pipeline_vonage( websocket_client, call_uuid: str, diff --git a/api/services/pipecat/transport_setup.py b/api/services/pipecat/transport_setup.py index 85add83..4b0df33 100644 --- a/api/services/pipecat/transport_setup.py +++ b/api/services/pipecat/transport_setup.py @@ -19,6 +19,7 @@ from api.services.telephony.providers.twilio_call_strategies import ( TwilioConferenceStrategy, TwilioHangupStrategy, ) +from pipecat.serializers.plivo import PlivoFrameSerializer from pipecat.audio.mixers.silence_mixer import SilenceAudioMixer from pipecat.audio.mixers.soundfile_mixer import SoundfileMixer from pipecat.serializers.asterisk import AsteriskFrameSerializer @@ -141,6 +142,60 @@ async def create_twilio_transport( ) +async def create_plivo_transport( + websocket_client: WebSocket, + stream_id: str, + call_id: str, + workflow_run_id: int, + audio_config: AudioConfig, + organization_id: int, + vad_config: dict | None = None, + ambient_noise_config: dict | None = None, +): + """Create a transport for Plivo connections.""" + from api.services.telephony.factory import load_telephony_config + + config = await load_telephony_config(organization_id) + + if config.get("provider") != "plivo": + raise ValueError(f"Expected Plivo provider, got {config.get('provider')}") + + auth_id = config.get("auth_id") + auth_token = config.get("auth_token") + + if not auth_id or not auth_token: + raise ValueError( + f"Incomplete Plivo configuration for organization {organization_id}" + ) + + serializer = PlivoFrameSerializer( + stream_id=stream_id, + call_id=call_id, + auth_id=auth_id, + auth_token=auth_token, + params=PlivoFrameSerializer.InputParams( + plivo_sample_rate=8000, + sample_rate=audio_config.pipeline_sample_rate, + ), + ) + + mixer = await _build_audio_out_mixer( + audio_config.transport_out_sample_rate, ambient_noise_config + ) + + return FastAPIWebsocketTransport( + websocket=websocket_client, + params=FastAPIWebsocketParams( + audio_in_enabled=True, + audio_out_enabled=True, + audio_in_sample_rate=audio_config.transport_in_sample_rate, + audio_out_sample_rate=audio_config.transport_out_sample_rate, + audio_out_mixer=mixer, + serializer=serializer, + ), + ) + + async def create_cloudonix_transport( websocket_client: WebSocket, call_id: str, diff --git a/api/services/telephony/factory.py b/api/services/telephony/factory.py index 9044dc9..02a5b52 100644 --- a/api/services/telephony/factory.py +++ b/api/services/telephony/factory.py @@ -13,6 +13,7 @@ from api.enums import OrganizationConfigurationKey from api.services.telephony.base import TelephonyProvider from api.services.telephony.providers.ari_provider import ARIProvider from api.services.telephony.providers.cloudonix_provider import CloudonixProvider +from api.services.telephony.providers.plivo_provider import PlivoProvider from api.services.telephony.providers.telnyx_provider import TelnyxProvider from api.services.telephony.providers.twilio_provider import TwilioProvider from api.services.telephony.providers.vobiz_provider import VobizProvider @@ -53,6 +54,13 @@ async def load_telephony_config(organization_id: int) -> Dict[str, Any]: "auth_token": config.value.get("auth_token"), "from_numbers": config.value.get("from_numbers", []), } + elif provider == "plivo": + return { + "provider": "plivo", + "auth_id": config.value.get("auth_id"), + "auth_token": config.value.get("auth_token"), + "from_numbers": config.value.get("from_numbers", []), + } elif provider == "vonage": return { "provider": "vonage", @@ -124,6 +132,9 @@ async def get_telephony_provider(organization_id: int) -> TelephonyProvider: if provider_type == "twilio": return TwilioProvider(config) + elif provider_type == "plivo": + return PlivoProvider(config) + elif provider_type == "vonage": return VonageProvider(config) @@ -154,6 +165,7 @@ async def get_all_telephony_providers() -> List[Type[TelephonyProvider]]: return [ ARIProvider, CloudonixProvider, + PlivoProvider, TelnyxProvider, TwilioProvider, VobizProvider, diff --git a/api/services/telephony/providers/plivo_provider.py b/api/services/telephony/providers/plivo_provider.py new file mode 100644 index 0000000..6fb2c6f --- /dev/null +++ b/api/services/telephony/providers/plivo_provider.py @@ -0,0 +1,457 @@ +""" +Plivo implementation of the TelephonyProvider interface. +""" + +import base64 +import hashlib +import hmac +import json +import os +import random +from typing import TYPE_CHECKING, Any, Dict, List, Optional +from urllib.parse import parse_qs, urlparse, urlunparse + +import aiohttp +from fastapi import HTTPException +from loguru import logger + +from api.db import db_client +from api.enums import WorkflowRunMode +from api.services.telephony.base import ( + CallInitiationResult, + NormalizedInboundData, + TelephonyProvider, +) +from api.utils.common import get_backend_endpoints + +if TYPE_CHECKING: + from fastapi import WebSocket + + +class PlivoProvider(TelephonyProvider): + """ + Plivo implementation of TelephonyProvider. + """ + + PROVIDER_NAME = WorkflowRunMode.PLIVO.value + WEBHOOK_ENDPOINT = "plivo-xml" + + def __init__(self, config: Dict[str, Any]): + self.auth_id = config.get("auth_id") + self.auth_token = config.get("auth_token") + self.from_numbers = config.get("from_numbers", []) + + if isinstance(self.from_numbers, str): + self.from_numbers = [self.from_numbers] + + self.base_url = f"https://api.plivo.com/v1/Account/{self.auth_id}" + + async def initiate_call( + self, + to_number: str, + webhook_url: str, + workflow_run_id: Optional[int] = None, + from_number: Optional[str] = None, + **kwargs: Any, + ) -> CallInitiationResult: + if not self.validate_config(): + raise ValueError("Plivo provider not properly configured") + + endpoint = f"{self.base_url}/Call/" + + if from_number is None: + from_number = random.choice(self.from_numbers) + + data = { + "from": from_number.lstrip("+"), + "to": to_number.lstrip("+"), + "answer_url": webhook_url, + "answer_method": "POST", + } + + if workflow_run_id: + backend_endpoint, _ = await get_backend_endpoints() + data.update( + { + "hangup_url": f"{backend_endpoint}/api/v1/telephony/plivo/hangup-callback/{workflow_run_id}", + "hangup_method": "POST", + "ring_url": f"{backend_endpoint}/api/v1/telephony/plivo/ring-callback/{workflow_run_id}", + "ring_method": "POST", + } + ) + + data.update(kwargs) + + async with aiohttp.ClientSession() as session: + auth = aiohttp.BasicAuth(self.auth_id, self.auth_token) + async with session.post(endpoint, json=data, auth=auth) as response: + response_text = await response.text() + if response.status not in (200, 201, 202): + raise HTTPException( + status_code=response.status, + detail=f"Failed to initiate Plivo call: {response_text}", + ) + + response_data = json.loads(response_text) + call_id = ( + response_data.get("request_uuid") + or response_data.get("call_uuid") + or response_data.get("call_uuids", [None])[0] + ) + + if not call_id: + raise HTTPException( + status_code=500, + detail=f"Plivo response missing call identifier: {response_data}", + ) + + return CallInitiationResult( + call_id=call_id, + status=response_data.get("message", "queued"), + caller_number=from_number, + provider_metadata={"call_id": call_id}, + raw_response=response_data, + ) + + async def get_call_status(self, call_id: str) -> Dict[str, Any]: + if not self.validate_config(): + raise ValueError("Plivo provider not properly configured") + + endpoint = f"{self.base_url}/Call/{call_id}/" + + async with aiohttp.ClientSession() as session: + auth = aiohttp.BasicAuth(self.auth_id, self.auth_token) + async with session.get(endpoint, auth=auth) as response: + if response.status != 200: + error_data = await response.text() + raise Exception(f"Failed to get call status: {error_data}") + + return await response.json() + + async def get_available_phone_numbers(self) -> List[str]: + return self.from_numbers + + def validate_config(self) -> bool: + return bool(self.auth_id and self.auth_token and self.from_numbers) + + @staticmethod + def _stringify_signature_value(value: Any) -> Any: + if isinstance(value, bytes): + return "".join(chr(x) for x in bytearray(value)) + if isinstance(value, (int, float, bool)): + return str(value) + if isinstance(value, list): + return [PlivoProvider._stringify_signature_value(item) for item in value] + return value + + @staticmethod + def _query_map(query: str) -> Dict[str, Any]: + return { + PlivoProvider._stringify_signature_value(key): PlivoProvider._stringify_signature_value(value) + for key, value in parse_qs(query, keep_blank_values=True).items() + } + + @staticmethod + def _sorted_query_string(params: Dict[str, Any]) -> str: + parts: list[str] = [] + for key in sorted(params.keys()): + value = params[key] + if isinstance(value, list): + normalized_values = sorted(PlivoProvider._stringify_signature_value(value)) + parts.append("&".join(f"{key}={item}" for item in normalized_values)) + else: + parts.append(f"{key}={PlivoProvider._stringify_signature_value(value)}") + return "&".join(parts) + + @staticmethod + def _sorted_params_string(params: Dict[str, Any]) -> str: + parts: list[str] = [] + for key in sorted(params.keys()): + value = params[key] + if isinstance(value, list): + normalized_values = sorted(PlivoProvider._stringify_signature_value(value)) + parts.append("".join(f"{key}{item}" for item in normalized_values)) + elif isinstance(value, dict): + parts.append(f"{key}{PlivoProvider._sorted_params_string(value)}") + else: + parts.append(f"{key}{PlivoProvider._stringify_signature_value(value)}") + return "".join(parts) + + @staticmethod + def _construct_get_url(uri: str, params: Dict[str, Any], empty_post_params: bool = True) -> str: + parsed_uri = urlparse(uri) + base_url = urlunparse((parsed_uri.scheme, parsed_uri.netloc, parsed_uri.path, "", "", "")) + + combined_params = dict(params) + combined_params.update(PlivoProvider._query_map(parsed_uri.query)) + query_params = PlivoProvider._sorted_query_string(combined_params) + + if query_params or not empty_post_params: + base_url = f"{base_url}?{query_params}" + if query_params and not empty_post_params: + base_url = f"{base_url}." + return base_url + + @staticmethod + def _construct_post_url(uri: str, params: Dict[str, Any]) -> str: + base_url = PlivoProvider._construct_get_url( + uri, + {}, + empty_post_params=(len(params) == 0), + ) + return f"{base_url}{PlivoProvider._sorted_params_string(params)}" + + async def verify_webhook_signature( + self, + url: str, + params: Dict[str, Any], + signature: str, + nonce: str = "", + ) -> bool: + if not self.auth_token or not signature or not nonce: + return False + + payload = f"{self._construct_post_url(url, params)}.{nonce}" + computed = base64.b64encode( + hmac.new( + self.auth_token.encode("utf-8"), + payload.encode("utf-8"), + hashlib.sha256, + ).digest() + ).decode("utf-8") + + candidates = [candidate.strip() for candidate in signature.split(",") if candidate] + return any(hmac.compare_digest(computed, candidate) for candidate in candidates) + + async def get_webhook_response( + self, workflow_id: int, user_id: int, workflow_run_id: int + ) -> str: + _, wss_backend_endpoint = await get_backend_endpoints() + + return f""" + + {wss_backend_endpoint}/api/v1/telephony/ws/{workflow_id}/{user_id}/{workflow_run_id} +""" + + async def get_call_cost(self, call_id: str) -> Dict[str, Any]: + endpoint = f"{self.base_url}/Call/{call_id}/" + + try: + async with aiohttp.ClientSession() as session: + auth = aiohttp.BasicAuth(self.auth_id, self.auth_token) + async with session.get(endpoint, auth=auth) as response: + if response.status != 200: + error_data = await response.text() + logger.error(f"Failed to get Plivo call cost: {error_data}") + return { + "cost_usd": 0.0, + "duration": 0, + "status": "error", + "error": str(error_data), + } + + call_data = await response.json() + total_amount = float(call_data.get("total_amount", 0) or 0) + duration = int(call_data.get("duration", 0) or 0) + + return { + "cost_usd": total_amount, + "duration": duration, + "status": call_data.get("call_status", "unknown"), + "price_unit": "USD", + "raw_response": call_data, + } + except Exception as e: + logger.error(f"Exception fetching Plivo call cost: {e}") + return {"cost_usd": 0.0, "duration": 0, "status": "error", "error": str(e)} + + def parse_status_callback(self, data: Dict[str, Any]) -> Dict[str, Any]: + status_map = { + "in-progress": "answered", + "ringing": "ringing", + "ring": "ringing", + "completed": "completed", + "hangup": "completed", + "stopstream": "completed", + "busy": "busy", + "no-answer": "no-answer", + "cancel": "canceled", + "cancelled": "canceled", + "timeout": "no-answer", + } + + call_status = (data.get("CallStatus") or data.get("Event") or "").lower() + return { + "call_id": data.get("CallUUID", "") or data.get("RequestUUID", ""), + "status": status_map.get(call_status, call_status), + "from_number": data.get("From"), + "to_number": data.get("To"), + "direction": data.get("Direction"), + "duration": data.get("Duration"), + "extra": data, + } + + async def handle_websocket( + self, + websocket: "WebSocket", + workflow_id: int, + user_id: int, + workflow_run_id: int, + ) -> None: + from api.services.pipecat.run_pipeline import run_pipeline_plivo + + first_msg = await websocket.receive_text() + start_msg = json.loads(first_msg) + + if start_msg.get("event") != "start": + logger.error(f"Expected 'start' event, got: {start_msg.get('event')}") + await websocket.close(code=4400, reason="Expected start event") + return + + start_data = start_msg.get("start", {}) + stream_id = start_data.get("streamId") or start_msg.get("streamId") + + if not stream_id: + logger.error(f"Missing streamId in start event: {start_msg}") + await websocket.close(code=4400, reason="Missing streamId") + return + + workflow_run = await db_client.get_workflow_run(workflow_run_id) + call_id = None + if workflow_run and workflow_run.gathered_context: + call_id = workflow_run.gathered_context.get("call_id") + + if not call_id: + call_id = start_data.get("callId") or start_data.get("callUUID") + + if not call_id: + logger.error(f"Missing call ID for Plivo workflow run {workflow_run_id}") + await websocket.close(code=4400, reason="Missing call ID") + return + + await run_pipeline_plivo( + websocket, stream_id, call_id, workflow_id, workflow_run_id, user_id + ) + + @classmethod + def can_handle_webhook( + cls, webhook_data: Dict[str, Any], headers: Dict[str, str] + ) -> bool: + has_plivo_signature = ( + "x-plivo-signature-v3" in headers + or "x-plivo-signature-ma-v3" in headers + ) + return has_plivo_signature and "CallUUID" in webhook_data + + @staticmethod + def parse_inbound_webhook(webhook_data: Dict[str, Any]) -> NormalizedInboundData: + return NormalizedInboundData( + provider=PlivoProvider.PROVIDER_NAME, + call_id=webhook_data.get("CallUUID", "") or webhook_data.get("RequestUUID", ""), + from_number=PlivoProvider.normalize_phone_number(webhook_data.get("From", "")), + to_number=PlivoProvider.normalize_phone_number(webhook_data.get("To", "")), + direction=webhook_data.get("Direction", ""), + call_status=webhook_data.get("CallStatus", ""), + account_id=webhook_data.get("AuthID") or webhook_data.get("ParentAuthID"), + raw_data=webhook_data, + ) + + @staticmethod + def validate_account_id(config_data: dict, webhook_account_id: str) -> bool: + if webhook_account_id: + return config_data.get("auth_id") == webhook_account_id + # AuthID is not always present in Plivo webhooks (undocumented field). + # Fall back to checking that the org has a Plivo config at all. + logger.warning( + "Plivo webhook missing AuthID/ParentAuthID - " + "falling back to config existence check" + ) + return bool(config_data.get("auth_id")) + + @staticmethod + def normalize_phone_number(phone_number: str) -> str: + if not phone_number: + return "" + + clean_number = phone_number.lstrip("+") + if clean_number.startswith("1") and len(clean_number) == 11: + return f"+{clean_number}" + if len(clean_number) == 10: + return f"+1{clean_number}" + if len(clean_number) > 10: + return f"+{clean_number}" + + return phone_number + + async def verify_inbound_signature( + self, + url: str, + webhook_data: Dict[str, Any], + signature: str, + nonce: str = "", + ) -> bool: + if os.getenv("ENVIRONMENT") == "local": + logger.warning( + "Skipping Plivo inbound signature verification in local environment" + ) + return True + return await self.verify_webhook_signature(url, webhook_data, signature, nonce) + + @staticmethod + async def generate_inbound_response( + websocket_url: str, workflow_run_id: int = None + ) -> tuple: + from fastapi import Response + + hangup_callback_attr = "" + if workflow_run_id: + backend_endpoint, _ = await get_backend_endpoints() + hangup_url = f"{backend_endpoint}/api/v1/telephony/plivo/hangup-callback/{workflow_run_id}" + hangup_callback_attr = f' statusCallbackUrl="{hangup_url}" statusCallbackMethod="POST"' + + plivo_xml = f""" + + {websocket_url} +""" + return Response(content=plivo_xml, media_type="application/xml") + + @staticmethod + def generate_error_response(error_type: str, message: str) -> tuple: + from fastapi import Response + + plivo_xml = f""" + + Sorry, there was an error processing your call. {message} + +""" + return Response(content=plivo_xml, media_type="application/xml") + + @staticmethod + def generate_validation_error_response(error_type) -> tuple: + from fastapi import Response + + from api.errors.telephony_errors import TELEPHONY_ERROR_MESSAGES, TelephonyError + + message = TELEPHONY_ERROR_MESSAGES.get( + error_type, TELEPHONY_ERROR_MESSAGES[TelephonyError.GENERAL_AUTH_FAILED] + ) + + plivo_xml = f""" + + {message} + +""" + return Response(content=plivo_xml, media_type="application/xml") + + async def transfer_call( + self, + destination: str, + transfer_id: str, + conference_name: str, + timeout: int = 30, + **kwargs: Any, + ) -> Dict[str, Any]: + raise NotImplementedError("Plivo provider does not support call transfers") + + def supports_transfers(self) -> bool: + return False diff --git a/ui/src/app/telephony-configurations/page.tsx b/ui/src/app/telephony-configurations/page.tsx index d0cd71e..a6d08a1 100644 --- a/ui/src/app/telephony-configurations/page.tsx +++ b/ui/src/app/telephony-configurations/page.tsx @@ -12,6 +12,8 @@ import type { AriConfigurationResponse, CloudonixConfigurationRequest, CloudonixConfigurationResponse, + PlivoConfigurationRequest, + PlivoConfigurationResponse, TelephonyConfigurationResponse, TelnyxConfigurationRequest, TelnyxConfigurationResponse, @@ -49,6 +51,9 @@ interface TelephonyConfigForm { private_key?: string; api_key?: string; api_secret?: string; + // Plivo fields + plivo_auth_id?: string; + plivo_auth_token?: string; // Vobiz fields auth_id?: string; vobiz_auth_token?: string; @@ -146,6 +151,13 @@ export default function ConfigureTelephonyPage() { setValue("auth_id", response.data.vobiz.auth_id); setValue("vobiz_auth_token", response.data.vobiz.auth_token); setValue("from_numbers", response.data.vobiz.from_numbers?.length > 0 ? response.data.vobiz.from_numbers : [""]); + } else if ((response.data as TelephonyConfigurationResponse)?.plivo) { + const plivoConfig = (response.data as TelephonyConfigurationResponse).plivo as PlivoConfigurationResponse; + setHasExistingConfig(true); + setValue("provider", "plivo"); + setValue("plivo_auth_id", plivoConfig.auth_id); + setValue("plivo_auth_token", plivoConfig.auth_token); + setValue("from_numbers", plivoConfig.from_numbers?.length > 0 ? plivoConfig.from_numbers : [""]); } else if ((response.data as TelephonyConfigurationResponse)?.cloudonix) { const cloudonixConfig = (response.data as TelephonyConfigurationResponse).cloudonix as CloudonixConfigurationResponse; setHasExistingConfig(true); @@ -192,6 +204,7 @@ export default function ConfigureTelephonyPage() { // Build the request body based on provider let requestBody: | TwilioConfigurationRequest + | PlivoConfigurationRequest | VonageConfigurationRequest | VobizConfigurationRequest | CloudonixConfigurationRequest @@ -214,7 +227,7 @@ export default function ConfigureTelephonyPage() { let pattern: RegExp; let formatMessage: string; - if (data.provider === "twilio" || data.provider === "telnyx") { + if (data.provider === "twilio" || data.provider === "telnyx" || data.provider === "plivo") { pattern = twilioPattern; formatMessage = "with + prefix (e.g., +1234567890)"; } else if (data.provider === "cloudonix") { @@ -259,6 +272,13 @@ export default function ConfigureTelephonyPage() { auth_id: data.auth_id, auth_token: data.vobiz_auth_token, } as VobizConfigurationRequest; + } else if (data.provider === "plivo") { + requestBody = { + provider: data.provider, + from_numbers: filteredNumbers, + auth_id: data.plivo_auth_id!, + auth_token: data.plivo_auth_token!, + } as PlivoConfigurationRequest; } else if (data.provider === "telnyx") { requestBody = { provider: data.provider, @@ -335,6 +355,8 @@ export default function ConfigureTelephonyPage() { ? "Vonage" : selectedProvider === "vobiz" ? "Vobiz" + : selectedProvider === "plivo" + ? "Plivo" : selectedProvider === "telnyx" ? "Telnyx" : selectedProvider === "ari" @@ -385,6 +407,12 @@ export default function ConfigureTelephonyPage() { {" "} for developer documentation and API reference. + ) : selectedProvider === "plivo" ? ( + <> + Plivo is a cloud communications platform providing voice and messaging + APIs. Use Plivo to build voice applications with real-time audio streaming + and global telephony coverage. + ) : selectedProvider === "vobiz" ? ( <> Vobiz is a telephony provider. Visit their documentation @@ -438,6 +466,25 @@ export default function ConfigureTelephonyPage() {

+ ) : selectedProvider === "plivo" ? ( +
+
+

Getting Started with Plivo:

+
    +
  1. Sign up at plivo.com and go to the Console Dashboard
  2. +
  3. Find your Auth ID and Auth Token on the Dashboard overview page
  4. +
  5. Purchase a phone number under Phone Numbers > Buy Numbers
  6. +
  7. Create an XML Application under Voice > XML Applications
  8. +
  9. Enter your Auth ID, Auth Token, and phone numbers below
  10. +
+
+
+

+ Note: Plivo uses XML-based call control with bidirectional + audio streaming. Phone numbers should be in E.164 format with + prefix (e.g., +1234567890). +

+
+
) : selectedProvider === "twilio" || selectedProvider === "vonage" ? (