From 2218ba8ad9278251a6d2fb9eb7dad3f2f67734a0 Mon Sep 17 00:00:00 2001
From: dilipevents2007-cpu
Date: Sat, 25 Apr 2026 20:41:46 +0530
Subject: [PATCH] feat: add Plivo telephony provider support (#245)
* Add Plivo telephony provider support
* add Plivo telephony UI, fix audio config, and improve inbound call handling
---------
Co-authored-by: Dilip Tiwari
Co-authored-by: Sabiha Khan
Co-authored-by: Abhishek
---
.../versions/f2e1d0c9b8a7_add_plivo_mode.py | 74 +++
api/enums.py | 1 +
api/routes/organization.py | 34 ++
api/routes/telephony.py | 215 +++++++-
api/schemas/telephony_config.py | 21 +
api/services/pipecat/audio_config.py | 5 +-
api/services/pipecat/run_pipeline.py | 64 +++
api/services/pipecat/transport_setup.py | 55 +++
api/services/telephony/factory.py | 12 +
.../telephony/providers/plivo_provider.py | 457 ++++++++++++++++++
ui/src/app/telephony-configurations/page.tsx | 136 +++++-
.../components/PhoneCallDialog.tsx | 2 +-
ui/src/client/types.gen.ts | 57 ++-
ui/src/constants/workflowRunModes.ts | 3 +-
14 files changed, 1123 insertions(+), 13 deletions(-)
create mode 100644 api/alembic/versions/f2e1d0c9b8a7_add_plivo_mode.py
create mode 100644 api/services/telephony/providers/plivo_provider.py
diff --git a/api/alembic/versions/f2e1d0c9b8a7_add_plivo_mode.py b/api/alembic/versions/f2e1d0c9b8a7_add_plivo_mode.py
new file mode 100644
index 0000000..6a59d54
--- /dev/null
+++ b/api/alembic/versions/f2e1d0c9b8a7_add_plivo_mode.py
@@ -0,0 +1,74 @@
+"""add plivo mode
+
+Revision ID: f2e1d0c9b8a7
+Revises: a1b2c3d4e5f6, 67a5cf3e09d0
+Create Date: 2026-04-13 16:35:00.000000
+
+"""
+
+from typing import Sequence, Union
+
+from alembic import op
+from alembic_postgresql_enum import TableReference
+
+# revision identifiers, used by Alembic.
+revision: str = "f2e1d0c9b8a7"
+down_revision: Union[str, Sequence[str], None] = (
+ "a1b2c3d4e5f6",
+ "67a5cf3e09d0",
+)
+branch_labels: Union[str, Sequence[str], None] = None
+depends_on: Union[str, Sequence[str], None] = None
+
+
+def upgrade() -> None:
+ op.sync_enum_values(
+ enum_schema="public",
+ enum_name="workflow_run_mode",
+ new_values=[
+ "ari",
+ "plivo",
+ "twilio",
+ "vonage",
+ "vobiz",
+ "cloudonix",
+ "telnyx",
+ "webrtc",
+ "smallwebrtc",
+ "stasis",
+ "VOICE",
+ "CHAT",
+ ],
+ affected_columns=[
+ TableReference(
+ table_schema="public", table_name="workflow_runs", column_name="mode"
+ )
+ ],
+ enum_values_to_rename=[],
+ )
+
+
+def downgrade() -> None:
+ op.sync_enum_values(
+ enum_schema="public",
+ enum_name="workflow_run_mode",
+ new_values=[
+ "ari",
+ "twilio",
+ "vonage",
+ "vobiz",
+ "cloudonix",
+ "telnyx",
+ "webrtc",
+ "smallwebrtc",
+ "stasis",
+ "VOICE",
+ "CHAT",
+ ],
+ affected_columns=[
+ TableReference(
+ table_schema="public", table_name="workflow_runs", column_name="mode"
+ )
+ ],
+ enum_values_to_rename=[],
+ )
diff --git a/api/enums.py b/api/enums.py
index c07879a..b7655b1 100644
--- a/api/enums.py
+++ b/api/enums.py
@@ -19,6 +19,7 @@ class CallType(Enum):
class WorkflowRunMode(Enum):
ARI = "ari"
+ PLIVO = "plivo"
TWILIO = "twilio"
VONAGE = "vonage"
VOBIZ = "vobiz"
diff --git a/api/routes/organization.py b/api/routes/organization.py
index f35ef0a..9db9b8a 100644
--- a/api/routes/organization.py
+++ b/api/routes/organization.py
@@ -12,6 +12,8 @@ from api.schemas.telephony_config import (
ARIConfigurationResponse,
CloudonixConfigurationRequest,
CloudonixConfigurationResponse,
+ PlivoConfigurationRequest,
+ PlivoConfigurationResponse,
TelephonyConfigurationResponse,
TelnyxConfigurationRequest,
TelnyxConfigurationResponse,
@@ -33,6 +35,7 @@ router = APIRouter(prefix="/organizations", tags=["organizations"])
# Provider configuration constants
PROVIDER_MASKED_FIELDS = {
"twilio": ["account_sid", "auth_token"],
+ "plivo": ["auth_id", "auth_token"],
"vonage": ["private_key", "api_key", "api_secret"],
"vobiz": ["auth_id", "auth_token"],
"cloudonix": ["bearer_token"],
@@ -72,6 +75,26 @@ async def get_telephony_configuration(user: UserModel = Depends(get_user)):
auth_token=mask_key(auth_token) if auth_token else "",
from_numbers=from_numbers,
),
+ plivo=None,
+ vonage=None,
+ vobiz=None,
+ cloudonix=None,
+ )
+ elif stored_provider == "plivo":
+ auth_id = config.value.get("auth_id", "")
+ auth_token = config.value.get("auth_token", "")
+ from_numbers = (
+ config.value.get("from_numbers", []) if auth_id and auth_token else []
+ )
+
+ return TelephonyConfigurationResponse(
+ twilio=None,
+ plivo=PlivoConfigurationResponse(
+ provider="plivo",
+ auth_id=mask_key(auth_id) if auth_id else "",
+ auth_token=mask_key(auth_token) if auth_token else "",
+ from_numbers=from_numbers,
+ ),
vonage=None,
vobiz=None,
cloudonix=None,
@@ -89,6 +112,7 @@ async def get_telephony_configuration(user: UserModel = Depends(get_user)):
return TelephonyConfigurationResponse(
twilio=None,
+ plivo=None,
vonage=VonageConfigurationResponse(
provider="vonage",
application_id=application_id,
@@ -109,6 +133,7 @@ async def get_telephony_configuration(user: UserModel = Depends(get_user)):
return TelephonyConfigurationResponse(
twilio=None,
+ plivo=None,
vonage=None,
vobiz=VobizConfigurationResponse(
provider="vobiz",
@@ -125,6 +150,7 @@ async def get_telephony_configuration(user: UserModel = Depends(get_user)):
return TelephonyConfigurationResponse(
twilio=None,
+ plivo=None,
vonage=None,
cloudonix=CloudonixConfigurationResponse(
provider="cloudonix",
@@ -175,6 +201,7 @@ async def get_telephony_configuration(user: UserModel = Depends(get_user)):
async def save_telephony_configuration(
request: Union[
TwilioConfigurationRequest,
+ PlivoConfigurationRequest,
VonageConfigurationRequest,
VobizConfigurationRequest,
CloudonixConfigurationRequest,
@@ -201,6 +228,13 @@ async def save_telephony_configuration(
"auth_token": request.auth_token,
"from_numbers": request.from_numbers,
}
+ elif request.provider == "plivo":
+ config_value = {
+ "provider": "plivo",
+ "auth_id": request.auth_id,
+ "auth_token": request.auth_token,
+ "from_numbers": request.from_numbers,
+ }
elif request.provider == "vonage":
config_value = {
"provider": "vonage",
diff --git a/api/routes/telephony.py b/api/routes/telephony.py
index 645c6a5..b9951f2 100644
--- a/api/routes/telephony.py
+++ b/api/routes/telephony.py
@@ -89,6 +89,33 @@ class StatusCallbackRequest(BaseModel):
extra=data,
)
+ @classmethod
+ def from_plivo(cls, data: dict):
+ """Convert Plivo callback to generic format"""
+ status_map = {
+ "in-progress": "answered",
+ "ringing": "ringing",
+ "ring": "ringing",
+ "completed": "completed",
+ "hangup": "completed",
+ "stopstream": "completed",
+ "busy": "busy",
+ "no-answer": "no-answer",
+ "cancel": "canceled",
+ "cancelled": "canceled",
+ "timeout": "no-answer",
+ }
+ call_status = (data.get("CallStatus") or data.get("Event") or "").lower()
+ return cls(
+ call_id=data.get("CallUUID", "") or data.get("RequestUUID", ""),
+ status=status_map.get(call_status, call_status),
+ from_number=data.get("From"),
+ to_number=data.get("To"),
+ direction=data.get("Direction"),
+ duration=data.get("Duration"),
+ extra=data,
+ )
+
@classmethod
def from_vonage(cls, data: dict):
"""Convert Vonage event to generic format"""
@@ -340,6 +367,9 @@ async def _validate_inbound_request(
webhook_data: dict,
webhook_body: str = "",
x_twilio_signature: str = None,
+ x_plivo_signature: str = None,
+ x_plivo_signature_ma: str = None,
+ x_plivo_signature_nonce: str = None,
x_vobiz_signature: str = None,
x_vobiz_timestamp: str = None,
x_cx_apikey: str = None,
@@ -377,7 +407,14 @@ async def _validate_inbound_request(
# Verify webhook signature/API key if provided
provider_instance = None
- if x_twilio_signature or x_vobiz_signature or x_cx_apikey or telnyx_signature:
+ if (
+ x_twilio_signature
+ or x_plivo_signature
+ or x_plivo_signature_ma
+ or x_vobiz_signature
+ or x_cx_apikey
+ or telnyx_signature
+ ):
backend_endpoint, _ = await get_backend_endpoints()
webhook_url = f"{backend_endpoint}/api/v1/telephony/inbound/{workflow_id}"
@@ -389,6 +426,16 @@ async def _validate_inbound_request(
signature_valid = await provider_instance.verify_inbound_signature(
webhook_url, webhook_data, x_twilio_signature
)
+ elif provider_class.PROVIDER_NAME == "plivo" and (
+ x_plivo_signature or x_plivo_signature_ma
+ ):
+ logger.info(f"Verifying Plivo signature for URL: {webhook_url}")
+ signature_valid = await provider_instance.verify_inbound_signature(
+ webhook_url,
+ webhook_data,
+ x_plivo_signature or x_plivo_signature_ma,
+ x_plivo_signature_nonce,
+ )
elif provider_class.PROVIDER_NAME == "vobiz" and x_vobiz_signature:
logger.info(f"Verifying Vobiz signature for URL: {webhook_url}")
signature_valid = await provider_instance.verify_inbound_signature(
@@ -478,12 +525,6 @@ async def _validate_organization_provider_config(
organization_id: int, provider_class, account_id: str
) -> TelephonyError:
"""Validate provider and account_id, returning specific error type"""
- if not account_id:
- logger.warning(
- f"No account_id provided for provider {provider_class.PROVIDER_NAME}"
- )
- return TelephonyError.ACCOUNT_VALIDATION_FAILED
-
try:
config = await db_client.get_configuration(
organization_id,
@@ -1015,6 +1056,160 @@ async def handle_vobiz_xml_webhook(
return HTMLResponse(content=response_content, media_type="application/xml")
+@router.post("/plivo-xml", include_in_schema=False)
+async def handle_plivo_xml_webhook(
+ workflow_id: int,
+ user_id: int,
+ workflow_run_id: int,
+ organization_id: int,
+ request: Request,
+ x_plivo_signature_v3: Optional[str] = Header(None),
+ x_plivo_signature_ma_v3: Optional[str] = Header(None),
+ x_plivo_signature_v3_nonce: Optional[str] = Header(None),
+):
+ """
+ Handle initial webhook from Plivo when an outbound call is answered.
+ Returns Plivo XML response with Stream element.
+ """
+ set_current_run_id(workflow_run_id)
+ provider = await get_telephony_provider(organization_id)
+
+ form_data = await request.form()
+ callback_data = dict(form_data)
+
+ signature = x_plivo_signature_v3 or x_plivo_signature_ma_v3
+ if signature:
+ backend_endpoint, _ = await get_backend_endpoints()
+ full_url = (
+ f"{backend_endpoint}/api/v1/telephony/plivo-xml"
+ f"?workflow_id={workflow_id}"
+ f"&user_id={user_id}"
+ f"&workflow_run_id={workflow_run_id}"
+ f"&organization_id={organization_id}"
+ )
+ is_valid = await provider.verify_inbound_signature(
+ full_url, callback_data, signature, x_plivo_signature_v3_nonce
+ )
+ if not is_valid:
+ logger.warning(
+ f"[run {workflow_run_id}] Invalid Plivo signature on answer webhook"
+ )
+ return provider.generate_error_response(
+ "invalid_signature", "Invalid webhook signature."
+ )
+
+ call_id = callback_data.get("CallUUID") or callback_data.get("RequestUUID")
+ if call_id:
+ workflow_run = await db_client.get_workflow_run_by_id(workflow_run_id)
+ gathered_context = dict(workflow_run.gathered_context or {})
+ gathered_context["call_id"] = call_id
+ await db_client.update_workflow_run(
+ run_id=workflow_run_id, gathered_context=gathered_context
+ )
+
+ response_content = await provider.get_webhook_response(
+ workflow_id, user_id, workflow_run_id
+ )
+ return HTMLResponse(content=response_content, media_type="application/xml")
+
+
+async def _handle_plivo_status_callback(
+ workflow_run_id: int,
+ request: Request,
+ x_plivo_signature_v3: Optional[str],
+ x_plivo_signature_ma_v3: Optional[str],
+ x_plivo_signature_v3_nonce: Optional[str],
+):
+ set_current_run_id(workflow_run_id)
+
+ form_data = await request.form()
+ callback_data = dict(form_data)
+ logger.info(
+ f"[run {workflow_run_id}] Received Plivo callback: {json.dumps(callback_data)}"
+ )
+
+ workflow_run = await db_client.get_workflow_run_by_id(workflow_run_id)
+ if not workflow_run:
+ logger.warning(f"Workflow run {workflow_run_id} not found for Plivo callback")
+ return {"status": "ignored", "reason": "workflow_run_not_found"}
+
+ workflow = await db_client.get_workflow_by_id(workflow_run.workflow_id)
+ if not workflow:
+ logger.warning(f"Workflow {workflow_run.workflow_id} not found")
+ return {"status": "ignored", "reason": "workflow_not_found"}
+
+ provider = await get_telephony_provider(workflow.organization_id)
+
+ signature = x_plivo_signature_v3 or x_plivo_signature_ma_v3
+ if signature:
+ backend_endpoint, _ = await get_backend_endpoints()
+ callback_kind = request.url.path.split("/")[-2]
+ full_url = (
+ f"{backend_endpoint}/api/v1/telephony/plivo/{callback_kind}/{workflow_run_id}"
+ )
+ is_valid = await provider.verify_inbound_signature(
+ full_url,
+ callback_data,
+ signature,
+ x_plivo_signature_v3_nonce,
+ )
+ if not is_valid:
+ logger.warning(
+ f"[run {workflow_run_id}] Invalid Plivo webhook signature"
+ )
+ return {"status": "error", "reason": "invalid_signature"}
+
+ parsed_data = provider.parse_status_callback(callback_data)
+ status_update = StatusCallbackRequest(
+ call_id=parsed_data["call_id"],
+ status=parsed_data["status"],
+ from_number=parsed_data.get("from_number"),
+ to_number=parsed_data.get("to_number"),
+ direction=parsed_data.get("direction"),
+ duration=parsed_data.get("duration"),
+ extra=parsed_data.get("extra", {}),
+ )
+
+ await _process_status_update(workflow_run_id, status_update)
+ return {"status": "success"}
+
+
+@router.post("/plivo/hangup-callback/{workflow_run_id}")
+async def handle_plivo_hangup_callback(
+ workflow_run_id: int,
+ request: Request,
+ x_plivo_signature_v3: Optional[str] = Header(None),
+ x_plivo_signature_ma_v3: Optional[str] = Header(None),
+ x_plivo_signature_v3_nonce: Optional[str] = Header(None),
+):
+ """Handle Plivo hangup callbacks."""
+ return await _handle_plivo_status_callback(
+ workflow_run_id,
+ request,
+ x_plivo_signature_v3,
+ x_plivo_signature_ma_v3,
+ x_plivo_signature_v3_nonce,
+ )
+
+
+@router.post("/plivo/ring-callback/{workflow_run_id}")
+async def handle_plivo_ring_callback(
+ workflow_run_id: int,
+ request: Request,
+ x_plivo_signature_v3: Optional[str] = Header(None),
+ x_plivo_signature_ma_v3: Optional[str] = Header(None),
+ x_plivo_signature_v3_nonce: Optional[str] = Header(None),
+):
+ """Handle Plivo ring callbacks."""
+ return await _handle_plivo_status_callback(
+ workflow_run_id,
+ request,
+ x_plivo_signature_v3,
+ x_plivo_signature_ma_v3,
+ x_plivo_signature_v3_nonce,
+ )
+
+
@router.post("/vobiz/hangup-callback/{workflow_run_id}")
async def handle_vobiz_hangup_callback(
workflow_run_id: int,
@@ -1440,6 +1635,9 @@ async def handle_inbound_telephony(
workflow_id: int,
request: Request,
x_twilio_signature: Optional[str] = Header(None),
+ x_plivo_signature_v3: Optional[str] = Header(None),
+ x_plivo_signature_ma_v3: Optional[str] = Header(None),
+ x_plivo_signature_v3_nonce: Optional[str] = Header(None),
x_vobiz_signature: Optional[str] = Header(None),
x_vobiz_timestamp: Optional[str] = Header(None),
x_cx_apikey: Optional[str] = Header(None),
@@ -1495,6 +1693,9 @@ async def handle_inbound_telephony(
webhook_data,
webhook_body,
x_twilio_signature,
+ x_plivo_signature_v3,
+ x_plivo_signature_ma_v3,
+ x_plivo_signature_v3_nonce,
x_vobiz_signature,
x_vobiz_timestamp,
x_cx_apikey,
diff --git a/api/schemas/telephony_config.py b/api/schemas/telephony_config.py
index e1c3525..805060d 100644
--- a/api/schemas/telephony_config.py
+++ b/api/schemas/telephony_config.py
@@ -23,6 +23,26 @@ class TwilioConfigurationResponse(BaseModel):
from_numbers: List[str]
+class PlivoConfigurationRequest(BaseModel):
+ """Request schema for Plivo configuration."""
+
+ provider: str = Field(default="plivo")
+ auth_id: str = Field(..., description="Plivo Auth ID")
+ auth_token: str = Field(..., description="Plivo Auth Token")
+ from_numbers: List[str] = Field(
+ ..., min_length=1, description="List of Plivo phone numbers"
+ )
+
+
+class PlivoConfigurationResponse(BaseModel):
+ """Response schema for Plivo configuration with masked sensitive fields."""
+
+ provider: str
+ auth_id: str # Masked
+ auth_token: str # Masked
+ from_numbers: List[str]
+
+
class VonageConfigurationRequest(BaseModel):
"""Request schema for Vonage configuration."""
@@ -151,6 +171,7 @@ class TelephonyConfigurationResponse(BaseModel):
"""Top-level telephony configuration response."""
twilio: Optional[TwilioConfigurationResponse] = None
+ plivo: Optional[PlivoConfigurationResponse] = None
vonage: Optional[VonageConfigurationResponse] = None
vobiz: Optional[VobizConfigurationResponse] = None
cloudonix: Optional[CloudonixConfigurationResponse] = None
diff --git a/api/services/pipecat/audio_config.py b/api/services/pipecat/audio_config.py
index 6f59de2..c0ba9e8 100644
--- a/api/services/pipecat/audio_config.py
+++ b/api/services/pipecat/audio_config.py
@@ -87,19 +87,20 @@ def create_audio_config(transport_type: str) -> AudioConfig:
"""Create audio configuration based on transport type.
Args:
- transport_type: Type of transport ("webrtc", "twilio", "vonage", "vobiz", "cloudonix")
+ transport_type: Type of transport ("webrtc", "twilio", "plivo", "vonage", "vobiz", "cloudonix")
Returns:
AudioConfig instance with appropriate settings
"""
if transport_type in (
WorkflowRunMode.TWILIO.value,
+ WorkflowRunMode.PLIVO.value,
WorkflowRunMode.VOBIZ.value,
WorkflowRunMode.CLOUDONIX.value,
WorkflowRunMode.ARI.value,
WorkflowRunMode.TELNYX.value,
):
- # Twilio, Cloudonix, Vobiz, Telnyx, and ARI use MULAW at 8kHz
+ # Twilio, Plivo, Cloudonix, Vobiz, Telnyx, and ARI use MULAW at 8kHz
return AudioConfig(
transport_in_sample_rate=8000,
transport_out_sample_rate=8000,
diff --git a/api/services/pipecat/run_pipeline.py b/api/services/pipecat/run_pipeline.py
index 6584433..88234c6 100644
--- a/api/services/pipecat/run_pipeline.py
+++ b/api/services/pipecat/run_pipeline.py
@@ -47,6 +47,7 @@ from api.services.pipecat.tracing_config import (
from api.services.pipecat.transport_setup import (
create_ari_transport,
create_cloudonix_transport,
+ create_plivo_transport,
create_telnyx_transport,
create_twilio_transport,
create_vobiz_transport,
@@ -151,6 +152,69 @@ async def run_pipeline_twilio(
)
+async def run_pipeline_plivo(
+ websocket_client: WebSocket,
+ stream_id: str,
+ call_id: str,
+ workflow_id: int,
+ workflow_run_id: int,
+ user_id: int,
+) -> None:
+ """Run pipeline for Plivo WebSocket connections."""
+ logger.info(
+ f"[run {workflow_run_id}] Starting Plivo pipeline - "
+ f"stream_id={stream_id}, call_id={call_id}, workflow_id={workflow_id}"
+ )
+ set_current_run_id(workflow_run_id)
+
+ cost_info = {"call_id": call_id}
+ await db_client.update_workflow_run(workflow_run_id, cost_info=cost_info)
+
+ workflow = await db_client.get_workflow(workflow_id, user_id)
+
+ if workflow:
+ set_current_org_id(workflow.organization_id)
+
+ vad_config = None
+ ambient_noise_config = None
+ if workflow and workflow.workflow_configurations:
+ if "vad_configuration" in workflow.workflow_configurations:
+ vad_config = workflow.workflow_configurations["vad_configuration"]
+ if "ambient_noise_configuration" in workflow.workflow_configurations:
+ ambient_noise_config = workflow.workflow_configurations[
+ "ambient_noise_configuration"
+ ]
+
+ try:
+ audio_config = create_audio_config(WorkflowRunMode.PLIVO.value)
+
+ transport = await create_plivo_transport(
+ websocket_client,
+ stream_id,
+ call_id,
+ workflow_run_id,
+ audio_config,
+ workflow.organization_id,
+ vad_config,
+ ambient_noise_config,
+ )
+
+ await _run_pipeline(
+ transport,
+ workflow_id,
+ workflow_run_id,
+ user_id,
+ audio_config=audio_config,
+ )
+ logger.info(f"[run {workflow_run_id}] Plivo pipeline completed successfully")
+
+ except Exception as e:
+ logger.error(
+ f"[run {workflow_run_id}] Error in Plivo pipeline: {e}", exc_info=True
+ )
+ raise
+
+
async def run_pipeline_vonage(
websocket_client,
call_uuid: str,
diff --git a/api/services/pipecat/transport_setup.py b/api/services/pipecat/transport_setup.py
index 85add83..4b0df33 100644
--- a/api/services/pipecat/transport_setup.py
+++ b/api/services/pipecat/transport_setup.py
@@ -19,6 +19,7 @@ from api.services.telephony.providers.twilio_call_strategies import (
TwilioConferenceStrategy,
TwilioHangupStrategy,
)
+from pipecat.serializers.plivo import PlivoFrameSerializer
from pipecat.audio.mixers.silence_mixer import SilenceAudioMixer
from pipecat.audio.mixers.soundfile_mixer import SoundfileMixer
from pipecat.serializers.asterisk import AsteriskFrameSerializer
@@ -141,6 +142,60 @@ async def create_twilio_transport(
)
+async def create_plivo_transport(
+ websocket_client: WebSocket,
+ stream_id: str,
+ call_id: str,
+ workflow_run_id: int,
+ audio_config: AudioConfig,
+ organization_id: int,
+ vad_config: dict | None = None,
+ ambient_noise_config: dict | None = None,
+):
+ """Create a transport for Plivo connections."""
+ from api.services.telephony.factory import load_telephony_config
+
+ config = await load_telephony_config(organization_id)
+
+ if config.get("provider") != "plivo":
+ raise ValueError(f"Expected Plivo provider, got {config.get('provider')}")
+
+ auth_id = config.get("auth_id")
+ auth_token = config.get("auth_token")
+
+ if not auth_id or not auth_token:
+ raise ValueError(
+ f"Incomplete Plivo configuration for organization {organization_id}"
+ )
+
+ serializer = PlivoFrameSerializer(
+ stream_id=stream_id,
+ call_id=call_id,
+ auth_id=auth_id,
+ auth_token=auth_token,
+ params=PlivoFrameSerializer.InputParams(
+ plivo_sample_rate=8000,
+ sample_rate=audio_config.pipeline_sample_rate,
+ ),
+ )
+
+ mixer = await _build_audio_out_mixer(
+ audio_config.transport_out_sample_rate, ambient_noise_config
+ )
+
+ return FastAPIWebsocketTransport(
+ websocket=websocket_client,
+ params=FastAPIWebsocketParams(
+ audio_in_enabled=True,
+ audio_out_enabled=True,
+ audio_in_sample_rate=audio_config.transport_in_sample_rate,
+ audio_out_sample_rate=audio_config.transport_out_sample_rate,
+ audio_out_mixer=mixer,
+ serializer=serializer,
+ ),
+ )
+
+
async def create_cloudonix_transport(
websocket_client: WebSocket,
call_id: str,
diff --git a/api/services/telephony/factory.py b/api/services/telephony/factory.py
index 9044dc9..02a5b52 100644
--- a/api/services/telephony/factory.py
+++ b/api/services/telephony/factory.py
@@ -13,6 +13,7 @@ from api.enums import OrganizationConfigurationKey
from api.services.telephony.base import TelephonyProvider
from api.services.telephony.providers.ari_provider import ARIProvider
from api.services.telephony.providers.cloudonix_provider import CloudonixProvider
+from api.services.telephony.providers.plivo_provider import PlivoProvider
from api.services.telephony.providers.telnyx_provider import TelnyxProvider
from api.services.telephony.providers.twilio_provider import TwilioProvider
from api.services.telephony.providers.vobiz_provider import VobizProvider
@@ -53,6 +54,13 @@ async def load_telephony_config(organization_id: int) -> Dict[str, Any]:
"auth_token": config.value.get("auth_token"),
"from_numbers": config.value.get("from_numbers", []),
}
+ elif provider == "plivo":
+ return {
+ "provider": "plivo",
+ "auth_id": config.value.get("auth_id"),
+ "auth_token": config.value.get("auth_token"),
+ "from_numbers": config.value.get("from_numbers", []),
+ }
elif provider == "vonage":
return {
"provider": "vonage",
@@ -124,6 +132,9 @@ async def get_telephony_provider(organization_id: int) -> TelephonyProvider:
if provider_type == "twilio":
return TwilioProvider(config)
+ elif provider_type == "plivo":
+ return PlivoProvider(config)
+
elif provider_type == "vonage":
return VonageProvider(config)
@@ -154,6 +165,7 @@ async def get_all_telephony_providers() -> List[Type[TelephonyProvider]]:
return [
ARIProvider,
CloudonixProvider,
+ PlivoProvider,
TelnyxProvider,
TwilioProvider,
VobizProvider,
diff --git a/api/services/telephony/providers/plivo_provider.py b/api/services/telephony/providers/plivo_provider.py
new file mode 100644
index 0000000..6fb2c6f
--- /dev/null
+++ b/api/services/telephony/providers/plivo_provider.py
@@ -0,0 +1,457 @@
+"""
+Plivo implementation of the TelephonyProvider interface.
+"""
+
+import base64
+import hashlib
+import hmac
+import json
+import os
+import random
+from typing import TYPE_CHECKING, Any, Dict, List, Optional
+from urllib.parse import parse_qs, urlparse, urlunparse
+
+import aiohttp
+from fastapi import HTTPException
+from loguru import logger
+
+from api.db import db_client
+from api.enums import WorkflowRunMode
+from api.services.telephony.base import (
+ CallInitiationResult,
+ NormalizedInboundData,
+ TelephonyProvider,
+)
+from api.utils.common import get_backend_endpoints
+
+if TYPE_CHECKING:
+ from fastapi import WebSocket
+
+
+class PlivoProvider(TelephonyProvider):
+ """
+ Plivo implementation of TelephonyProvider.
+ """
+
+ PROVIDER_NAME = WorkflowRunMode.PLIVO.value
+ WEBHOOK_ENDPOINT = "plivo-xml"
+
+ def __init__(self, config: Dict[str, Any]):
+ self.auth_id = config.get("auth_id")
+ self.auth_token = config.get("auth_token")
+ self.from_numbers = config.get("from_numbers", [])
+
+ if isinstance(self.from_numbers, str):
+ self.from_numbers = [self.from_numbers]
+
+ self.base_url = f"https://api.plivo.com/v1/Account/{self.auth_id}"
+
+ async def initiate_call(
+ self,
+ to_number: str,
+ webhook_url: str,
+ workflow_run_id: Optional[int] = None,
+ from_number: Optional[str] = None,
+ **kwargs: Any,
+ ) -> CallInitiationResult:
+ if not self.validate_config():
+ raise ValueError("Plivo provider not properly configured")
+
+ endpoint = f"{self.base_url}/Call/"
+
+ if from_number is None:
+ from_number = random.choice(self.from_numbers)
+
+ data = {
+ "from": from_number.lstrip("+"),
+ "to": to_number.lstrip("+"),
+ "answer_url": webhook_url,
+ "answer_method": "POST",
+ }
+
+ if workflow_run_id:
+ backend_endpoint, _ = await get_backend_endpoints()
+ data.update(
+ {
+ "hangup_url": f"{backend_endpoint}/api/v1/telephony/plivo/hangup-callback/{workflow_run_id}",
+ "hangup_method": "POST",
+ "ring_url": f"{backend_endpoint}/api/v1/telephony/plivo/ring-callback/{workflow_run_id}",
+ "ring_method": "POST",
+ }
+ )
+
+ data.update(kwargs)
+
+ async with aiohttp.ClientSession() as session:
+ auth = aiohttp.BasicAuth(self.auth_id, self.auth_token)
+ async with session.post(endpoint, json=data, auth=auth) as response:
+ response_text = await response.text()
+ if response.status not in (200, 201, 202):
+ raise HTTPException(
+ status_code=response.status,
+ detail=f"Failed to initiate Plivo call: {response_text}",
+ )
+
+ response_data = json.loads(response_text)
+ call_id = (
+ response_data.get("request_uuid")
+ or response_data.get("call_uuid")
+ or response_data.get("call_uuids", [None])[0]
+ )
+
+ if not call_id:
+ raise HTTPException(
+ status_code=500,
+ detail=f"Plivo response missing call identifier: {response_data}",
+ )
+
+ return CallInitiationResult(
+ call_id=call_id,
+ status=response_data.get("message", "queued"),
+ caller_number=from_number,
+ provider_metadata={"call_id": call_id},
+ raw_response=response_data,
+ )
+
+ async def get_call_status(self, call_id: str) -> Dict[str, Any]:
+ if not self.validate_config():
+ raise ValueError("Plivo provider not properly configured")
+
+ endpoint = f"{self.base_url}/Call/{call_id}/"
+
+ async with aiohttp.ClientSession() as session:
+ auth = aiohttp.BasicAuth(self.auth_id, self.auth_token)
+ async with session.get(endpoint, auth=auth) as response:
+ if response.status != 200:
+ error_data = await response.text()
+ raise Exception(f"Failed to get call status: {error_data}")
+
+ return await response.json()
+
+ async def get_available_phone_numbers(self) -> List[str]:
+ return self.from_numbers
+
+ def validate_config(self) -> bool:
+ return bool(self.auth_id and self.auth_token and self.from_numbers)
+
+ @staticmethod
+ def _stringify_signature_value(value: Any) -> Any:
+ if isinstance(value, bytes):
+ return "".join(chr(x) for x in bytearray(value))
+ if isinstance(value, (int, float, bool)):
+ return str(value)
+ if isinstance(value, list):
+ return [PlivoProvider._stringify_signature_value(item) for item in value]
+ return value
+
+ @staticmethod
+ def _query_map(query: str) -> Dict[str, Any]:
+ return {
+ PlivoProvider._stringify_signature_value(key): PlivoProvider._stringify_signature_value(value)
+ for key, value in parse_qs(query, keep_blank_values=True).items()
+ }
+
+ @staticmethod
+ def _sorted_query_string(params: Dict[str, Any]) -> str:
+ parts: list[str] = []
+ for key in sorted(params.keys()):
+ value = params[key]
+ if isinstance(value, list):
+ normalized_values = sorted(PlivoProvider._stringify_signature_value(value))
+ parts.append("&".join(f"{key}={item}" for item in normalized_values))
+ else:
+ parts.append(f"{key}={PlivoProvider._stringify_signature_value(value)}")
+ return "&".join(parts)
+
+ @staticmethod
+ def _sorted_params_string(params: Dict[str, Any]) -> str:
+ parts: list[str] = []
+ for key in sorted(params.keys()):
+ value = params[key]
+ if isinstance(value, list):
+ normalized_values = sorted(PlivoProvider._stringify_signature_value(value))
+ parts.append("".join(f"{key}{item}" for item in normalized_values))
+ elif isinstance(value, dict):
+ parts.append(f"{key}{PlivoProvider._sorted_params_string(value)}")
+ else:
+ parts.append(f"{key}{PlivoProvider._stringify_signature_value(value)}")
+ return "".join(parts)
+
+ @staticmethod
+ def _construct_get_url(uri: str, params: Dict[str, Any], empty_post_params: bool = True) -> str:
+ parsed_uri = urlparse(uri)
+ base_url = urlunparse((parsed_uri.scheme, parsed_uri.netloc, parsed_uri.path, "", "", ""))
+
+ combined_params = dict(params)
+ combined_params.update(PlivoProvider._query_map(parsed_uri.query))
+ query_params = PlivoProvider._sorted_query_string(combined_params)
+
+ if query_params or not empty_post_params:
+ base_url = f"{base_url}?{query_params}"
+ if query_params and not empty_post_params:
+ base_url = f"{base_url}."
+ return base_url
+
+ @staticmethod
+ def _construct_post_url(uri: str, params: Dict[str, Any]) -> str:
+ base_url = PlivoProvider._construct_get_url(
+ uri,
+ {},
+ empty_post_params=(len(params) == 0),
+ )
+ return f"{base_url}{PlivoProvider._sorted_params_string(params)}"
+
+ async def verify_webhook_signature(
+ self,
+ url: str,
+ params: Dict[str, Any],
+ signature: str,
+ nonce: str = "",
+ ) -> bool:
+ if not self.auth_token or not signature or not nonce:
+ return False
+
+ payload = f"{self._construct_post_url(url, params)}.{nonce}"
+ computed = base64.b64encode(
+ hmac.new(
+ self.auth_token.encode("utf-8"),
+ payload.encode("utf-8"),
+ hashlib.sha256,
+ ).digest()
+ ).decode("utf-8")
+
+ candidates = [candidate.strip() for candidate in signature.split(",") if candidate]
+ return any(hmac.compare_digest(computed, candidate) for candidate in candidates)
+
+ async def get_webhook_response(
+ self, workflow_id: int, user_id: int, workflow_run_id: int
+ ) -> str:
+ _, wss_backend_endpoint = await get_backend_endpoints()
+
+ return f"""
+
+ {wss_backend_endpoint}/api/v1/telephony/ws/{workflow_id}/{user_id}/{workflow_run_id}
+"""
+
+ async def get_call_cost(self, call_id: str) -> Dict[str, Any]:
+ endpoint = f"{self.base_url}/Call/{call_id}/"
+
+ try:
+ async with aiohttp.ClientSession() as session:
+ auth = aiohttp.BasicAuth(self.auth_id, self.auth_token)
+ async with session.get(endpoint, auth=auth) as response:
+ if response.status != 200:
+ error_data = await response.text()
+ logger.error(f"Failed to get Plivo call cost: {error_data}")
+ return {
+ "cost_usd": 0.0,
+ "duration": 0,
+ "status": "error",
+ "error": str(error_data),
+ }
+
+ call_data = await response.json()
+ total_amount = float(call_data.get("total_amount", 0) or 0)
+ duration = int(call_data.get("duration", 0) or 0)
+
+ return {
+ "cost_usd": total_amount,
+ "duration": duration,
+ "status": call_data.get("call_status", "unknown"),
+ "price_unit": "USD",
+ "raw_response": call_data,
+ }
+ except Exception as e:
+ logger.error(f"Exception fetching Plivo call cost: {e}")
+ return {"cost_usd": 0.0, "duration": 0, "status": "error", "error": str(e)}
+
+ def parse_status_callback(self, data: Dict[str, Any]) -> Dict[str, Any]:
+ status_map = {
+ "in-progress": "answered",
+ "ringing": "ringing",
+ "ring": "ringing",
+ "completed": "completed",
+ "hangup": "completed",
+ "stopstream": "completed",
+ "busy": "busy",
+ "no-answer": "no-answer",
+ "cancel": "canceled",
+ "cancelled": "canceled",
+ "timeout": "no-answer",
+ }
+
+ call_status = (data.get("CallStatus") or data.get("Event") or "").lower()
+ return {
+ "call_id": data.get("CallUUID", "") or data.get("RequestUUID", ""),
+ "status": status_map.get(call_status, call_status),
+ "from_number": data.get("From"),
+ "to_number": data.get("To"),
+ "direction": data.get("Direction"),
+ "duration": data.get("Duration"),
+ "extra": data,
+ }
+
+ async def handle_websocket(
+ self,
+ websocket: "WebSocket",
+ workflow_id: int,
+ user_id: int,
+ workflow_run_id: int,
+ ) -> None:
+ from api.services.pipecat.run_pipeline import run_pipeline_plivo
+
+ first_msg = await websocket.receive_text()
+ start_msg = json.loads(first_msg)
+
+ if start_msg.get("event") != "start":
+ logger.error(f"Expected 'start' event, got: {start_msg.get('event')}")
+ await websocket.close(code=4400, reason="Expected start event")
+ return
+
+ start_data = start_msg.get("start", {})
+ stream_id = start_data.get("streamId") or start_msg.get("streamId")
+
+ if not stream_id:
+ logger.error(f"Missing streamId in start event: {start_msg}")
+ await websocket.close(code=4400, reason="Missing streamId")
+ return
+
+ workflow_run = await db_client.get_workflow_run(workflow_run_id)
+ call_id = None
+ if workflow_run and workflow_run.gathered_context:
+ call_id = workflow_run.gathered_context.get("call_id")
+
+ if not call_id:
+ call_id = start_data.get("callId") or start_data.get("callUUID")
+
+ if not call_id:
+ logger.error(f"Missing call ID for Plivo workflow run {workflow_run_id}")
+ await websocket.close(code=4400, reason="Missing call ID")
+ return
+
+ await run_pipeline_plivo(
+ websocket, stream_id, call_id, workflow_id, workflow_run_id, user_id
+ )
+
+ @classmethod
+ def can_handle_webhook(
+ cls, webhook_data: Dict[str, Any], headers: Dict[str, str]
+ ) -> bool:
+ has_plivo_signature = (
+ "x-plivo-signature-v3" in headers
+ or "x-plivo-signature-ma-v3" in headers
+ )
+ return has_plivo_signature and "CallUUID" in webhook_data
+
+ @staticmethod
+ def parse_inbound_webhook(webhook_data: Dict[str, Any]) -> NormalizedInboundData:
+ return NormalizedInboundData(
+ provider=PlivoProvider.PROVIDER_NAME,
+ call_id=webhook_data.get("CallUUID", "") or webhook_data.get("RequestUUID", ""),
+ from_number=PlivoProvider.normalize_phone_number(webhook_data.get("From", "")),
+ to_number=PlivoProvider.normalize_phone_number(webhook_data.get("To", "")),
+ direction=webhook_data.get("Direction", ""),
+ call_status=webhook_data.get("CallStatus", ""),
+ account_id=webhook_data.get("AuthID") or webhook_data.get("ParentAuthID"),
+ raw_data=webhook_data,
+ )
+
+ @staticmethod
+ def validate_account_id(config_data: dict, webhook_account_id: str) -> bool:
+ if webhook_account_id:
+ return config_data.get("auth_id") == webhook_account_id
+ # AuthID is not always present in Plivo webhooks (undocumented field).
+ # Fall back to checking that the org has a Plivo config at all.
+ logger.warning(
+ "Plivo webhook missing AuthID/ParentAuthID - "
+ "falling back to config existence check"
+ )
+ return bool(config_data.get("auth_id"))
+
+ @staticmethod
+ def normalize_phone_number(phone_number: str) -> str:
+ if not phone_number:
+ return ""
+
+ clean_number = phone_number.lstrip("+")
+ if clean_number.startswith("1") and len(clean_number) == 11:
+ return f"+{clean_number}"
+ if len(clean_number) == 10:
+ return f"+1{clean_number}"
+ if len(clean_number) > 10:
+ return f"+{clean_number}"
+
+ return phone_number
+
+ async def verify_inbound_signature(
+ self,
+ url: str,
+ webhook_data: Dict[str, Any],
+ signature: str,
+ nonce: str = "",
+ ) -> bool:
+ if os.getenv("ENVIRONMENT") == "local":
+ logger.warning(
+ "Skipping Plivo inbound signature verification in local environment"
+ )
+ return True
+ return await self.verify_webhook_signature(url, webhook_data, signature, nonce)
+
+ @staticmethod
+ async def generate_inbound_response(
+ websocket_url: str, workflow_run_id: int = None
+ ) -> tuple:
+ from fastapi import Response
+
+ hangup_callback_attr = ""
+ if workflow_run_id:
+ backend_endpoint, _ = await get_backend_endpoints()
+ hangup_url = f"{backend_endpoint}/api/v1/telephony/plivo/hangup-callback/{workflow_run_id}"
+ hangup_callback_attr = f' statusCallbackUrl="{hangup_url}" statusCallbackMethod="POST"'
+
+ plivo_xml = f"""
+
+ {websocket_url}
+"""
+ return Response(content=plivo_xml, media_type="application/xml")
+
+ @staticmethod
+ def generate_error_response(error_type: str, message: str) -> tuple:
+ from fastapi import Response
+
+ plivo_xml = f"""
+
+ Sorry, there was an error processing your call. {message}
+
+"""
+ return Response(content=plivo_xml, media_type="application/xml")
+
+ @staticmethod
+ def generate_validation_error_response(error_type) -> tuple:
+ from fastapi import Response
+
+ from api.errors.telephony_errors import TELEPHONY_ERROR_MESSAGES, TelephonyError
+
+ message = TELEPHONY_ERROR_MESSAGES.get(
+ error_type, TELEPHONY_ERROR_MESSAGES[TelephonyError.GENERAL_AUTH_FAILED]
+ )
+
+ plivo_xml = f"""
+
+ {message}
+
+"""
+ return Response(content=plivo_xml, media_type="application/xml")
+
+ async def transfer_call(
+ self,
+ destination: str,
+ transfer_id: str,
+ conference_name: str,
+ timeout: int = 30,
+ **kwargs: Any,
+ ) -> Dict[str, Any]:
+ raise NotImplementedError("Plivo provider does not support call transfers")
+
+ def supports_transfers(self) -> bool:
+ return False
diff --git a/ui/src/app/telephony-configurations/page.tsx b/ui/src/app/telephony-configurations/page.tsx
index d0cd71e..a6d08a1 100644
--- a/ui/src/app/telephony-configurations/page.tsx
+++ b/ui/src/app/telephony-configurations/page.tsx
@@ -12,6 +12,8 @@ import type {
AriConfigurationResponse,
CloudonixConfigurationRequest,
CloudonixConfigurationResponse,
+ PlivoConfigurationRequest,
+ PlivoConfigurationResponse,
TelephonyConfigurationResponse,
TelnyxConfigurationRequest,
TelnyxConfigurationResponse,
@@ -49,6 +51,9 @@ interface TelephonyConfigForm {
private_key?: string;
api_key?: string;
api_secret?: string;
+ // Plivo fields
+ plivo_auth_id?: string;
+ plivo_auth_token?: string;
// Vobiz fields
auth_id?: string;
vobiz_auth_token?: string;
@@ -146,6 +151,13 @@ export default function ConfigureTelephonyPage() {
setValue("auth_id", response.data.vobiz.auth_id);
setValue("vobiz_auth_token", response.data.vobiz.auth_token);
setValue("from_numbers", response.data.vobiz.from_numbers?.length > 0 ? response.data.vobiz.from_numbers : [""]);
+ } else if ((response.data as TelephonyConfigurationResponse)?.plivo) {
+ const plivoConfig = (response.data as TelephonyConfigurationResponse).plivo as PlivoConfigurationResponse;
+ setHasExistingConfig(true);
+ setValue("provider", "plivo");
+ setValue("plivo_auth_id", plivoConfig.auth_id);
+ setValue("plivo_auth_token", plivoConfig.auth_token);
+ setValue("from_numbers", plivoConfig.from_numbers?.length > 0 ? plivoConfig.from_numbers : [""]);
} else if ((response.data as TelephonyConfigurationResponse)?.cloudonix) {
const cloudonixConfig = (response.data as TelephonyConfigurationResponse).cloudonix as CloudonixConfigurationResponse;
setHasExistingConfig(true);
@@ -192,6 +204,7 @@ export default function ConfigureTelephonyPage() {
// Build the request body based on provider
let requestBody:
| TwilioConfigurationRequest
+ | PlivoConfigurationRequest
| VonageConfigurationRequest
| VobizConfigurationRequest
| CloudonixConfigurationRequest
@@ -214,7 +227,7 @@ export default function ConfigureTelephonyPage() {
let pattern: RegExp;
let formatMessage: string;
- if (data.provider === "twilio" || data.provider === "telnyx") {
+ if (data.provider === "twilio" || data.provider === "telnyx" || data.provider === "plivo") {
pattern = twilioPattern;
formatMessage = "with + prefix (e.g., +1234567890)";
} else if (data.provider === "cloudonix") {
@@ -259,6 +272,13 @@ export default function ConfigureTelephonyPage() {
auth_id: data.auth_id,
auth_token: data.vobiz_auth_token,
} as VobizConfigurationRequest;
+ } else if (data.provider === "plivo") {
+ requestBody = {
+ provider: data.provider,
+ from_numbers: filteredNumbers,
+ auth_id: data.plivo_auth_id!,
+ auth_token: data.plivo_auth_token!,
+ } as PlivoConfigurationRequest;
} else if (data.provider === "telnyx") {
requestBody = {
provider: data.provider,
@@ -335,6 +355,8 @@ export default function ConfigureTelephonyPage() {
? "Vonage"
: selectedProvider === "vobiz"
? "Vobiz"
+ : selectedProvider === "plivo"
+ ? "Plivo"
: selectedProvider === "telnyx"
? "Telnyx"
: selectedProvider === "ari"
@@ -385,6 +407,12 @@ export default function ConfigureTelephonyPage() {
{" "}
for developer documentation and API reference.
>
+ ) : selectedProvider === "plivo" ? (
+ <>
+ Plivo is a cloud communications platform providing voice and messaging
+ APIs. Use Plivo to build voice applications with real-time audio streaming
+ and global telephony coverage.
+ >
) : selectedProvider === "vobiz" ? (
<>
Vobiz is a telephony provider. Visit their documentation
@@ -438,6 +466,25 @@ export default function ConfigureTelephonyPage() {
+ ) : selectedProvider === "plivo" ? (
+
+
+
Getting Started with Plivo:
+
+ - Sign up at plivo.com and go to the Console Dashboard
+ - Find your Auth ID and Auth Token on the Dashboard overview page
+ - Purchase a phone number under Phone Numbers > Buy Numbers
+ - Create an XML Application under Voice > XML Applications
+ - Enter your Auth ID, Auth Token, and phone numbers below
+
+
+
+
+ Note: Plivo uses XML-based call control with bidirectional
+ audio streaming. Phone numbers should be in E.164 format with + prefix (e.g., +1234567890).
+
+
+
) : selectedProvider === "twilio" || selectedProvider === "vonage" ? (