exotel support fix

This commit is contained in:
Hridayesh Gupta 2026-05-12 22:51:34 +05:30
parent 84c4c57799
commit 64d6fff1f5
4 changed files with 169 additions and 51 deletions

View file

@ -76,13 +76,14 @@ _UI_METADATA = ProviderUIMetadata(
),
ProviderUIField(
name="app_id",
label="App ID (optional)",
label="App Bazaar App ID",
type="text",
sensitive=False,
required=False,
required=True,
description=(
"Exotel App Bazaar flow ID for inbound call routing. "
"Leave blank if you are configuring the answer URL manually."
"App ID from Exotel App Bazaar (My Apps → App ID column). "
"In that app's flow, set the WebSocket URL to: "
"wss://{your-backend-domain}/api/v1/telephony/exotel/stream"
),
),
],

View file

@ -1,6 +1,6 @@
"""Exotel telephony configuration schemas."""
from typing import List, Literal, Optional
from typing import List, Literal
from pydantic import BaseModel, Field
@ -23,12 +23,12 @@ class ExotelConfigurationRequest(BaseModel):
default_factory=list,
description="List of Exotel ExoPhone numbers (CallerIds) used for outbound calls",
)
app_id: Optional[str] = Field(
default=None,
app_id: str = Field(
...,
description=(
"Exotel App ID (from App Bazaar → My Apps). "
"When set, used as the Url for inbound call flows. "
"Leave blank if managing inbound via the Dograh answer URL."
"Exotel App ID (from App Bazaar → My Apps → App ID column). "
"Used to build the Url for outbound calls: "
"http://my.exotel.com/{account_sid}/exoml/start_voice/{app_id}"
),
)

View file

@ -90,15 +90,24 @@ class ExotelProvider(TelephonyProvider):
``CallerId`` the ExoPhone (Exotel virtual number) shown as caller ID.
``Url`` Exotel app/flow URL; we point it at our answer webhook.
"""
if not self.validate_config():
raise ValueError("Exotel provider not properly configured")
if not self.app_id:
raise ValueError(
"Exotel app_id is required. Set it to the App Bazaar App ID "
"(App Bazaar → My Apps → App ID column)."
)
caller_id = from_number or random.choice(self.from_numbers)
# Build the App Bazaar flow URL — this is what Exotel calls when the
# call is answered. The flow has the WebSocket URL pre-configured.
app_bazaar_url = (
f"http://my.exotel.com/{self.account_sid}/exoml/start_voice/{self.app_id}"
)
data: Dict[str, Any] = {
"From": to_number,
"CallerId": caller_id,
"Url": webhook_url,
"Url": app_bazaar_url,
"CallType": "trans", # transactional — no recording by default
}
@ -207,6 +216,7 @@ class ExotelProvider(TelephonyProvider):
and self.api_token
and self.account_sid
and self.from_numbers
and self.app_id
)
# -------------------------------------------------------------------------

View file

@ -1,70 +1,172 @@
"""Exotel telephony routes.
Mounted under /api/v1/telephony by api.routes.telephony via importlib.
Architecture (App Bazaar flow):
1. User configures an Exotel App Bazaar flow with WebSocket URL:
wss://{BACKEND}/api/v1/telephony/exotel/stream
2. When initiating an outbound call, we pass the App Bazaar URL as `Url`.
3. Exotel calls the number, answers, then connects to our fixed WebSocket.
4. We identify the call by CallSid (stored on workflow_run at initiation).
"""
import json
from typing import Optional
from fastapi import APIRouter, Request
from fastapi import APIRouter, WebSocket, WebSocketDisconnect
from loguru import logger
from pipecat.utils.run_context import set_current_run_id
from starlette.responses import HTMLResponse
from sqlalchemy import select, text
from api.db import db_client
from api.services.telephony.factory import get_telephony_provider_for_run
from api.db.models import WorkflowRun
from api.services.telephony.status_processor import (
StatusCallbackRequest,
_process_status_update,
)
from api.utils.common import get_backend_endpoints
router = APIRouter()
# ---------------------------------------------------------------------------
# Answer webhook — called by Exotel when the outbound call is answered.
# Returns ExoML that opens the bidirectional Stream.
# Fixed WebSocket stream endpoint — URL configured in Exotel App Bazaar.
# Exotel connects here when a call is answered (outbound or inbound).
# ---------------------------------------------------------------------------
@router.post("/exotel-xml", include_in_schema=False)
async def handle_exotel_xml_webhook(
workflow_id: int,
user_id: int,
workflow_run_id: int,
organization_id: int,
request: Request,
):
@router.websocket("/exotel/stream")
async def exotel_stream(websocket: WebSocket):
"""
Handle initial webhook from Exotel when an outbound call is answered.
Returns ExoML <Response><Stream /></Response>.
Fixed WebSocket endpoint pre-configured in the Exotel App Bazaar flow.
Exotel sends a 'start' event with CallSid. We look up the workflow_run
that has that CallSid (stored at call-initiation time) and run the
pipecat pipeline for it.
"""
set_current_run_id(workflow_run_id)
workflow_run = await db_client.get_workflow_run_by_id(workflow_run_id)
provider = await get_telephony_provider_for_run(workflow_run, organization_id)
await websocket.accept()
form_data = await request.form()
callback_data = dict(form_data)
logger.info(
f"[run {workflow_run_id}] Exotel answer webhook: "
f"{json.dumps(callback_data)}"
)
# ------------------------------------------------------------------
# 1. Read the start event
# ------------------------------------------------------------------
try:
raw = await websocket.receive_text()
except WebSocketDisconnect:
logger.warning("[Exotel stream] WebSocket disconnected before start event")
return
# Exotel sends CallSid in the answer webhook — persist it on the run so
# handle_websocket can resolve it later.
call_id = callback_data.get("CallSid")
if call_id:
gathered_context = dict(workflow_run.gathered_context or {})
gathered_context["call_id"] = call_id
await db_client.update_workflow_run(
run_id=workflow_run_id, gathered_context=gathered_context
logger.info(f"[Exotel stream] Start message raw: {raw}")
try:
start_msg = json.loads(raw)
except json.JSONDecodeError:
logger.error(f"[Exotel stream] Non-JSON start message: {raw}")
await websocket.close(code=4400, reason="Expected JSON start event")
return
event_type = (start_msg.get("event") or start_msg.get("Event") or "").lower()
if event_type != "start":
logger.error(
f"[Exotel stream] Expected 'start' event, got: {event_type!r}. "
f"Full message: {start_msg}"
)
await websocket.close(code=4400, reason="Expected start event")
return
response_content = await provider.get_webhook_response(
workflow_id, user_id, workflow_run_id
# Exotel may nest stream metadata under 'start' or at top level.
start_data = start_msg.get("start") or start_msg
call_sid = (
start_data.get("callSid")
or start_data.get("CallSid")
or start_data.get("call_sid")
or start_msg.get("callSid")
or start_msg.get("CallSid")
)
return HTMLResponse(content=response_content, media_type="application/xml")
stream_id = (
start_data.get("streamId")
or start_data.get("StreamId")
or start_msg.get("streamId")
or ""
)
logger.info(
f"[Exotel stream] callSid={call_sid!r} streamId={stream_id!r}"
)
if not call_sid:
logger.error(
f"[Exotel stream] Missing callSid in start event. Full msg: {start_msg}"
)
await websocket.close(code=4400, reason="Missing callSid")
return
# ------------------------------------------------------------------
# 2. Look up the workflow_run by callSid stored in gathered_context
# ------------------------------------------------------------------
workflow_run = await _find_workflow_run_by_call_sid(call_sid)
if not workflow_run:
logger.error(
f"[Exotel stream] No workflow_run found for callSid={call_sid}. "
"Ensure the call was initiated via Dograh before Exotel connects."
)
await websocket.close(code=4404, reason="Workflow run not found for call")
return
workflow_run_id = workflow_run.id
workflow_id = workflow_run.workflow_id
set_current_run_id(workflow_run_id)
# Resolve user_id from the workflow
workflow = await db_client.get_workflow_by_id(workflow_id)
if not workflow:
logger.error(f"[Exotel stream] Workflow {workflow_id} not found")
await websocket.close(code=4404, reason="Workflow not found")
return
user_id = workflow.user_id
organization_id = workflow.organization_id
logger.info(
f"[Exotel stream] Matched callSid={call_sid}"
f"workflow_run_id={workflow_run_id} workflow_id={workflow_id}"
)
# ------------------------------------------------------------------
# 3. Run the pipeline — same as other telephony providers
# ------------------------------------------------------------------
from api.services.pipecat.run_pipeline import run_pipeline_telephony
await run_pipeline_telephony(
websocket,
provider_name="exotel",
workflow_id=workflow_id,
workflow_run_id=workflow_run_id,
user_id=user_id,
call_id=call_sid,
transport_kwargs={"stream_id": stream_id, "call_id": call_sid},
)
async def _find_workflow_run_by_call_sid(call_sid: str):
"""
Find the most recent workflow_run whose gathered_context contains
{'call_id': call_sid}. Uses a JSONB containment query (PostgreSQL).
"""
try:
async with db_client.get_session() as session:
result = await session.execute(
select(WorkflowRun)
.where(
WorkflowRun.gathered_context.op("@>")(
text(f"'{{\"call_id\": \"{call_sid}\"}}'::jsonb")
)
)
.order_by(WorkflowRun.created_at.desc())
.limit(1)
)
return result.scalar_one_or_none()
except Exception as exc:
logger.error(f"[Exotel stream] DB lookup failed for callSid={call_sid}: {exc}")
return None
# ---------------------------------------------------------------------------
@ -75,9 +177,12 @@ async def handle_exotel_xml_webhook(
@router.post("/exotel/status-callback/{workflow_run_id}")
async def handle_exotel_status_callback(
workflow_run_id: int,
request: Request,
request,
):
"""Handle Exotel StatusCallback POST."""
from fastapi import Request
request: Request
set_current_run_id(workflow_run_id)
form_data = await request.form()
@ -101,6 +206,8 @@ async def handle_exotel_status_callback(
)
return {"status": "ignored", "reason": "workflow_not_found"}
from api.services.telephony.factory import get_telephony_provider_for_run
provider = await get_telephony_provider_for_run(
workflow_run, workflow.organization_id
)