From fba164ca00c3c9170b5a2a5d09dca27a789cd146 Mon Sep 17 00:00:00 2001 From: Hridayesh Gupta Date: Tue, 12 May 2026 13:25:18 +0530 Subject: [PATCH] adding exotel support --- api/enums.py | 1 + api/schemas/telephony_config.py | 8 + api/services/telephony/providers/__init__.py | 1 + .../telephony/providers/exotel/__init__.py | 112 +++++ .../telephony/providers/exotel/config.py | 45 ++ .../telephony/providers/exotel/provider.py | 455 ++++++++++++++++++ .../telephony/providers/exotel/routes.py | 120 +++++ .../telephony/providers/exotel/serializers.py | 11 + .../telephony/providers/exotel/transport.py | 69 +++ 9 files changed, 822 insertions(+) create mode 100644 api/services/telephony/providers/exotel/__init__.py create mode 100644 api/services/telephony/providers/exotel/config.py create mode 100644 api/services/telephony/providers/exotel/provider.py create mode 100644 api/services/telephony/providers/exotel/routes.py create mode 100644 api/services/telephony/providers/exotel/serializers.py create mode 100644 api/services/telephony/providers/exotel/transport.py diff --git a/api/enums.py b/api/enums.py index 12557057..fab9d353 100644 --- a/api/enums.py +++ b/api/enums.py @@ -19,6 +19,7 @@ class CallType(Enum): class WorkflowRunMode(Enum): ARI = "ari" + EXOTEL = "exotel" PLIVO = "plivo" TWILIO = "twilio" VONAGE = "vonage" diff --git a/api/schemas/telephony_config.py b/api/schemas/telephony_config.py index b056400f..37c65bc2 100644 --- a/api/schemas/telephony_config.py +++ b/api/schemas/telephony_config.py @@ -20,6 +20,10 @@ from api.services.telephony.providers.cloudonix.config import ( CloudonixConfigurationRequest, CloudonixConfigurationResponse, ) +from api.services.telephony.providers.exotel.config import ( + ExotelConfigurationRequest, + ExotelConfigurationResponse, +) from api.services.telephony.providers.plivo.config import ( PlivoConfigurationRequest, PlivoConfigurationResponse, @@ -48,6 +52,7 @@ TelephonyConfigRequest = Annotated[ Union[ ARIConfigurationRequest, CloudonixConfigurationRequest, + ExotelConfigurationRequest, PlivoConfigurationRequest, TelnyxConfigurationRequest, TwilioConfigurationRequest, @@ -73,6 +78,7 @@ class TelephonyConfigurationResponse(BaseModel): cloudonix: Optional[CloudonixConfigurationResponse] = None ari: Optional[ARIConfigurationResponse] = None telnyx: Optional[TelnyxConfigurationResponse] = None + exotel: Optional[ExotelConfigurationResponse] = None # --------------------------------------------------------------------------- @@ -136,6 +142,8 @@ __all__ = [ "ARIConfigurationResponse", "CloudonixConfigurationRequest", "CloudonixConfigurationResponse", + "ExotelConfigurationRequest", + "ExotelConfigurationResponse", "PlivoConfigurationRequest", "PlivoConfigurationResponse", "TelephonyConfigRequest", diff --git a/api/services/telephony/providers/__init__.py b/api/services/telephony/providers/__init__.py index 4df4e7f0..329a119f 100644 --- a/api/services/telephony/providers/__init__.py +++ b/api/services/telephony/providers/__init__.py @@ -9,6 +9,7 @@ or run_pipeline. from api.services.telephony.providers import ( # noqa: F401 -- import for side effects (registration) ari, cloudonix, + exotel, plivo, telnyx, twilio, diff --git a/api/services/telephony/providers/exotel/__init__.py b/api/services/telephony/providers/exotel/__init__.py new file mode 100644 index 00000000..945d2f75 --- /dev/null +++ b/api/services/telephony/providers/exotel/__init__.py @@ -0,0 +1,112 @@ +"""Exotel telephony provider package. + +Importing this module registers ExotelProvider with the telephony registry. +""" + +from typing import Any, Dict + +from api.services.telephony.registry import ( + ProviderSpec, + ProviderUIField, + ProviderUIMetadata, + register, +) + +from .config import ExotelConfigurationRequest, ExotelConfigurationResponse +from .provider import ExotelProvider +from .transport import create_transport + + +def _config_loader(value: Dict[str, Any]) -> Dict[str, Any]: + """Normalize stored credentials JSONB into the ExotelProvider constructor dict.""" + return { + "provider": "exotel", + "api_key": value.get("api_key"), + "api_token": value.get("api_token"), + "account_sid": value.get("account_sid"), + "subdomain": value.get("subdomain", "api.exotel.com"), + "from_numbers": value.get("from_numbers", []), + "app_id": value.get("app_id"), + } + + +_UI_METADATA = ProviderUIMetadata( + display_name="Exotel", + docs_url="https://developer.exotel.com/api/", + fields=[ + ProviderUIField( + name="api_key", + label="API Key", + type="text", + sensitive=True, + description="From Exotel Dashboard → Settings → API Settings", + ), + ProviderUIField( + name="api_token", + label="API Token", + type="password", + sensitive=True, + ), + ProviderUIField( + name="account_sid", + label="Account SID", + type="text", + sensitive=False, + description="Your Exotel account SID / subdomain identifier", + ), + ProviderUIField( + name="subdomain", + label="API Subdomain", + type="text", + sensitive=False, + required=False, + description=( + "api.exotel.com (global / SEA) or " + "api.in.exotel.com (India-hosted). Defaults to api.exotel.com." + ), + ), + ProviderUIField( + name="from_numbers", + label="ExoPhone Numbers (CallerIds)", + type="string-array", + description=( + "Exotel virtual phone numbers used as CallerIds for outbound calls. " + "Add them without country code if your account expects that format." + ), + ), + ProviderUIField( + name="app_id", + label="App ID (optional)", + type="text", + sensitive=False, + required=False, + description=( + "Exotel App Bazaar flow ID for inbound call routing. " + "Leave blank if you are configuring the answer URL manually." + ), + ), + ], +) + +SPEC = ProviderSpec( + name="exotel", + provider_cls=ExotelProvider, + config_loader=_config_loader, + transport_factory=create_transport, + transport_sample_rate=8000, # μ-law 8 kHz + config_request_cls=ExotelConfigurationRequest, + config_response_cls=ExotelConfigurationResponse, + ui_metadata=_UI_METADATA, + # AccountSid is present on all Exotel inbound webhooks. + account_id_credential_field="account_sid", +) + +register(SPEC) + +__all__ = [ + "SPEC", + "ExotelConfigurationRequest", + "ExotelConfigurationResponse", + "ExotelProvider", + "create_transport", +] diff --git a/api/services/telephony/providers/exotel/config.py b/api/services/telephony/providers/exotel/config.py new file mode 100644 index 00000000..43f90613 --- /dev/null +++ b/api/services/telephony/providers/exotel/config.py @@ -0,0 +1,45 @@ +"""Exotel telephony configuration schemas.""" + +from typing import List, Literal, Optional + +from pydantic import BaseModel, Field + + +class ExotelConfigurationRequest(BaseModel): + """Request schema for Exotel configuration.""" + + provider: Literal["exotel"] = Field(default="exotel") + api_key: str = Field(..., description="Exotel API Key (from Dashboard → Settings → API Settings)") + api_token: str = Field(..., description="Exotel API Token") + account_sid: str = Field(..., description="Exotel Account SID (subdomain/account identifier)") + subdomain: str = Field( + default="api.exotel.com", + description=( + "Exotel API subdomain. Use 'api.exotel.com' for global (SEA), " + "'api.in.exotel.com' for India-hosted accounts." + ), + ) + from_numbers: List[str] = Field( + default_factory=list, + description="List of Exotel ExoPhone numbers (CallerIds) used for outbound calls", + ) + app_id: Optional[str] = Field( + default=None, + description=( + "Exotel App ID (from App Bazaar → My Apps). " + "When set, used as the Url for inbound call flows. " + "Leave blank if managing inbound via the Dograh answer URL." + ), + ) + + +class ExotelConfigurationResponse(BaseModel): + """Response schema for Exotel configuration with masked sensitive fields.""" + + provider: Literal["exotel"] = Field(default="exotel") + api_key: str # Masked + api_token: str # Masked + account_sid: str + subdomain: str + from_numbers: List[str] + app_id: Optional[str] = None diff --git a/api/services/telephony/providers/exotel/provider.py b/api/services/telephony/providers/exotel/provider.py new file mode 100644 index 00000000..97ef72be --- /dev/null +++ b/api/services/telephony/providers/exotel/provider.py @@ -0,0 +1,455 @@ +""" +Exotel implementation of the TelephonyProvider interface. + +Exotel Voice v1 API: + Base URL: https://{api_key}:{api_token}@{subdomain}/v1/Accounts/{account_sid}/ + Outbound: POST /Calls/connect + Call details: GET /Calls/{CallSid}.json + +Audio format: 8 kHz μ-law (same wire format as Twilio / Plivo). +""" + +import json +import random +from typing import TYPE_CHECKING, Any, Dict, List, Optional + +import aiohttp +from fastapi import HTTPException +from loguru import logger + +from api.db import db_client +from api.enums import WorkflowRunMode +from api.services.telephony.base import ( + CallInitiationResult, + NormalizedInboundData, + ProviderSyncResult, + TelephonyProvider, +) +from api.utils.common import get_backend_endpoints +from api.utils.telephony_address import normalize_telephony_address + +if TYPE_CHECKING: + from fastapi import WebSocket + + +class ExotelProvider(TelephonyProvider): + """ + Exotel Voice v1 implementation of TelephonyProvider. + + Credentials required: + - api_key : from Exotel Dashboard → Settings → API Settings + - api_token : same location + - account_sid : your Exotel account SID / subdomain identifier + - subdomain : e.g. api.exotel.com (global) or api.in.exotel.com (India) + """ + + PROVIDER_NAME = WorkflowRunMode.EXOTEL.value + WEBHOOK_ENDPOINT = "exotel-xml" # path under /api/v1/telephony + + def __init__(self, config: Dict[str, Any]): + self.api_key = config.get("api_key", "") + self.api_token = config.get("api_token", "") + self.account_sid = config.get("account_sid", "") + self.subdomain = config.get("subdomain", "api.exotel.com") + self.from_numbers = config.get("from_numbers", []) + self.app_id = config.get("app_id") + + if isinstance(self.from_numbers, str): + self.from_numbers = [self.from_numbers] + + self._base_url = ( + f"https://{self.api_key}:{self.api_token}" + f"@{self.subdomain}/v1/Accounts/{self.account_sid}" + ) + + # ------------------------------------------------------------------------- + # Helpers + # ------------------------------------------------------------------------- + + def _calls_url(self) -> str: + return f"{self._base_url}/Calls" + + def _call_url(self, call_sid: str) -> str: + return f"{self._base_url}/Calls/{call_sid}.json" + + # ------------------------------------------------------------------------- + # Outbound + # ------------------------------------------------------------------------- + + async def initiate_call( + self, + to_number: str, + webhook_url: str, + workflow_run_id: Optional[int] = None, + from_number: Optional[str] = None, + **kwargs: Any, + ) -> CallInitiationResult: + """Initiate an outbound call via Exotel Calls/connect. + + ``From`` — the number that gets called first (the callee / end-user). + ``CallerId`` — the ExoPhone (Exotel virtual number) shown as caller ID. + ``Url`` — Exotel app/flow URL; we point it at our answer webhook. + """ + if not self.validate_config(): + raise ValueError("Exotel provider not properly configured") + + caller_id = from_number or random.choice(self.from_numbers) + + data: Dict[str, Any] = { + "From": to_number, + "CallerId": caller_id, + "Url": webhook_url, + "CallType": "trans", # transactional — no recording by default + } + + if workflow_run_id: + backend_endpoint, _ = await get_backend_endpoints() + data["StatusCallback"] = ( + f"{backend_endpoint}/api/v1/telephony/exotel/status-callback/{workflow_run_id}" + ) + + data.update(kwargs) + + endpoint = f"{self._calls_url()}/connect" + logger.info( + f"[Exotel] Initiating outbound call to {to_number} " + f"via CallerID={caller_id}, workflow_run_id={workflow_run_id}" + ) + + async with aiohttp.ClientSession() as session: + async with session.post(endpoint, data=data) as response: + response_text = await response.text() + if response.status not in (200, 201, 202): + logger.error( + f"[Exotel] Calls/connect failed: " + f"HTTP {response.status} body={response_text}" + ) + raise HTTPException( + status_code=response.status, + detail=f"Failed to initiate Exotel call: {response_text}", + ) + + try: + response_data = json.loads(response_text) + except json.JSONDecodeError: + logger.error(f"[Exotel] Non-JSON response: {response_text}") + raise HTTPException( + status_code=502, + detail=f"Exotel returned non-JSON response: {response_text}", + ) + + call_obj = response_data.get("Call", {}) + call_sid = call_obj.get("Sid") + if not call_sid: + raise HTTPException( + status_code=500, + detail=f"Exotel response missing Call.Sid: {response_data}", + ) + + logger.info( + f"[Exotel] Outbound call placed: Sid={call_sid} " + f"Status={call_obj.get('Status')}" + ) + return CallInitiationResult( + call_id=call_sid, + status=call_obj.get("Status", "queued"), + caller_number=caller_id, + provider_metadata={"call_sid": call_sid}, + raw_response=response_data, + ) + + async def get_call_status(self, call_id: str) -> Dict[str, Any]: + if not self.validate_config(): + raise ValueError("Exotel provider not properly configured") + + async with aiohttp.ClientSession() as session: + async with session.get(self._call_url(call_id)) as response: + if response.status != 200: + error_data = await response.text() + raise Exception( + f"[Exotel] Failed to get call status: {error_data}" + ) + data = await response.json(content_type=None) + return data.get("Call", data) + + async def get_call_cost(self, call_id: str) -> Dict[str, Any]: + try: + call_data = await self.get_call_status(call_id) + price_str = call_data.get("Price") or "0" + try: + cost = float(price_str) + except (ValueError, TypeError): + cost = 0.0 + + duration_str = call_data.get("Duration") or "0" + try: + duration = int(duration_str) + except (ValueError, TypeError): + duration = 0 + + return { + "cost_usd": cost, + "duration": duration, + "status": call_data.get("Status", "unknown"), + "price_unit": "INR", # Exotel prices in INR + "raw_response": call_data, + } + except Exception as e: + logger.error(f"[Exotel] Exception fetching call cost for {call_id}: {e}") + return {"cost_usd": 0.0, "duration": 0, "status": "error", "error": str(e)} + + async def get_available_phone_numbers(self) -> List[str]: + return self.from_numbers + + def validate_config(self) -> bool: + return bool( + self.api_key + and self.api_token + and self.account_sid + and self.from_numbers + ) + + # ------------------------------------------------------------------------- + # Webhooks / answer URL + # ------------------------------------------------------------------------- + + async def verify_webhook_signature( + self, + url: str, + params: Dict[str, Any], + signature: str, + ) -> bool: + # Exotel v1 does not sign webhooks with a secret. Accept all. + return True + + async def get_webhook_response( + self, workflow_id: int, user_id: int, workflow_run_id: int + ) -> str: + """Return ExoML that streams audio over WebSocket (μ-law 8 kHz).""" + _, wss_backend_endpoint = await get_backend_endpoints() + return ( + f'\n' + f"\n" + f' ' + f"{wss_backend_endpoint}/api/v1/telephony/ws/{workflow_id}/{user_id}/{workflow_run_id}" + f"\n" + f"" + ) + + def parse_status_callback(self, data: Dict[str, Any]) -> Dict[str, Any]: + """Normalize Exotel StatusCallback POST fields.""" + status_map = { + "queued": "queued", + "in-progress": "answered", + "completed": "completed", + "failed": "failed", + "busy": "busy", + "no-answer": "no-answer", + "answered": "answered", + "terminal": "completed", + } + raw_status = (data.get("Status") or data.get("EventType") or "").lower() + call_sid = data.get("CallSid", "") + return { + "call_id": call_sid, + "status": status_map.get(raw_status, raw_status), + "from_number": data.get("From"), + "to_number": data.get("To"), + "direction": data.get("Direction"), + "duration": data.get("ConversationDuration") or data.get("Duration"), + "extra": data, + } + + # ------------------------------------------------------------------------- + # WebSocket + # ------------------------------------------------------------------------- + + async def handle_websocket( + self, + websocket: "WebSocket", + workflow_id: int, + user_id: int, + workflow_run_id: int, + ) -> None: + from api.services.pipecat.run_pipeline import run_pipeline_telephony + + # Exotel sends a JSON "start" event first (same pattern as Plivo/Twilio) + first_msg = await websocket.receive_text() + start_msg = json.loads(first_msg) + + if start_msg.get("event") != "start": + logger.error( + f"[Exotel] Expected 'start' event, got: {start_msg.get('event')}" + ) + await websocket.close(code=4400, reason="Expected start event") + return + + start_data = start_msg.get("start", {}) + stream_id = start_data.get("streamId") or start_msg.get("streamId", "") + + if not stream_id: + logger.error(f"[Exotel] Missing streamId in start event: {start_msg}") + await websocket.close(code=4400, reason="Missing streamId") + return + + # Prefer call_id stored on the workflow run (populated by the answer webhook) + workflow_run = await db_client.get_workflow_run(workflow_run_id) + call_id = None + if workflow_run and workflow_run.gathered_context: + call_id = workflow_run.gathered_context.get("call_id") + + if not call_id: + call_id = ( + start_data.get("callId") + or start_data.get("callSid") + or start_data.get("CallSid") + ) + + if not call_id: + logger.error( + f"[Exotel] Missing call ID for workflow run {workflow_run_id}" + ) + await websocket.close(code=4400, reason="Missing call ID") + return + + await run_pipeline_telephony( + websocket, + provider_name=self.PROVIDER_NAME, + workflow_id=workflow_id, + workflow_run_id=workflow_run_id, + user_id=user_id, + call_id=call_id, + transport_kwargs={"stream_id": stream_id, "call_id": call_id}, + ) + + # ------------------------------------------------------------------------- + # Inbound + # ------------------------------------------------------------------------- + + @classmethod + def can_handle_webhook( + cls, webhook_data: Dict[str, Any], headers: Dict[str, str] + ) -> bool: + # Exotel inbound webhooks carry CallSid and AccountSid but no + # provider-specific header. We match on AccountSid presence. + return "CallSid" in webhook_data and "AccountSid" in webhook_data + + @staticmethod + def parse_inbound_webhook(webhook_data: Dict[str, Any]) -> NormalizedInboundData: + from_raw = webhook_data.get("From", "") + to_raw = webhook_data.get("To", "") or webhook_data.get("PhoneNumberSid", "") + return NormalizedInboundData( + provider=ExotelProvider.PROVIDER_NAME, + call_id=webhook_data.get("CallSid", ""), + from_number=normalize_telephony_address(from_raw).canonical + if from_raw + else "", + to_number=normalize_telephony_address(to_raw).canonical + if to_raw + else "", + direction=webhook_data.get("Direction", "inbound"), + call_status=webhook_data.get("Status", ""), + account_id=webhook_data.get("AccountSid"), + raw_data=webhook_data, + ) + + @staticmethod + def validate_account_id(config_data: dict, webhook_account_id: str) -> bool: + if webhook_account_id: + return config_data.get("account_sid") == webhook_account_id + logger.warning( + "[Exotel] Inbound webhook missing AccountSid — " + "falling back to config existence check" + ) + return bool(config_data.get("account_sid")) + + def normalize_phone_number(self, phone_number: str) -> str: + return phone_number + + async def verify_inbound_signature( + self, + url: str, + webhook_data: Dict[str, Any], + headers: Dict[str, str], + body: str = "", + ) -> bool: + # Exotel v1 does not sign inbound webhooks. + return True + + async def start_inbound_stream( + self, + *, + websocket_url: str, + workflow_run_id: int, + normalized_data: NormalizedInboundData, + backend_endpoint: str, + ): + from fastapi import Response + + hangup_callback_attr = "" + if workflow_run_id: + hangup_url = ( + f"{backend_endpoint}/api/v1/telephony/exotel" + f"/status-callback/{workflow_run_id}" + ) + hangup_callback_attr = ( + f' statusCallbackUrl="{hangup_url}" statusCallbackMethod="POST"' + ) + + exo_xml = ( + f'\n' + f"\n" + f' " + f"{websocket_url}" + f"\n" + f"" + ) + return Response(content=exo_xml, media_type="application/xml") + + @staticmethod + def generate_error_response(error_type: str, message: str) -> tuple: + from fastapi import Response + + exo_xml = ( + f'\n' + f"\n" + f" Sorry, there was an error processing your call. {message}\n" + f" \n" + f"" + ) + return Response(content=exo_xml, media_type="application/xml") + + # ------------------------------------------------------------------------- + # Transfers (not supported) + # ------------------------------------------------------------------------- + + async def transfer_call( + self, + destination: str, + transfer_id: str, + conference_name: str, + timeout: int = 30, + **kwargs: Any, + ) -> Dict[str, Any]: + raise NotImplementedError("Exotel provider does not support call transfers") + + def supports_transfers(self) -> bool: + return False + + # ------------------------------------------------------------------------- + # Optional: configure inbound (no-op — Exotel doesn't support programmatic + # answer-URL binding via the v1 REST API) + # ------------------------------------------------------------------------- + + async def configure_inbound( + self, address: str, webhook_url: Optional[str] + ) -> ProviderSyncResult: + logger.info( + f"[Exotel] configure_inbound called for {address} → {webhook_url}. " + "Exotel v1 does not support programmatic webhook binding; " + "configure the answer URL in the Exotel App Bazaar." + ) + return ProviderSyncResult(ok=True) diff --git a/api/services/telephony/providers/exotel/routes.py b/api/services/telephony/providers/exotel/routes.py new file mode 100644 index 00000000..2a9935be --- /dev/null +++ b/api/services/telephony/providers/exotel/routes.py @@ -0,0 +1,120 @@ +"""Exotel telephony routes. + +Mounted under /api/v1/telephony by api.routes.telephony via importlib. +""" + +import json +from typing import Optional + +from fastapi import APIRouter, Request +from loguru import logger +from pipecat.utils.run_context import set_current_run_id +from starlette.responses import HTMLResponse + +from api.db import db_client +from api.services.telephony.factory import get_telephony_provider_for_run +from api.services.telephony.status_processor import ( + StatusCallbackRequest, + _process_status_update, +) +from api.utils.common import get_backend_endpoints + +router = APIRouter() + + +# --------------------------------------------------------------------------- +# Answer webhook — called by Exotel when the outbound call is answered. +# Returns ExoML that opens the bidirectional Stream. +# --------------------------------------------------------------------------- + + +@router.post("/exotel-xml", include_in_schema=False) +async def handle_exotel_xml_webhook( + workflow_id: int, + user_id: int, + workflow_run_id: int, + organization_id: int, + request: Request, +): + """ + Handle initial webhook from Exotel when an outbound call is answered. + Returns ExoML . + """ + set_current_run_id(workflow_run_id) + workflow_run = await db_client.get_workflow_run_by_id(workflow_run_id) + provider = await get_telephony_provider_for_run(workflow_run, organization_id) + + form_data = await request.form() + callback_data = dict(form_data) + logger.info( + f"[run {workflow_run_id}] Exotel answer webhook: " + f"{json.dumps(callback_data)}" + ) + + # Exotel sends CallSid in the answer webhook — persist it on the run so + # handle_websocket can resolve it later. + call_id = callback_data.get("CallSid") + if call_id: + gathered_context = dict(workflow_run.gathered_context or {}) + gathered_context["call_id"] = call_id + await db_client.update_workflow_run( + run_id=workflow_run_id, gathered_context=gathered_context + ) + + response_content = await provider.get_webhook_response( + workflow_id, user_id, workflow_run_id + ) + return HTMLResponse(content=response_content, media_type="application/xml") + + +# --------------------------------------------------------------------------- +# Status callback — called by Exotel on call completion / state changes. +# --------------------------------------------------------------------------- + + +@router.post("/exotel/status-callback/{workflow_run_id}") +async def handle_exotel_status_callback( + workflow_run_id: int, + request: Request, +): + """Handle Exotel StatusCallback POST.""" + set_current_run_id(workflow_run_id) + + form_data = await request.form() + callback_data = dict(form_data) + logger.info( + f"[run {workflow_run_id}] Exotel status callback: " + f"{json.dumps(callback_data)}" + ) + + workflow_run = await db_client.get_workflow_run_by_id(workflow_run_id) + if not workflow_run: + logger.warning( + f"[run {workflow_run_id}] Exotel status callback: workflow run not found" + ) + return {"status": "ignored", "reason": "workflow_run_not_found"} + + workflow = await db_client.get_workflow_by_id(workflow_run.workflow_id) + if not workflow: + logger.warning( + f"[run {workflow_run_id}] Exotel status callback: workflow not found" + ) + return {"status": "ignored", "reason": "workflow_not_found"} + + provider = await get_telephony_provider_for_run( + workflow_run, workflow.organization_id + ) + + parsed_data = provider.parse_status_callback(callback_data) + status_update = StatusCallbackRequest( + call_id=parsed_data["call_id"], + status=parsed_data["status"], + from_number=parsed_data.get("from_number"), + to_number=parsed_data.get("to_number"), + direction=parsed_data.get("direction"), + duration=parsed_data.get("duration"), + extra=parsed_data.get("extra", {}), + ) + + await _process_status_update(workflow_run_id, status_update) + return {"status": "success"} diff --git a/api/services/telephony/providers/exotel/serializers.py b/api/services/telephony/providers/exotel/serializers.py new file mode 100644 index 00000000..4e0f0f25 --- /dev/null +++ b/api/services/telephony/providers/exotel/serializers.py @@ -0,0 +1,11 @@ +"""Exotel frame serializer. + +Exotel streams audio over WebSocket using the same JSON envelope and μ-law +8 kHz encoding as Plivo. We re-export PlivoFrameSerializer directly so +transport.py can import from `.serializers` and we have an obvious place to +drop a custom subclass later if Exotel diverges. +""" + +from pipecat.serializers.plivo import PlivoFrameSerializer as ExotelFrameSerializer + +__all__ = ["ExotelFrameSerializer"] diff --git a/api/services/telephony/providers/exotel/transport.py b/api/services/telephony/providers/exotel/transport.py new file mode 100644 index 00000000..2a7120ca --- /dev/null +++ b/api/services/telephony/providers/exotel/transport.py @@ -0,0 +1,69 @@ +"""Exotel transport factory.""" + +from fastapi import WebSocket +from pipecat.transports.websocket.fastapi import ( + FastAPIWebsocketParams, + FastAPIWebsocketTransport, +) + +from api.services.pipecat.audio_config import AudioConfig +from api.services.pipecat.audio_mixer import build_audio_out_mixer +from api.services.telephony.factory import load_credentials_for_transport + +from .serializers import ExotelFrameSerializer + + +async def create_transport( + websocket: WebSocket, + workflow_run_id: int, + audio_config: AudioConfig, + organization_id: int, + *, + ambient_noise_config: dict | None = None, + telephony_configuration_id: int | None = None, + stream_id: str, + call_id: str, +): + """Create a WebSocket transport for an Exotel call leg.""" + config = await load_credentials_for_transport( + organization_id, telephony_configuration_id, expected_provider="exotel" + ) + + api_key = config.get("api_key") + api_token = config.get("api_token") + + if not api_key or not api_token: + raise ValueError( + f"Incomplete Exotel configuration for organization {organization_id}" + ) + + # ExotelFrameSerializer is PlivoFrameSerializer under the hood — + # same μ-law 8 kHz JSON envelope. The auth_id/auth_token params are used + # by Plivo's serializer for optional mid-call REST calls; Exotel doesn't + # need them but we pass api_key/api_token for future extensibility. + serializer = ExotelFrameSerializer( + stream_id=stream_id, + call_id=call_id, + auth_id=api_key, + auth_token=api_token, + params=ExotelFrameSerializer.InputParams( + plivo_sample_rate=8000, + sample_rate=audio_config.pipeline_sample_rate, + ), + ) + + mixer = await build_audio_out_mixer( + audio_config.transport_out_sample_rate, ambient_noise_config + ) + + return FastAPIWebsocketTransport( + websocket=websocket, + params=FastAPIWebsocketParams( + audio_in_enabled=True, + audio_out_enabled=True, + audio_in_sample_rate=audio_config.transport_in_sample_rate, + audio_out_sample_rate=audio_config.transport_out_sample_rate, + audio_out_mixer=mixer, + serializer=serializer, + ), + )