feat: add Plivo telephony provider support (#245)

* Add Plivo telephony provider support

* add Plivo telephony UI, fix audio config, and improve inbound call handling

---------

Co-authored-by: Dilip Tiwari <digitalapache20@gmail.com>
Co-authored-by: Sabiha Khan <sabihak89@gmail.com>
Co-authored-by: Abhishek <abhishek@a6k.me>
This commit is contained in:
dilipevents2007-cpu 2026-04-25 20:41:46 +05:30 committed by GitHub
parent 3e3773f400
commit 2218ba8ad9
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
14 changed files with 1123 additions and 13 deletions

View file

@ -87,19 +87,20 @@ def create_audio_config(transport_type: str) -> AudioConfig:
"""Create audio configuration based on transport type.
Args:
transport_type: Type of transport ("webrtc", "twilio", "vonage", "vobiz", "cloudonix")
transport_type: Type of transport ("webrtc", "twilio", "plivo", "vonage", "vobiz", "cloudonix")
Returns:
AudioConfig instance with appropriate settings
"""
if transport_type in (
WorkflowRunMode.TWILIO.value,
WorkflowRunMode.PLIVO.value,
WorkflowRunMode.VOBIZ.value,
WorkflowRunMode.CLOUDONIX.value,
WorkflowRunMode.ARI.value,
WorkflowRunMode.TELNYX.value,
):
# Twilio, Cloudonix, Vobiz, Telnyx, and ARI use MULAW at 8kHz
# Twilio, Plivo, Cloudonix, Vobiz, Telnyx, and ARI use MULAW at 8kHz
return AudioConfig(
transport_in_sample_rate=8000,
transport_out_sample_rate=8000,

View file

@ -47,6 +47,7 @@ from api.services.pipecat.tracing_config import (
from api.services.pipecat.transport_setup import (
create_ari_transport,
create_cloudonix_transport,
create_plivo_transport,
create_telnyx_transport,
create_twilio_transport,
create_vobiz_transport,
@ -151,6 +152,69 @@ async def run_pipeline_twilio(
)
async def run_pipeline_plivo(
websocket_client: WebSocket,
stream_id: str,
call_id: str,
workflow_id: int,
workflow_run_id: int,
user_id: int,
) -> None:
"""Run pipeline for Plivo WebSocket connections."""
logger.info(
f"[run {workflow_run_id}] Starting Plivo pipeline - "
f"stream_id={stream_id}, call_id={call_id}, workflow_id={workflow_id}"
)
set_current_run_id(workflow_run_id)
cost_info = {"call_id": call_id}
await db_client.update_workflow_run(workflow_run_id, cost_info=cost_info)
workflow = await db_client.get_workflow(workflow_id, user_id)
if workflow:
set_current_org_id(workflow.organization_id)
vad_config = None
ambient_noise_config = None
if workflow and workflow.workflow_configurations:
if "vad_configuration" in workflow.workflow_configurations:
vad_config = workflow.workflow_configurations["vad_configuration"]
if "ambient_noise_configuration" in workflow.workflow_configurations:
ambient_noise_config = workflow.workflow_configurations[
"ambient_noise_configuration"
]
try:
audio_config = create_audio_config(WorkflowRunMode.PLIVO.value)
transport = await create_plivo_transport(
websocket_client,
stream_id,
call_id,
workflow_run_id,
audio_config,
workflow.organization_id,
vad_config,
ambient_noise_config,
)
await _run_pipeline(
transport,
workflow_id,
workflow_run_id,
user_id,
audio_config=audio_config,
)
logger.info(f"[run {workflow_run_id}] Plivo pipeline completed successfully")
except Exception as e:
logger.error(
f"[run {workflow_run_id}] Error in Plivo pipeline: {e}", exc_info=True
)
raise
async def run_pipeline_vonage(
websocket_client,
call_uuid: str,

View file

@ -19,6 +19,7 @@ from api.services.telephony.providers.twilio_call_strategies import (
TwilioConferenceStrategy,
TwilioHangupStrategy,
)
from pipecat.serializers.plivo import PlivoFrameSerializer
from pipecat.audio.mixers.silence_mixer import SilenceAudioMixer
from pipecat.audio.mixers.soundfile_mixer import SoundfileMixer
from pipecat.serializers.asterisk import AsteriskFrameSerializer
@ -141,6 +142,60 @@ async def create_twilio_transport(
)
async def create_plivo_transport(
websocket_client: WebSocket,
stream_id: str,
call_id: str,
workflow_run_id: int,
audio_config: AudioConfig,
organization_id: int,
vad_config: dict | None = None,
ambient_noise_config: dict | None = None,
):
"""Create a transport for Plivo connections."""
from api.services.telephony.factory import load_telephony_config
config = await load_telephony_config(organization_id)
if config.get("provider") != "plivo":
raise ValueError(f"Expected Plivo provider, got {config.get('provider')}")
auth_id = config.get("auth_id")
auth_token = config.get("auth_token")
if not auth_id or not auth_token:
raise ValueError(
f"Incomplete Plivo configuration for organization {organization_id}"
)
serializer = PlivoFrameSerializer(
stream_id=stream_id,
call_id=call_id,
auth_id=auth_id,
auth_token=auth_token,
params=PlivoFrameSerializer.InputParams(
plivo_sample_rate=8000,
sample_rate=audio_config.pipeline_sample_rate,
),
)
mixer = await _build_audio_out_mixer(
audio_config.transport_out_sample_rate, ambient_noise_config
)
return FastAPIWebsocketTransport(
websocket=websocket_client,
params=FastAPIWebsocketParams(
audio_in_enabled=True,
audio_out_enabled=True,
audio_in_sample_rate=audio_config.transport_in_sample_rate,
audio_out_sample_rate=audio_config.transport_out_sample_rate,
audio_out_mixer=mixer,
serializer=serializer,
),
)
async def create_cloudonix_transport(
websocket_client: WebSocket,
call_id: str,

View file

@ -13,6 +13,7 @@ from api.enums import OrganizationConfigurationKey
from api.services.telephony.base import TelephonyProvider
from api.services.telephony.providers.ari_provider import ARIProvider
from api.services.telephony.providers.cloudonix_provider import CloudonixProvider
from api.services.telephony.providers.plivo_provider import PlivoProvider
from api.services.telephony.providers.telnyx_provider import TelnyxProvider
from api.services.telephony.providers.twilio_provider import TwilioProvider
from api.services.telephony.providers.vobiz_provider import VobizProvider
@ -53,6 +54,13 @@ async def load_telephony_config(organization_id: int) -> Dict[str, Any]:
"auth_token": config.value.get("auth_token"),
"from_numbers": config.value.get("from_numbers", []),
}
elif provider == "plivo":
return {
"provider": "plivo",
"auth_id": config.value.get("auth_id"),
"auth_token": config.value.get("auth_token"),
"from_numbers": config.value.get("from_numbers", []),
}
elif provider == "vonage":
return {
"provider": "vonage",
@ -124,6 +132,9 @@ async def get_telephony_provider(organization_id: int) -> TelephonyProvider:
if provider_type == "twilio":
return TwilioProvider(config)
elif provider_type == "plivo":
return PlivoProvider(config)
elif provider_type == "vonage":
return VonageProvider(config)
@ -154,6 +165,7 @@ async def get_all_telephony_providers() -> List[Type[TelephonyProvider]]:
return [
ARIProvider,
CloudonixProvider,
PlivoProvider,
TelnyxProvider,
TwilioProvider,
VobizProvider,

View file

@ -0,0 +1,457 @@
"""
Plivo implementation of the TelephonyProvider interface.
"""
import base64
import hashlib
import hmac
import json
import os
import random
from typing import TYPE_CHECKING, Any, Dict, List, Optional
from urllib.parse import parse_qs, urlparse, urlunparse
import aiohttp
from fastapi import HTTPException
from loguru import logger
from api.db import db_client
from api.enums import WorkflowRunMode
from api.services.telephony.base import (
CallInitiationResult,
NormalizedInboundData,
TelephonyProvider,
)
from api.utils.common import get_backend_endpoints
if TYPE_CHECKING:
from fastapi import WebSocket
class PlivoProvider(TelephonyProvider):
"""
Plivo implementation of TelephonyProvider.
"""
PROVIDER_NAME = WorkflowRunMode.PLIVO.value
WEBHOOK_ENDPOINT = "plivo-xml"
def __init__(self, config: Dict[str, Any]):
self.auth_id = config.get("auth_id")
self.auth_token = config.get("auth_token")
self.from_numbers = config.get("from_numbers", [])
if isinstance(self.from_numbers, str):
self.from_numbers = [self.from_numbers]
self.base_url = f"https://api.plivo.com/v1/Account/{self.auth_id}"
async def initiate_call(
self,
to_number: str,
webhook_url: str,
workflow_run_id: Optional[int] = None,
from_number: Optional[str] = None,
**kwargs: Any,
) -> CallInitiationResult:
if not self.validate_config():
raise ValueError("Plivo provider not properly configured")
endpoint = f"{self.base_url}/Call/"
if from_number is None:
from_number = random.choice(self.from_numbers)
data = {
"from": from_number.lstrip("+"),
"to": to_number.lstrip("+"),
"answer_url": webhook_url,
"answer_method": "POST",
}
if workflow_run_id:
backend_endpoint, _ = await get_backend_endpoints()
data.update(
{
"hangup_url": f"{backend_endpoint}/api/v1/telephony/plivo/hangup-callback/{workflow_run_id}",
"hangup_method": "POST",
"ring_url": f"{backend_endpoint}/api/v1/telephony/plivo/ring-callback/{workflow_run_id}",
"ring_method": "POST",
}
)
data.update(kwargs)
async with aiohttp.ClientSession() as session:
auth = aiohttp.BasicAuth(self.auth_id, self.auth_token)
async with session.post(endpoint, json=data, auth=auth) as response:
response_text = await response.text()
if response.status not in (200, 201, 202):
raise HTTPException(
status_code=response.status,
detail=f"Failed to initiate Plivo call: {response_text}",
)
response_data = json.loads(response_text)
call_id = (
response_data.get("request_uuid")
or response_data.get("call_uuid")
or response_data.get("call_uuids", [None])[0]
)
if not call_id:
raise HTTPException(
status_code=500,
detail=f"Plivo response missing call identifier: {response_data}",
)
return CallInitiationResult(
call_id=call_id,
status=response_data.get("message", "queued"),
caller_number=from_number,
provider_metadata={"call_id": call_id},
raw_response=response_data,
)
async def get_call_status(self, call_id: str) -> Dict[str, Any]:
if not self.validate_config():
raise ValueError("Plivo provider not properly configured")
endpoint = f"{self.base_url}/Call/{call_id}/"
async with aiohttp.ClientSession() as session:
auth = aiohttp.BasicAuth(self.auth_id, self.auth_token)
async with session.get(endpoint, auth=auth) as response:
if response.status != 200:
error_data = await response.text()
raise Exception(f"Failed to get call status: {error_data}")
return await response.json()
async def get_available_phone_numbers(self) -> List[str]:
return self.from_numbers
def validate_config(self) -> bool:
return bool(self.auth_id and self.auth_token and self.from_numbers)
@staticmethod
def _stringify_signature_value(value: Any) -> Any:
if isinstance(value, bytes):
return "".join(chr(x) for x in bytearray(value))
if isinstance(value, (int, float, bool)):
return str(value)
if isinstance(value, list):
return [PlivoProvider._stringify_signature_value(item) for item in value]
return value
@staticmethod
def _query_map(query: str) -> Dict[str, Any]:
return {
PlivoProvider._stringify_signature_value(key): PlivoProvider._stringify_signature_value(value)
for key, value in parse_qs(query, keep_blank_values=True).items()
}
@staticmethod
def _sorted_query_string(params: Dict[str, Any]) -> str:
parts: list[str] = []
for key in sorted(params.keys()):
value = params[key]
if isinstance(value, list):
normalized_values = sorted(PlivoProvider._stringify_signature_value(value))
parts.append("&".join(f"{key}={item}" for item in normalized_values))
else:
parts.append(f"{key}={PlivoProvider._stringify_signature_value(value)}")
return "&".join(parts)
@staticmethod
def _sorted_params_string(params: Dict[str, Any]) -> str:
parts: list[str] = []
for key in sorted(params.keys()):
value = params[key]
if isinstance(value, list):
normalized_values = sorted(PlivoProvider._stringify_signature_value(value))
parts.append("".join(f"{key}{item}" for item in normalized_values))
elif isinstance(value, dict):
parts.append(f"{key}{PlivoProvider._sorted_params_string(value)}")
else:
parts.append(f"{key}{PlivoProvider._stringify_signature_value(value)}")
return "".join(parts)
@staticmethod
def _construct_get_url(uri: str, params: Dict[str, Any], empty_post_params: bool = True) -> str:
parsed_uri = urlparse(uri)
base_url = urlunparse((parsed_uri.scheme, parsed_uri.netloc, parsed_uri.path, "", "", ""))
combined_params = dict(params)
combined_params.update(PlivoProvider._query_map(parsed_uri.query))
query_params = PlivoProvider._sorted_query_string(combined_params)
if query_params or not empty_post_params:
base_url = f"{base_url}?{query_params}"
if query_params and not empty_post_params:
base_url = f"{base_url}."
return base_url
@staticmethod
def _construct_post_url(uri: str, params: Dict[str, Any]) -> str:
base_url = PlivoProvider._construct_get_url(
uri,
{},
empty_post_params=(len(params) == 0),
)
return f"{base_url}{PlivoProvider._sorted_params_string(params)}"
async def verify_webhook_signature(
self,
url: str,
params: Dict[str, Any],
signature: str,
nonce: str = "",
) -> bool:
if not self.auth_token or not signature or not nonce:
return False
payload = f"{self._construct_post_url(url, params)}.{nonce}"
computed = base64.b64encode(
hmac.new(
self.auth_token.encode("utf-8"),
payload.encode("utf-8"),
hashlib.sha256,
).digest()
).decode("utf-8")
candidates = [candidate.strip() for candidate in signature.split(",") if candidate]
return any(hmac.compare_digest(computed, candidate) for candidate in candidates)
async def get_webhook_response(
self, workflow_id: int, user_id: int, workflow_run_id: int
) -> str:
_, wss_backend_endpoint = await get_backend_endpoints()
return f"""<?xml version="1.0" encoding="UTF-8"?>
<Response>
<Stream bidirectional="true" keepCallAlive="true" contentType="audio/x-mulaw;rate=8000">{wss_backend_endpoint}/api/v1/telephony/ws/{workflow_id}/{user_id}/{workflow_run_id}</Stream>
</Response>"""
async def get_call_cost(self, call_id: str) -> Dict[str, Any]:
endpoint = f"{self.base_url}/Call/{call_id}/"
try:
async with aiohttp.ClientSession() as session:
auth = aiohttp.BasicAuth(self.auth_id, self.auth_token)
async with session.get(endpoint, auth=auth) as response:
if response.status != 200:
error_data = await response.text()
logger.error(f"Failed to get Plivo call cost: {error_data}")
return {
"cost_usd": 0.0,
"duration": 0,
"status": "error",
"error": str(error_data),
}
call_data = await response.json()
total_amount = float(call_data.get("total_amount", 0) or 0)
duration = int(call_data.get("duration", 0) or 0)
return {
"cost_usd": total_amount,
"duration": duration,
"status": call_data.get("call_status", "unknown"),
"price_unit": "USD",
"raw_response": call_data,
}
except Exception as e:
logger.error(f"Exception fetching Plivo call cost: {e}")
return {"cost_usd": 0.0, "duration": 0, "status": "error", "error": str(e)}
def parse_status_callback(self, data: Dict[str, Any]) -> Dict[str, Any]:
status_map = {
"in-progress": "answered",
"ringing": "ringing",
"ring": "ringing",
"completed": "completed",
"hangup": "completed",
"stopstream": "completed",
"busy": "busy",
"no-answer": "no-answer",
"cancel": "canceled",
"cancelled": "canceled",
"timeout": "no-answer",
}
call_status = (data.get("CallStatus") or data.get("Event") or "").lower()
return {
"call_id": data.get("CallUUID", "") or data.get("RequestUUID", ""),
"status": status_map.get(call_status, call_status),
"from_number": data.get("From"),
"to_number": data.get("To"),
"direction": data.get("Direction"),
"duration": data.get("Duration"),
"extra": data,
}
async def handle_websocket(
self,
websocket: "WebSocket",
workflow_id: int,
user_id: int,
workflow_run_id: int,
) -> None:
from api.services.pipecat.run_pipeline import run_pipeline_plivo
first_msg = await websocket.receive_text()
start_msg = json.loads(first_msg)
if start_msg.get("event") != "start":
logger.error(f"Expected 'start' event, got: {start_msg.get('event')}")
await websocket.close(code=4400, reason="Expected start event")
return
start_data = start_msg.get("start", {})
stream_id = start_data.get("streamId") or start_msg.get("streamId")
if not stream_id:
logger.error(f"Missing streamId in start event: {start_msg}")
await websocket.close(code=4400, reason="Missing streamId")
return
workflow_run = await db_client.get_workflow_run(workflow_run_id)
call_id = None
if workflow_run and workflow_run.gathered_context:
call_id = workflow_run.gathered_context.get("call_id")
if not call_id:
call_id = start_data.get("callId") or start_data.get("callUUID")
if not call_id:
logger.error(f"Missing call ID for Plivo workflow run {workflow_run_id}")
await websocket.close(code=4400, reason="Missing call ID")
return
await run_pipeline_plivo(
websocket, stream_id, call_id, workflow_id, workflow_run_id, user_id
)
@classmethod
def can_handle_webhook(
cls, webhook_data: Dict[str, Any], headers: Dict[str, str]
) -> bool:
has_plivo_signature = (
"x-plivo-signature-v3" in headers
or "x-plivo-signature-ma-v3" in headers
)
return has_plivo_signature and "CallUUID" in webhook_data
@staticmethod
def parse_inbound_webhook(webhook_data: Dict[str, Any]) -> NormalizedInboundData:
return NormalizedInboundData(
provider=PlivoProvider.PROVIDER_NAME,
call_id=webhook_data.get("CallUUID", "") or webhook_data.get("RequestUUID", ""),
from_number=PlivoProvider.normalize_phone_number(webhook_data.get("From", "")),
to_number=PlivoProvider.normalize_phone_number(webhook_data.get("To", "")),
direction=webhook_data.get("Direction", ""),
call_status=webhook_data.get("CallStatus", ""),
account_id=webhook_data.get("AuthID") or webhook_data.get("ParentAuthID"),
raw_data=webhook_data,
)
@staticmethod
def validate_account_id(config_data: dict, webhook_account_id: str) -> bool:
if webhook_account_id:
return config_data.get("auth_id") == webhook_account_id
# AuthID is not always present in Plivo webhooks (undocumented field).
# Fall back to checking that the org has a Plivo config at all.
logger.warning(
"Plivo webhook missing AuthID/ParentAuthID - "
"falling back to config existence check"
)
return bool(config_data.get("auth_id"))
@staticmethod
def normalize_phone_number(phone_number: str) -> str:
if not phone_number:
return ""
clean_number = phone_number.lstrip("+")
if clean_number.startswith("1") and len(clean_number) == 11:
return f"+{clean_number}"
if len(clean_number) == 10:
return f"+1{clean_number}"
if len(clean_number) > 10:
return f"+{clean_number}"
return phone_number
async def verify_inbound_signature(
self,
url: str,
webhook_data: Dict[str, Any],
signature: str,
nonce: str = "",
) -> bool:
if os.getenv("ENVIRONMENT") == "local":
logger.warning(
"Skipping Plivo inbound signature verification in local environment"
)
return True
return await self.verify_webhook_signature(url, webhook_data, signature, nonce)
@staticmethod
async def generate_inbound_response(
websocket_url: str, workflow_run_id: int = None
) -> tuple:
from fastapi import Response
hangup_callback_attr = ""
if workflow_run_id:
backend_endpoint, _ = await get_backend_endpoints()
hangup_url = f"{backend_endpoint}/api/v1/telephony/plivo/hangup-callback/{workflow_run_id}"
hangup_callback_attr = f' statusCallbackUrl="{hangup_url}" statusCallbackMethod="POST"'
plivo_xml = f"""<?xml version="1.0" encoding="UTF-8"?>
<Response>
<Stream bidirectional="true" keepCallAlive="true" contentType="audio/x-mulaw;rate=8000"{hangup_callback_attr}>{websocket_url}</Stream>
</Response>"""
return Response(content=plivo_xml, media_type="application/xml")
@staticmethod
def generate_error_response(error_type: str, message: str) -> tuple:
from fastapi import Response
plivo_xml = f"""<?xml version="1.0" encoding="UTF-8"?>
<Response>
<Speak>Sorry, there was an error processing your call. {message}</Speak>
<Hangup/>
</Response>"""
return Response(content=plivo_xml, media_type="application/xml")
@staticmethod
def generate_validation_error_response(error_type) -> tuple:
from fastapi import Response
from api.errors.telephony_errors import TELEPHONY_ERROR_MESSAGES, TelephonyError
message = TELEPHONY_ERROR_MESSAGES.get(
error_type, TELEPHONY_ERROR_MESSAGES[TelephonyError.GENERAL_AUTH_FAILED]
)
plivo_xml = f"""<?xml version="1.0" encoding="UTF-8"?>
<Response>
<Speak>{message}</Speak>
<Hangup/>
</Response>"""
return Response(content=plivo_xml, media_type="application/xml")
async def transfer_call(
self,
destination: str,
transfer_id: str,
conference_name: str,
timeout: int = 30,
**kwargs: Any,
) -> Dict[str, Any]:
raise NotImplementedError("Plivo provider does not support call transfers")
def supports_transfers(self) -> bool:
return False