diff --git a/api/services/telephony/providers/exotel/routes.py b/api/services/telephony/providers/exotel/routes.py index 5e63ca78..4d062790 100644 --- a/api/services/telephony/providers/exotel/routes.py +++ b/api/services/telephony/providers/exotel/routes.py @@ -82,23 +82,32 @@ async def exotel_stream(websocket: WebSocket): return # Exotel may nest stream metadata under 'start' or at top level. + # Keys are snake_case (stream_sid, call_sid) — Twilio-compatible format. start_data = start_msg.get("start") or start_msg call_sid = ( - start_data.get("callSid") + start_data.get("call_sid") + or start_data.get("callSid") or start_data.get("CallSid") - or start_data.get("call_sid") + or start_msg.get("call_sid") or start_msg.get("callSid") - or start_msg.get("CallSid") ) - stream_id = ( - start_data.get("streamId") - or start_data.get("StreamId") - or start_msg.get("streamId") + stream_sid = ( + start_data.get("stream_sid") + or start_data.get("streamSid") + or start_data.get("streamId") + or start_msg.get("stream_sid") + or start_msg.get("streamSid") + or "" + ) + account_sid = ( + start_data.get("account_sid") + or start_data.get("accountSid") + or start_msg.get("account_sid") or "" ) logger.info( - f"[Exotel stream] callSid={call_sid!r} streamId={stream_id!r}" + f"[Exotel stream] callSid={call_sid!r} streamSid={stream_sid!r} accountSid={account_sid!r}" ) if not call_sid: @@ -151,7 +160,11 @@ async def exotel_stream(websocket: WebSocket): workflow_run_id=workflow_run_id, user_id=user_id, call_id=call_sid, - transport_kwargs={"stream_id": stream_id, "call_id": call_sid}, + transport_kwargs={ + "stream_id": stream_sid, + "call_id": call_sid, + "account_sid": account_sid, + }, ) diff --git a/api/services/telephony/providers/exotel/serializers.py b/api/services/telephony/providers/exotel/serializers.py index 4e0f0f25..1f753435 100644 --- a/api/services/telephony/providers/exotel/serializers.py +++ b/api/services/telephony/providers/exotel/serializers.py @@ -1,11 +1,166 @@ """Exotel frame serializer. -Exotel streams audio over WebSocket using the same JSON envelope and μ-law -8 kHz encoding as Plivo. We re-export PlivoFrameSerializer directly so -transport.py can import from `.serializers` and we have an obvious place to -drop a custom subclass later if Exotel diverges. +Exotel's WebSocket streaming protocol is Twilio Media Streams-compatible: + - Incoming audio: {"event":"media","stream_sid":"...","media":{"payload":""}} + - Outgoing audio: {"event":"media","streamSid":"...","media":{"payload":""}} + - Clear/interrupt: {"event":"clear","streamSid":"..."} + +We subclass TwilioFrameSerializer to inherit the correct encode/decode logic +and override _hang_up_call to hit Exotel's REST API instead of Twilio's. """ -from pipecat.serializers.plivo import PlivoFrameSerializer as ExotelFrameSerializer +import json + +import aiohttp +from loguru import logger +from pipecat.audio.utils import create_stream_resampler, pcm_to_ulaw, ulaw_to_pcm +from pipecat.frames.frames import ( + AudioRawFrame, + CancelFrame, + EndFrame, + Frame, + InputAudioRawFrame, + InterruptionFrame, + OutputTransportMessageFrame, + OutputTransportMessageUrgentFrame, + StartFrame, +) +from pipecat.serializers.base_serializer import FrameSerializer + +import base64 + + +class ExotelFrameSerializer(FrameSerializer): + """Serializer for Exotel's Twilio-compatible WebSocket streaming protocol. + + Exotel sends/receives audio using the same JSON envelope as Twilio Media Streams: + Incoming: {"event":"media","media":{"payload":""},"stream_sid":"..."} + Outgoing: {"event":"media","streamSid":"...","media":{"payload":""}} + Clear: {"event":"clear","streamSid":"..."} + + Auto hang-up hits Exotel's REST API (DELETE /v1/Accounts/{sid}/Calls/{call_sid}/). + """ + + class InputParams(FrameSerializer.InputParams): + sample_rate_hz: int = 8000 + sample_rate: int | None = None + auto_hang_up: bool = False # Exotel hangs up itself; safe to disable + + def __init__( + self, + stream_id: str, # Exotel stream_sid + call_id: str | None = None, + auth_id: str | None = None, # Exotel api_key + auth_token: str | None = None, # Exotel api_token + account_sid: str | None = None, # Exotel account_sid for REST hangup + subdomain: str = "api.exotel.com", + params: "ExotelFrameSerializer.InputParams | None" = None, + ): + params = params or ExotelFrameSerializer.InputParams() + super().__init__(params) + self._params: ExotelFrameSerializer.InputParams = params + + self._stream_sid = stream_id + self._call_sid = call_id + self._api_key = auth_id + self._api_token = auth_token + self._account_sid = account_sid + self._subdomain = subdomain + self._exotel_sample_rate = params.sample_rate_hz + self._sample_rate = 0 # Set in setup() + + self._input_resampler = create_stream_resampler() + self._output_resampler = create_stream_resampler() + self._hangup_attempted = False + + async def setup(self, frame: StartFrame): + self._sample_rate = self._params.sample_rate or frame.audio_in_sample_rate + + async def serialize(self, frame: Frame) -> str | bytes | None: + if isinstance(frame, (EndFrame, CancelFrame)): + if self._params.auto_hang_up and not self._hangup_attempted: + self._hangup_attempted = True + await self._hang_up_call() + return None + + elif isinstance(frame, InterruptionFrame): + return json.dumps({"event": "clear", "streamSid": self._stream_sid}) + + elif isinstance(frame, AudioRawFrame): + serialized_data = await pcm_to_ulaw( + frame.audio, + frame.sample_rate, + self._exotel_sample_rate, + self._output_resampler, + ) + if not serialized_data: + return None + payload = base64.b64encode(serialized_data).decode("utf-8") + return json.dumps({ + "event": "media", + "streamSid": self._stream_sid, + "media": {"payload": payload}, + }) + + elif isinstance(frame, (OutputTransportMessageFrame, OutputTransportMessageUrgentFrame)): + if self.should_ignore_frame(frame): + return None + return json.dumps(frame.message) + + return None + + async def deserialize(self, data: str | bytes) -> Frame | None: + try: + message = json.loads(data) + except json.JSONDecodeError: + logger.warning(f"[Exotel] Failed to parse WebSocket message: {data[:200]}") + return None + + event = message.get("event", "") + + if event == "media": + media = message.get("media", {}) + payload_b64 = media.get("payload") + if not payload_b64: + return None + + payload = base64.b64decode(payload_b64) + deserialized = await ulaw_to_pcm( + payload, + self._exotel_sample_rate, + self._sample_rate, + self._input_resampler, + ) + if not deserialized: + return None + return InputAudioRawFrame( + audio=deserialized, num_channels=1, sample_rate=self._sample_rate + ) + + # Ignore start/stop/connected/mark events silently + return None + + async def _hang_up_call(self): + """Hang up via Exotel REST API.""" + if not (self._api_key and self._api_token and self._account_sid and self._call_sid): + logger.warning("[Exotel] Cannot hang up — missing credentials or call_sid") + return + try: + endpoint = ( + f"https://{self._api_key}:{self._api_token}@{self._subdomain}" + f"/v1/Accounts/{self._account_sid}/Calls/{self._call_sid}/" + ) + async with aiohttp.ClientSession() as session: + async with session.delete(endpoint) as resp: + if resp.status in (200, 204): + logger.debug(f"[Exotel] Hung up call {self._call_sid}") + else: + body = await resp.text() + logger.warning( + f"[Exotel] Hangup returned {resp.status}: {body[:200]}" + ) + except Exception as e: + logger.error(f"[Exotel] Hangup failed: {e}") + __all__ = ["ExotelFrameSerializer"] diff --git a/api/services/telephony/providers/exotel/transport.py b/api/services/telephony/providers/exotel/transport.py index 2a7120ca..5e68c963 100644 --- a/api/services/telephony/providers/exotel/transport.py +++ b/api/services/telephony/providers/exotel/transport.py @@ -1,6 +1,7 @@ """Exotel transport factory.""" from fastapi import WebSocket + from pipecat.transports.websocket.fastapi import ( FastAPIWebsocketParams, FastAPIWebsocketTransport, @@ -23,6 +24,7 @@ async def create_transport( telephony_configuration_id: int | None = None, stream_id: str, call_id: str, + account_sid: str = "", ): """Create a WebSocket transport for an Exotel call leg.""" config = await load_credentials_for_transport( @@ -31,23 +33,22 @@ async def create_transport( api_key = config.get("api_key") api_token = config.get("api_token") + cfg_account_sid = account_sid or config.get("account_sid", "") if not api_key or not api_token: raise ValueError( f"Incomplete Exotel configuration for organization {organization_id}" ) - # ExotelFrameSerializer is PlivoFrameSerializer under the hood — - # same μ-law 8 kHz JSON envelope. The auth_id/auth_token params are used - # by Plivo's serializer for optional mid-call REST calls; Exotel doesn't - # need them but we pass api_key/api_token for future extensibility. serializer = ExotelFrameSerializer( stream_id=stream_id, call_id=call_id, auth_id=api_key, auth_token=api_token, + account_sid=cfg_account_sid, + subdomain=config.get("subdomain", "api.exotel.com"), params=ExotelFrameSerializer.InputParams( - plivo_sample_rate=8000, + sample_rate_hz=8000, sample_rate=audio_config.pipeline_sample_rate, ), )