exotel support fix

This commit is contained in:
Hridayesh Gupta 2026-05-13 00:04:08 +05:30
parent f79b948f03
commit 4a4b8fb49f
3 changed files with 188 additions and 19 deletions

View file

@ -82,23 +82,32 @@ async def exotel_stream(websocket: WebSocket):
return
# Exotel may nest stream metadata under 'start' or at top level.
# Keys are snake_case (stream_sid, call_sid) — Twilio-compatible format.
start_data = start_msg.get("start") or start_msg
call_sid = (
start_data.get("callSid")
start_data.get("call_sid")
or start_data.get("callSid")
or start_data.get("CallSid")
or start_data.get("call_sid")
or start_msg.get("call_sid")
or start_msg.get("callSid")
or start_msg.get("CallSid")
)
stream_id = (
start_data.get("streamId")
or start_data.get("StreamId")
or start_msg.get("streamId")
stream_sid = (
start_data.get("stream_sid")
or start_data.get("streamSid")
or start_data.get("streamId")
or start_msg.get("stream_sid")
or start_msg.get("streamSid")
or ""
)
account_sid = (
start_data.get("account_sid")
or start_data.get("accountSid")
or start_msg.get("account_sid")
or ""
)
logger.info(
f"[Exotel stream] callSid={call_sid!r} streamId={stream_id!r}"
f"[Exotel stream] callSid={call_sid!r} streamSid={stream_sid!r} accountSid={account_sid!r}"
)
if not call_sid:
@ -151,7 +160,11 @@ async def exotel_stream(websocket: WebSocket):
workflow_run_id=workflow_run_id,
user_id=user_id,
call_id=call_sid,
transport_kwargs={"stream_id": stream_id, "call_id": call_sid},
transport_kwargs={
"stream_id": stream_sid,
"call_id": call_sid,
"account_sid": account_sid,
},
)

View file

@ -1,11 +1,166 @@
"""Exotel frame serializer.
Exotel streams audio over WebSocket using the same JSON envelope and μ-law
8 kHz encoding as Plivo. We re-export PlivoFrameSerializer directly so
transport.py can import from `.serializers` and we have an obvious place to
drop a custom subclass later if Exotel diverges.
Exotel's WebSocket streaming protocol is Twilio Media Streams-compatible:
- Incoming audio: {"event":"media","stream_sid":"...","media":{"payload":"<base64-mulaw>"}}
- Outgoing audio: {"event":"media","streamSid":"...","media":{"payload":"<base64-mulaw>"}}
- Clear/interrupt: {"event":"clear","streamSid":"..."}
We subclass TwilioFrameSerializer to inherit the correct encode/decode logic
and override _hang_up_call to hit Exotel's REST API instead of Twilio's.
"""
from pipecat.serializers.plivo import PlivoFrameSerializer as ExotelFrameSerializer
import json
import aiohttp
from loguru import logger
from pipecat.audio.utils import create_stream_resampler, pcm_to_ulaw, ulaw_to_pcm
from pipecat.frames.frames import (
AudioRawFrame,
CancelFrame,
EndFrame,
Frame,
InputAudioRawFrame,
InterruptionFrame,
OutputTransportMessageFrame,
OutputTransportMessageUrgentFrame,
StartFrame,
)
from pipecat.serializers.base_serializer import FrameSerializer
import base64
class ExotelFrameSerializer(FrameSerializer):
"""Serializer for Exotel's Twilio-compatible WebSocket streaming protocol.
Exotel sends/receives audio using the same JSON envelope as Twilio Media Streams:
Incoming: {"event":"media","media":{"payload":"<base64-mulaw>"},"stream_sid":"..."}
Outgoing: {"event":"media","streamSid":"...","media":{"payload":"<base64-mulaw>"}}
Clear: {"event":"clear","streamSid":"..."}
Auto hang-up hits Exotel's REST API (DELETE /v1/Accounts/{sid}/Calls/{call_sid}/).
"""
class InputParams(FrameSerializer.InputParams):
sample_rate_hz: int = 8000
sample_rate: int | None = None
auto_hang_up: bool = False # Exotel hangs up itself; safe to disable
def __init__(
self,
stream_id: str, # Exotel stream_sid
call_id: str | None = None,
auth_id: str | None = None, # Exotel api_key
auth_token: str | None = None, # Exotel api_token
account_sid: str | None = None, # Exotel account_sid for REST hangup
subdomain: str = "api.exotel.com",
params: "ExotelFrameSerializer.InputParams | None" = None,
):
params = params or ExotelFrameSerializer.InputParams()
super().__init__(params)
self._params: ExotelFrameSerializer.InputParams = params
self._stream_sid = stream_id
self._call_sid = call_id
self._api_key = auth_id
self._api_token = auth_token
self._account_sid = account_sid
self._subdomain = subdomain
self._exotel_sample_rate = params.sample_rate_hz
self._sample_rate = 0 # Set in setup()
self._input_resampler = create_stream_resampler()
self._output_resampler = create_stream_resampler()
self._hangup_attempted = False
async def setup(self, frame: StartFrame):
self._sample_rate = self._params.sample_rate or frame.audio_in_sample_rate
async def serialize(self, frame: Frame) -> str | bytes | None:
if isinstance(frame, (EndFrame, CancelFrame)):
if self._params.auto_hang_up and not self._hangup_attempted:
self._hangup_attempted = True
await self._hang_up_call()
return None
elif isinstance(frame, InterruptionFrame):
return json.dumps({"event": "clear", "streamSid": self._stream_sid})
elif isinstance(frame, AudioRawFrame):
serialized_data = await pcm_to_ulaw(
frame.audio,
frame.sample_rate,
self._exotel_sample_rate,
self._output_resampler,
)
if not serialized_data:
return None
payload = base64.b64encode(serialized_data).decode("utf-8")
return json.dumps({
"event": "media",
"streamSid": self._stream_sid,
"media": {"payload": payload},
})
elif isinstance(frame, (OutputTransportMessageFrame, OutputTransportMessageUrgentFrame)):
if self.should_ignore_frame(frame):
return None
return json.dumps(frame.message)
return None
async def deserialize(self, data: str | bytes) -> Frame | None:
try:
message = json.loads(data)
except json.JSONDecodeError:
logger.warning(f"[Exotel] Failed to parse WebSocket message: {data[:200]}")
return None
event = message.get("event", "")
if event == "media":
media = message.get("media", {})
payload_b64 = media.get("payload")
if not payload_b64:
return None
payload = base64.b64decode(payload_b64)
deserialized = await ulaw_to_pcm(
payload,
self._exotel_sample_rate,
self._sample_rate,
self._input_resampler,
)
if not deserialized:
return None
return InputAudioRawFrame(
audio=deserialized, num_channels=1, sample_rate=self._sample_rate
)
# Ignore start/stop/connected/mark events silently
return None
async def _hang_up_call(self):
"""Hang up via Exotel REST API."""
if not (self._api_key and self._api_token and self._account_sid and self._call_sid):
logger.warning("[Exotel] Cannot hang up — missing credentials or call_sid")
return
try:
endpoint = (
f"https://{self._api_key}:{self._api_token}@{self._subdomain}"
f"/v1/Accounts/{self._account_sid}/Calls/{self._call_sid}/"
)
async with aiohttp.ClientSession() as session:
async with session.delete(endpoint) as resp:
if resp.status in (200, 204):
logger.debug(f"[Exotel] Hung up call {self._call_sid}")
else:
body = await resp.text()
logger.warning(
f"[Exotel] Hangup returned {resp.status}: {body[:200]}"
)
except Exception as e:
logger.error(f"[Exotel] Hangup failed: {e}")
__all__ = ["ExotelFrameSerializer"]

View file

@ -1,6 +1,7 @@
"""Exotel transport factory."""
from fastapi import WebSocket
from pipecat.transports.websocket.fastapi import (
FastAPIWebsocketParams,
FastAPIWebsocketTransport,
@ -23,6 +24,7 @@ async def create_transport(
telephony_configuration_id: int | None = None,
stream_id: str,
call_id: str,
account_sid: str = "",
):
"""Create a WebSocket transport for an Exotel call leg."""
config = await load_credentials_for_transport(
@ -31,23 +33,22 @@ async def create_transport(
api_key = config.get("api_key")
api_token = config.get("api_token")
cfg_account_sid = account_sid or config.get("account_sid", "")
if not api_key or not api_token:
raise ValueError(
f"Incomplete Exotel configuration for organization {organization_id}"
)
# ExotelFrameSerializer is PlivoFrameSerializer under the hood —
# same μ-law 8 kHz JSON envelope. The auth_id/auth_token params are used
# by Plivo's serializer for optional mid-call REST calls; Exotel doesn't
# need them but we pass api_key/api_token for future extensibility.
serializer = ExotelFrameSerializer(
stream_id=stream_id,
call_id=call_id,
auth_id=api_key,
auth_token=api_token,
account_sid=cfg_account_sid,
subdomain=config.get("subdomain", "api.exotel.com"),
params=ExotelFrameSerializer.InputParams(
plivo_sample_rate=8000,
sample_rate_hz=8000,
sample_rate=audio_config.pipeline_sample_rate,
),
)