diff --git a/api/enums.py b/api/enums.py
index 12557057..fab9d353 100644
--- a/api/enums.py
+++ b/api/enums.py
@@ -19,6 +19,7 @@ class CallType(Enum):
class WorkflowRunMode(Enum):
ARI = "ari"
+ EXOTEL = "exotel"
PLIVO = "plivo"
TWILIO = "twilio"
VONAGE = "vonage"
diff --git a/api/schemas/telephony_config.py b/api/schemas/telephony_config.py
index b056400f..37c65bc2 100644
--- a/api/schemas/telephony_config.py
+++ b/api/schemas/telephony_config.py
@@ -20,6 +20,10 @@ from api.services.telephony.providers.cloudonix.config import (
CloudonixConfigurationRequest,
CloudonixConfigurationResponse,
)
+from api.services.telephony.providers.exotel.config import (
+ ExotelConfigurationRequest,
+ ExotelConfigurationResponse,
+)
from api.services.telephony.providers.plivo.config import (
PlivoConfigurationRequest,
PlivoConfigurationResponse,
@@ -48,6 +52,7 @@ TelephonyConfigRequest = Annotated[
Union[
ARIConfigurationRequest,
CloudonixConfigurationRequest,
+ ExotelConfigurationRequest,
PlivoConfigurationRequest,
TelnyxConfigurationRequest,
TwilioConfigurationRequest,
@@ -73,6 +78,7 @@ class TelephonyConfigurationResponse(BaseModel):
cloudonix: Optional[CloudonixConfigurationResponse] = None
ari: Optional[ARIConfigurationResponse] = None
telnyx: Optional[TelnyxConfigurationResponse] = None
+ exotel: Optional[ExotelConfigurationResponse] = None
# ---------------------------------------------------------------------------
@@ -136,6 +142,8 @@ __all__ = [
"ARIConfigurationResponse",
"CloudonixConfigurationRequest",
"CloudonixConfigurationResponse",
+ "ExotelConfigurationRequest",
+ "ExotelConfigurationResponse",
"PlivoConfigurationRequest",
"PlivoConfigurationResponse",
"TelephonyConfigRequest",
diff --git a/api/services/telephony/providers/__init__.py b/api/services/telephony/providers/__init__.py
index 4df4e7f0..329a119f 100644
--- a/api/services/telephony/providers/__init__.py
+++ b/api/services/telephony/providers/__init__.py
@@ -9,6 +9,7 @@ or run_pipeline.
from api.services.telephony.providers import ( # noqa: F401 -- import for side effects (registration)
ari,
cloudonix,
+ exotel,
plivo,
telnyx,
twilio,
diff --git a/api/services/telephony/providers/exotel/__init__.py b/api/services/telephony/providers/exotel/__init__.py
new file mode 100644
index 00000000..945d2f75
--- /dev/null
+++ b/api/services/telephony/providers/exotel/__init__.py
@@ -0,0 +1,112 @@
+"""Exotel telephony provider package.
+
+Importing this module registers ExotelProvider with the telephony registry.
+"""
+
+from typing import Any, Dict
+
+from api.services.telephony.registry import (
+ ProviderSpec,
+ ProviderUIField,
+ ProviderUIMetadata,
+ register,
+)
+
+from .config import ExotelConfigurationRequest, ExotelConfigurationResponse
+from .provider import ExotelProvider
+from .transport import create_transport
+
+
+def _config_loader(value: Dict[str, Any]) -> Dict[str, Any]:
+ """Normalize stored credentials JSONB into the ExotelProvider constructor dict."""
+ return {
+ "provider": "exotel",
+ "api_key": value.get("api_key"),
+ "api_token": value.get("api_token"),
+ "account_sid": value.get("account_sid"),
+ "subdomain": value.get("subdomain", "api.exotel.com"),
+ "from_numbers": value.get("from_numbers", []),
+ "app_id": value.get("app_id"),
+ }
+
+
+_UI_METADATA = ProviderUIMetadata(
+ display_name="Exotel",
+ docs_url="https://developer.exotel.com/api/",
+ fields=[
+ ProviderUIField(
+ name="api_key",
+ label="API Key",
+ type="text",
+ sensitive=True,
+ description="From Exotel Dashboard → Settings → API Settings",
+ ),
+ ProviderUIField(
+ name="api_token",
+ label="API Token",
+ type="password",
+ sensitive=True,
+ ),
+ ProviderUIField(
+ name="account_sid",
+ label="Account SID",
+ type="text",
+ sensitive=False,
+ description="Your Exotel account SID / subdomain identifier",
+ ),
+ ProviderUIField(
+ name="subdomain",
+ label="API Subdomain",
+ type="text",
+ sensitive=False,
+ required=False,
+ description=(
+ "api.exotel.com (global / SEA) or "
+ "api.in.exotel.com (India-hosted). Defaults to api.exotel.com."
+ ),
+ ),
+ ProviderUIField(
+ name="from_numbers",
+ label="ExoPhone Numbers (CallerIds)",
+ type="string-array",
+ description=(
+ "Exotel virtual phone numbers used as CallerIds for outbound calls. "
+ "Add them without country code if your account expects that format."
+ ),
+ ),
+ ProviderUIField(
+ name="app_id",
+ label="App ID (optional)",
+ type="text",
+ sensitive=False,
+ required=False,
+ description=(
+ "Exotel App Bazaar flow ID for inbound call routing. "
+ "Leave blank if you are configuring the answer URL manually."
+ ),
+ ),
+ ],
+)
+
+SPEC = ProviderSpec(
+ name="exotel",
+ provider_cls=ExotelProvider,
+ config_loader=_config_loader,
+ transport_factory=create_transport,
+ transport_sample_rate=8000, # μ-law 8 kHz
+ config_request_cls=ExotelConfigurationRequest,
+ config_response_cls=ExotelConfigurationResponse,
+ ui_metadata=_UI_METADATA,
+ # AccountSid is present on all Exotel inbound webhooks.
+ account_id_credential_field="account_sid",
+)
+
+register(SPEC)
+
+__all__ = [
+ "SPEC",
+ "ExotelConfigurationRequest",
+ "ExotelConfigurationResponse",
+ "ExotelProvider",
+ "create_transport",
+]
diff --git a/api/services/telephony/providers/exotel/config.py b/api/services/telephony/providers/exotel/config.py
new file mode 100644
index 00000000..43f90613
--- /dev/null
+++ b/api/services/telephony/providers/exotel/config.py
@@ -0,0 +1,45 @@
+"""Exotel telephony configuration schemas."""
+
+from typing import List, Literal, Optional
+
+from pydantic import BaseModel, Field
+
+
+class ExotelConfigurationRequest(BaseModel):
+ """Request schema for Exotel configuration."""
+
+ provider: Literal["exotel"] = Field(default="exotel")
+ api_key: str = Field(..., description="Exotel API Key (from Dashboard → Settings → API Settings)")
+ api_token: str = Field(..., description="Exotel API Token")
+ account_sid: str = Field(..., description="Exotel Account SID (subdomain/account identifier)")
+ subdomain: str = Field(
+ default="api.exotel.com",
+ description=(
+ "Exotel API subdomain. Use 'api.exotel.com' for global (SEA), "
+ "'api.in.exotel.com' for India-hosted accounts."
+ ),
+ )
+ from_numbers: List[str] = Field(
+ default_factory=list,
+ description="List of Exotel ExoPhone numbers (CallerIds) used for outbound calls",
+ )
+ app_id: Optional[str] = Field(
+ default=None,
+ description=(
+ "Exotel App ID (from App Bazaar → My Apps). "
+ "When set, used as the Url for inbound call flows. "
+ "Leave blank if managing inbound via the Dograh answer URL."
+ ),
+ )
+
+
+class ExotelConfigurationResponse(BaseModel):
+ """Response schema for Exotel configuration with masked sensitive fields."""
+
+ provider: Literal["exotel"] = Field(default="exotel")
+ api_key: str # Masked
+ api_token: str # Masked
+ account_sid: str
+ subdomain: str
+ from_numbers: List[str]
+ app_id: Optional[str] = None
diff --git a/api/services/telephony/providers/exotel/provider.py b/api/services/telephony/providers/exotel/provider.py
new file mode 100644
index 00000000..97ef72be
--- /dev/null
+++ b/api/services/telephony/providers/exotel/provider.py
@@ -0,0 +1,455 @@
+"""
+Exotel implementation of the TelephonyProvider interface.
+
+Exotel Voice v1 API:
+ Base URL: https://{api_key}:{api_token}@{subdomain}/v1/Accounts/{account_sid}/
+ Outbound: POST /Calls/connect
+ Call details: GET /Calls/{CallSid}.json
+
+Audio format: 8 kHz μ-law (same wire format as Twilio / Plivo).
+"""
+
+import json
+import random
+from typing import TYPE_CHECKING, Any, Dict, List, Optional
+
+import aiohttp
+from fastapi import HTTPException
+from loguru import logger
+
+from api.db import db_client
+from api.enums import WorkflowRunMode
+from api.services.telephony.base import (
+ CallInitiationResult,
+ NormalizedInboundData,
+ ProviderSyncResult,
+ TelephonyProvider,
+)
+from api.utils.common import get_backend_endpoints
+from api.utils.telephony_address import normalize_telephony_address
+
+if TYPE_CHECKING:
+ from fastapi import WebSocket
+
+
+class ExotelProvider(TelephonyProvider):
+ """
+ Exotel Voice v1 implementation of TelephonyProvider.
+
+ Credentials required:
+ - api_key : from Exotel Dashboard → Settings → API Settings
+ - api_token : same location
+ - account_sid : your Exotel account SID / subdomain identifier
+ - subdomain : e.g. api.exotel.com (global) or api.in.exotel.com (India)
+ """
+
+ PROVIDER_NAME = WorkflowRunMode.EXOTEL.value
+ WEBHOOK_ENDPOINT = "exotel-xml" # path under /api/v1/telephony
+
+ def __init__(self, config: Dict[str, Any]):
+ self.api_key = config.get("api_key", "")
+ self.api_token = config.get("api_token", "")
+ self.account_sid = config.get("account_sid", "")
+ self.subdomain = config.get("subdomain", "api.exotel.com")
+ self.from_numbers = config.get("from_numbers", [])
+ self.app_id = config.get("app_id")
+
+ if isinstance(self.from_numbers, str):
+ self.from_numbers = [self.from_numbers]
+
+ self._base_url = (
+ f"https://{self.api_key}:{self.api_token}"
+ f"@{self.subdomain}/v1/Accounts/{self.account_sid}"
+ )
+
+ # -------------------------------------------------------------------------
+ # Helpers
+ # -------------------------------------------------------------------------
+
+ def _calls_url(self) -> str:
+ return f"{self._base_url}/Calls"
+
+ def _call_url(self, call_sid: str) -> str:
+ return f"{self._base_url}/Calls/{call_sid}.json"
+
+ # -------------------------------------------------------------------------
+ # Outbound
+ # -------------------------------------------------------------------------
+
+ async def initiate_call(
+ self,
+ to_number: str,
+ webhook_url: str,
+ workflow_run_id: Optional[int] = None,
+ from_number: Optional[str] = None,
+ **kwargs: Any,
+ ) -> CallInitiationResult:
+ """Initiate an outbound call via Exotel Calls/connect.
+
+ ``From`` — the number that gets called first (the callee / end-user).
+ ``CallerId`` — the ExoPhone (Exotel virtual number) shown as caller ID.
+ ``Url`` — Exotel app/flow URL; we point it at our answer webhook.
+ """
+ if not self.validate_config():
+ raise ValueError("Exotel provider not properly configured")
+
+ caller_id = from_number or random.choice(self.from_numbers)
+
+ data: Dict[str, Any] = {
+ "From": to_number,
+ "CallerId": caller_id,
+ "Url": webhook_url,
+ "CallType": "trans", # transactional — no recording by default
+ }
+
+ if workflow_run_id:
+ backend_endpoint, _ = await get_backend_endpoints()
+ data["StatusCallback"] = (
+ f"{backend_endpoint}/api/v1/telephony/exotel/status-callback/{workflow_run_id}"
+ )
+
+ data.update(kwargs)
+
+ endpoint = f"{self._calls_url()}/connect"
+ logger.info(
+ f"[Exotel] Initiating outbound call to {to_number} "
+ f"via CallerID={caller_id}, workflow_run_id={workflow_run_id}"
+ )
+
+ async with aiohttp.ClientSession() as session:
+ async with session.post(endpoint, data=data) as response:
+ response_text = await response.text()
+ if response.status not in (200, 201, 202):
+ logger.error(
+ f"[Exotel] Calls/connect failed: "
+ f"HTTP {response.status} body={response_text}"
+ )
+ raise HTTPException(
+ status_code=response.status,
+ detail=f"Failed to initiate Exotel call: {response_text}",
+ )
+
+ try:
+ response_data = json.loads(response_text)
+ except json.JSONDecodeError:
+ logger.error(f"[Exotel] Non-JSON response: {response_text}")
+ raise HTTPException(
+ status_code=502,
+ detail=f"Exotel returned non-JSON response: {response_text}",
+ )
+
+ call_obj = response_data.get("Call", {})
+ call_sid = call_obj.get("Sid")
+ if not call_sid:
+ raise HTTPException(
+ status_code=500,
+ detail=f"Exotel response missing Call.Sid: {response_data}",
+ )
+
+ logger.info(
+ f"[Exotel] Outbound call placed: Sid={call_sid} "
+ f"Status={call_obj.get('Status')}"
+ )
+ return CallInitiationResult(
+ call_id=call_sid,
+ status=call_obj.get("Status", "queued"),
+ caller_number=caller_id,
+ provider_metadata={"call_sid": call_sid},
+ raw_response=response_data,
+ )
+
+ async def get_call_status(self, call_id: str) -> Dict[str, Any]:
+ if not self.validate_config():
+ raise ValueError("Exotel provider not properly configured")
+
+ async with aiohttp.ClientSession() as session:
+ async with session.get(self._call_url(call_id)) as response:
+ if response.status != 200:
+ error_data = await response.text()
+ raise Exception(
+ f"[Exotel] Failed to get call status: {error_data}"
+ )
+ data = await response.json(content_type=None)
+ return data.get("Call", data)
+
+ async def get_call_cost(self, call_id: str) -> Dict[str, Any]:
+ try:
+ call_data = await self.get_call_status(call_id)
+ price_str = call_data.get("Price") or "0"
+ try:
+ cost = float(price_str)
+ except (ValueError, TypeError):
+ cost = 0.0
+
+ duration_str = call_data.get("Duration") or "0"
+ try:
+ duration = int(duration_str)
+ except (ValueError, TypeError):
+ duration = 0
+
+ return {
+ "cost_usd": cost,
+ "duration": duration,
+ "status": call_data.get("Status", "unknown"),
+ "price_unit": "INR", # Exotel prices in INR
+ "raw_response": call_data,
+ }
+ except Exception as e:
+ logger.error(f"[Exotel] Exception fetching call cost for {call_id}: {e}")
+ return {"cost_usd": 0.0, "duration": 0, "status": "error", "error": str(e)}
+
+ async def get_available_phone_numbers(self) -> List[str]:
+ return self.from_numbers
+
+ def validate_config(self) -> bool:
+ return bool(
+ self.api_key
+ and self.api_token
+ and self.account_sid
+ and self.from_numbers
+ )
+
+ # -------------------------------------------------------------------------
+ # Webhooks / answer URL
+ # -------------------------------------------------------------------------
+
+ async def verify_webhook_signature(
+ self,
+ url: str,
+ params: Dict[str, Any],
+ signature: str,
+ ) -> bool:
+ # Exotel v1 does not sign webhooks with a secret. Accept all.
+ return True
+
+ async def get_webhook_response(
+ self, workflow_id: int, user_id: int, workflow_run_id: int
+ ) -> str:
+ """Return ExoML that streams audio over WebSocket (μ-law 8 kHz)."""
+ _, wss_backend_endpoint = await get_backend_endpoints()
+ return (
+ f'\n'
+ f"\n"
+ f' '
+ f"{wss_backend_endpoint}/api/v1/telephony/ws/{workflow_id}/{user_id}/{workflow_run_id}"
+ f"\n"
+ f""
+ )
+
+ def parse_status_callback(self, data: Dict[str, Any]) -> Dict[str, Any]:
+ """Normalize Exotel StatusCallback POST fields."""
+ status_map = {
+ "queued": "queued",
+ "in-progress": "answered",
+ "completed": "completed",
+ "failed": "failed",
+ "busy": "busy",
+ "no-answer": "no-answer",
+ "answered": "answered",
+ "terminal": "completed",
+ }
+ raw_status = (data.get("Status") or data.get("EventType") or "").lower()
+ call_sid = data.get("CallSid", "")
+ return {
+ "call_id": call_sid,
+ "status": status_map.get(raw_status, raw_status),
+ "from_number": data.get("From"),
+ "to_number": data.get("To"),
+ "direction": data.get("Direction"),
+ "duration": data.get("ConversationDuration") or data.get("Duration"),
+ "extra": data,
+ }
+
+ # -------------------------------------------------------------------------
+ # WebSocket
+ # -------------------------------------------------------------------------
+
+ async def handle_websocket(
+ self,
+ websocket: "WebSocket",
+ workflow_id: int,
+ user_id: int,
+ workflow_run_id: int,
+ ) -> None:
+ from api.services.pipecat.run_pipeline import run_pipeline_telephony
+
+ # Exotel sends a JSON "start" event first (same pattern as Plivo/Twilio)
+ first_msg = await websocket.receive_text()
+ start_msg = json.loads(first_msg)
+
+ if start_msg.get("event") != "start":
+ logger.error(
+ f"[Exotel] Expected 'start' event, got: {start_msg.get('event')}"
+ )
+ await websocket.close(code=4400, reason="Expected start event")
+ return
+
+ start_data = start_msg.get("start", {})
+ stream_id = start_data.get("streamId") or start_msg.get("streamId", "")
+
+ if not stream_id:
+ logger.error(f"[Exotel] Missing streamId in start event: {start_msg}")
+ await websocket.close(code=4400, reason="Missing streamId")
+ return
+
+ # Prefer call_id stored on the workflow run (populated by the answer webhook)
+ workflow_run = await db_client.get_workflow_run(workflow_run_id)
+ call_id = None
+ if workflow_run and workflow_run.gathered_context:
+ call_id = workflow_run.gathered_context.get("call_id")
+
+ if not call_id:
+ call_id = (
+ start_data.get("callId")
+ or start_data.get("callSid")
+ or start_data.get("CallSid")
+ )
+
+ if not call_id:
+ logger.error(
+ f"[Exotel] Missing call ID for workflow run {workflow_run_id}"
+ )
+ await websocket.close(code=4400, reason="Missing call ID")
+ return
+
+ await run_pipeline_telephony(
+ websocket,
+ provider_name=self.PROVIDER_NAME,
+ workflow_id=workflow_id,
+ workflow_run_id=workflow_run_id,
+ user_id=user_id,
+ call_id=call_id,
+ transport_kwargs={"stream_id": stream_id, "call_id": call_id},
+ )
+
+ # -------------------------------------------------------------------------
+ # Inbound
+ # -------------------------------------------------------------------------
+
+ @classmethod
+ def can_handle_webhook(
+ cls, webhook_data: Dict[str, Any], headers: Dict[str, str]
+ ) -> bool:
+ # Exotel inbound webhooks carry CallSid and AccountSid but no
+ # provider-specific header. We match on AccountSid presence.
+ return "CallSid" in webhook_data and "AccountSid" in webhook_data
+
+ @staticmethod
+ def parse_inbound_webhook(webhook_data: Dict[str, Any]) -> NormalizedInboundData:
+ from_raw = webhook_data.get("From", "")
+ to_raw = webhook_data.get("To", "") or webhook_data.get("PhoneNumberSid", "")
+ return NormalizedInboundData(
+ provider=ExotelProvider.PROVIDER_NAME,
+ call_id=webhook_data.get("CallSid", ""),
+ from_number=normalize_telephony_address(from_raw).canonical
+ if from_raw
+ else "",
+ to_number=normalize_telephony_address(to_raw).canonical
+ if to_raw
+ else "",
+ direction=webhook_data.get("Direction", "inbound"),
+ call_status=webhook_data.get("Status", ""),
+ account_id=webhook_data.get("AccountSid"),
+ raw_data=webhook_data,
+ )
+
+ @staticmethod
+ def validate_account_id(config_data: dict, webhook_account_id: str) -> bool:
+ if webhook_account_id:
+ return config_data.get("account_sid") == webhook_account_id
+ logger.warning(
+ "[Exotel] Inbound webhook missing AccountSid — "
+ "falling back to config existence check"
+ )
+ return bool(config_data.get("account_sid"))
+
+ def normalize_phone_number(self, phone_number: str) -> str:
+ return phone_number
+
+ async def verify_inbound_signature(
+ self,
+ url: str,
+ webhook_data: Dict[str, Any],
+ headers: Dict[str, str],
+ body: str = "",
+ ) -> bool:
+ # Exotel v1 does not sign inbound webhooks.
+ return True
+
+ async def start_inbound_stream(
+ self,
+ *,
+ websocket_url: str,
+ workflow_run_id: int,
+ normalized_data: NormalizedInboundData,
+ backend_endpoint: str,
+ ):
+ from fastapi import Response
+
+ hangup_callback_attr = ""
+ if workflow_run_id:
+ hangup_url = (
+ f"{backend_endpoint}/api/v1/telephony/exotel"
+ f"/status-callback/{workflow_run_id}"
+ )
+ hangup_callback_attr = (
+ f' statusCallbackUrl="{hangup_url}" statusCallbackMethod="POST"'
+ )
+
+ exo_xml = (
+ f'\n'
+ f"\n"
+ f' "
+ f"{websocket_url}"
+ f"\n"
+ f""
+ )
+ return Response(content=exo_xml, media_type="application/xml")
+
+ @staticmethod
+ def generate_error_response(error_type: str, message: str) -> tuple:
+ from fastapi import Response
+
+ exo_xml = (
+ f'\n'
+ f"\n"
+ f" Sorry, there was an error processing your call. {message}\n"
+ f" \n"
+ f""
+ )
+ return Response(content=exo_xml, media_type="application/xml")
+
+ # -------------------------------------------------------------------------
+ # Transfers (not supported)
+ # -------------------------------------------------------------------------
+
+ async def transfer_call(
+ self,
+ destination: str,
+ transfer_id: str,
+ conference_name: str,
+ timeout: int = 30,
+ **kwargs: Any,
+ ) -> Dict[str, Any]:
+ raise NotImplementedError("Exotel provider does not support call transfers")
+
+ def supports_transfers(self) -> bool:
+ return False
+
+ # -------------------------------------------------------------------------
+ # Optional: configure inbound (no-op — Exotel doesn't support programmatic
+ # answer-URL binding via the v1 REST API)
+ # -------------------------------------------------------------------------
+
+ async def configure_inbound(
+ self, address: str, webhook_url: Optional[str]
+ ) -> ProviderSyncResult:
+ logger.info(
+ f"[Exotel] configure_inbound called for {address} → {webhook_url}. "
+ "Exotel v1 does not support programmatic webhook binding; "
+ "configure the answer URL in the Exotel App Bazaar."
+ )
+ return ProviderSyncResult(ok=True)
diff --git a/api/services/telephony/providers/exotel/routes.py b/api/services/telephony/providers/exotel/routes.py
new file mode 100644
index 00000000..2a9935be
--- /dev/null
+++ b/api/services/telephony/providers/exotel/routes.py
@@ -0,0 +1,120 @@
+"""Exotel telephony routes.
+
+Mounted under /api/v1/telephony by api.routes.telephony via importlib.
+"""
+
+import json
+from typing import Optional
+
+from fastapi import APIRouter, Request
+from loguru import logger
+from pipecat.utils.run_context import set_current_run_id
+from starlette.responses import HTMLResponse
+
+from api.db import db_client
+from api.services.telephony.factory import get_telephony_provider_for_run
+from api.services.telephony.status_processor import (
+ StatusCallbackRequest,
+ _process_status_update,
+)
+from api.utils.common import get_backend_endpoints
+
+router = APIRouter()
+
+
+# ---------------------------------------------------------------------------
+# Answer webhook — called by Exotel when the outbound call is answered.
+# Returns ExoML that opens the bidirectional Stream.
+# ---------------------------------------------------------------------------
+
+
+@router.post("/exotel-xml", include_in_schema=False)
+async def handle_exotel_xml_webhook(
+ workflow_id: int,
+ user_id: int,
+ workflow_run_id: int,
+ organization_id: int,
+ request: Request,
+):
+ """
+ Handle initial webhook from Exotel when an outbound call is answered.
+ Returns ExoML .
+ """
+ set_current_run_id(workflow_run_id)
+ workflow_run = await db_client.get_workflow_run_by_id(workflow_run_id)
+ provider = await get_telephony_provider_for_run(workflow_run, organization_id)
+
+ form_data = await request.form()
+ callback_data = dict(form_data)
+ logger.info(
+ f"[run {workflow_run_id}] Exotel answer webhook: "
+ f"{json.dumps(callback_data)}"
+ )
+
+ # Exotel sends CallSid in the answer webhook — persist it on the run so
+ # handle_websocket can resolve it later.
+ call_id = callback_data.get("CallSid")
+ if call_id:
+ gathered_context = dict(workflow_run.gathered_context or {})
+ gathered_context["call_id"] = call_id
+ await db_client.update_workflow_run(
+ run_id=workflow_run_id, gathered_context=gathered_context
+ )
+
+ response_content = await provider.get_webhook_response(
+ workflow_id, user_id, workflow_run_id
+ )
+ return HTMLResponse(content=response_content, media_type="application/xml")
+
+
+# ---------------------------------------------------------------------------
+# Status callback — called by Exotel on call completion / state changes.
+# ---------------------------------------------------------------------------
+
+
+@router.post("/exotel/status-callback/{workflow_run_id}")
+async def handle_exotel_status_callback(
+ workflow_run_id: int,
+ request: Request,
+):
+ """Handle Exotel StatusCallback POST."""
+ set_current_run_id(workflow_run_id)
+
+ form_data = await request.form()
+ callback_data = dict(form_data)
+ logger.info(
+ f"[run {workflow_run_id}] Exotel status callback: "
+ f"{json.dumps(callback_data)}"
+ )
+
+ workflow_run = await db_client.get_workflow_run_by_id(workflow_run_id)
+ if not workflow_run:
+ logger.warning(
+ f"[run {workflow_run_id}] Exotel status callback: workflow run not found"
+ )
+ return {"status": "ignored", "reason": "workflow_run_not_found"}
+
+ workflow = await db_client.get_workflow_by_id(workflow_run.workflow_id)
+ if not workflow:
+ logger.warning(
+ f"[run {workflow_run_id}] Exotel status callback: workflow not found"
+ )
+ return {"status": "ignored", "reason": "workflow_not_found"}
+
+ provider = await get_telephony_provider_for_run(
+ workflow_run, workflow.organization_id
+ )
+
+ parsed_data = provider.parse_status_callback(callback_data)
+ status_update = StatusCallbackRequest(
+ call_id=parsed_data["call_id"],
+ status=parsed_data["status"],
+ from_number=parsed_data.get("from_number"),
+ to_number=parsed_data.get("to_number"),
+ direction=parsed_data.get("direction"),
+ duration=parsed_data.get("duration"),
+ extra=parsed_data.get("extra", {}),
+ )
+
+ await _process_status_update(workflow_run_id, status_update)
+ return {"status": "success"}
diff --git a/api/services/telephony/providers/exotel/serializers.py b/api/services/telephony/providers/exotel/serializers.py
new file mode 100644
index 00000000..4e0f0f25
--- /dev/null
+++ b/api/services/telephony/providers/exotel/serializers.py
@@ -0,0 +1,11 @@
+"""Exotel frame serializer.
+
+Exotel streams audio over WebSocket using the same JSON envelope and μ-law
+8 kHz encoding as Plivo. We re-export PlivoFrameSerializer directly so
+transport.py can import from `.serializers` and we have an obvious place to
+drop a custom subclass later if Exotel diverges.
+"""
+
+from pipecat.serializers.plivo import PlivoFrameSerializer as ExotelFrameSerializer
+
+__all__ = ["ExotelFrameSerializer"]
diff --git a/api/services/telephony/providers/exotel/transport.py b/api/services/telephony/providers/exotel/transport.py
new file mode 100644
index 00000000..2a7120ca
--- /dev/null
+++ b/api/services/telephony/providers/exotel/transport.py
@@ -0,0 +1,69 @@
+"""Exotel transport factory."""
+
+from fastapi import WebSocket
+from pipecat.transports.websocket.fastapi import (
+ FastAPIWebsocketParams,
+ FastAPIWebsocketTransport,
+)
+
+from api.services.pipecat.audio_config import AudioConfig
+from api.services.pipecat.audio_mixer import build_audio_out_mixer
+from api.services.telephony.factory import load_credentials_for_transport
+
+from .serializers import ExotelFrameSerializer
+
+
+async def create_transport(
+ websocket: WebSocket,
+ workflow_run_id: int,
+ audio_config: AudioConfig,
+ organization_id: int,
+ *,
+ ambient_noise_config: dict | None = None,
+ telephony_configuration_id: int | None = None,
+ stream_id: str,
+ call_id: str,
+):
+ """Create a WebSocket transport for an Exotel call leg."""
+ config = await load_credentials_for_transport(
+ organization_id, telephony_configuration_id, expected_provider="exotel"
+ )
+
+ api_key = config.get("api_key")
+ api_token = config.get("api_token")
+
+ if not api_key or not api_token:
+ raise ValueError(
+ f"Incomplete Exotel configuration for organization {organization_id}"
+ )
+
+ # ExotelFrameSerializer is PlivoFrameSerializer under the hood —
+ # same μ-law 8 kHz JSON envelope. The auth_id/auth_token params are used
+ # by Plivo's serializer for optional mid-call REST calls; Exotel doesn't
+ # need them but we pass api_key/api_token for future extensibility.
+ serializer = ExotelFrameSerializer(
+ stream_id=stream_id,
+ call_id=call_id,
+ auth_id=api_key,
+ auth_token=api_token,
+ params=ExotelFrameSerializer.InputParams(
+ plivo_sample_rate=8000,
+ sample_rate=audio_config.pipeline_sample_rate,
+ ),
+ )
+
+ mixer = await build_audio_out_mixer(
+ audio_config.transport_out_sample_rate, ambient_noise_config
+ )
+
+ return FastAPIWebsocketTransport(
+ websocket=websocket,
+ params=FastAPIWebsocketParams(
+ audio_in_enabled=True,
+ audio_out_enabled=True,
+ audio_in_sample_rate=audio_config.transport_in_sample_rate,
+ audio_out_sample_rate=audio_config.transport_out_sample_rate,
+ audio_out_mixer=mixer,
+ serializer=serializer,
+ ),
+ )