diff --git a/api/services/telephony/providers/exotel/__init__.py b/api/services/telephony/providers/exotel/__init__.py index 945d2f75..7dceaeeb 100644 --- a/api/services/telephony/providers/exotel/__init__.py +++ b/api/services/telephony/providers/exotel/__init__.py @@ -76,13 +76,14 @@ _UI_METADATA = ProviderUIMetadata( ), ProviderUIField( name="app_id", - label="App ID (optional)", + label="App Bazaar App ID", type="text", sensitive=False, - required=False, + required=True, description=( - "Exotel App Bazaar flow ID for inbound call routing. " - "Leave blank if you are configuring the answer URL manually." + "App ID from Exotel App Bazaar (My Apps → App ID column). " + "In that app's flow, set the WebSocket URL to: " + "wss://{your-backend-domain}/api/v1/telephony/exotel/stream" ), ), ], diff --git a/api/services/telephony/providers/exotel/config.py b/api/services/telephony/providers/exotel/config.py index 43f90613..ceffcf95 100644 --- a/api/services/telephony/providers/exotel/config.py +++ b/api/services/telephony/providers/exotel/config.py @@ -1,6 +1,6 @@ """Exotel telephony configuration schemas.""" -from typing import List, Literal, Optional +from typing import List, Literal from pydantic import BaseModel, Field @@ -23,12 +23,12 @@ class ExotelConfigurationRequest(BaseModel): default_factory=list, description="List of Exotel ExoPhone numbers (CallerIds) used for outbound calls", ) - app_id: Optional[str] = Field( - default=None, + app_id: str = Field( + ..., description=( - "Exotel App ID (from App Bazaar → My Apps). " - "When set, used as the Url for inbound call flows. " - "Leave blank if managing inbound via the Dograh answer URL." + "Exotel App ID (from App Bazaar → My Apps → App ID column). " + "Used to build the Url for outbound calls: " + "http://my.exotel.com/{account_sid}/exoml/start_voice/{app_id}" ), ) diff --git a/api/services/telephony/providers/exotel/provider.py b/api/services/telephony/providers/exotel/provider.py index 7eb754a2..e8c99e64 100644 --- a/api/services/telephony/providers/exotel/provider.py +++ b/api/services/telephony/providers/exotel/provider.py @@ -90,15 +90,24 @@ class ExotelProvider(TelephonyProvider): ``CallerId`` — the ExoPhone (Exotel virtual number) shown as caller ID. ``Url`` — Exotel app/flow URL; we point it at our answer webhook. """ - if not self.validate_config(): - raise ValueError("Exotel provider not properly configured") + if not self.app_id: + raise ValueError( + "Exotel app_id is required. Set it to the App Bazaar App ID " + "(App Bazaar → My Apps → App ID column)." + ) caller_id = from_number or random.choice(self.from_numbers) + # Build the App Bazaar flow URL — this is what Exotel calls when the + # call is answered. The flow has the WebSocket URL pre-configured. + app_bazaar_url = ( + f"http://my.exotel.com/{self.account_sid}/exoml/start_voice/{self.app_id}" + ) + data: Dict[str, Any] = { "From": to_number, "CallerId": caller_id, - "Url": webhook_url, + "Url": app_bazaar_url, "CallType": "trans", # transactional — no recording by default } @@ -207,6 +216,7 @@ class ExotelProvider(TelephonyProvider): and self.api_token and self.account_sid and self.from_numbers + and self.app_id ) # ------------------------------------------------------------------------- diff --git a/api/services/telephony/providers/exotel/routes.py b/api/services/telephony/providers/exotel/routes.py index 2a9935be..9c0b472a 100644 --- a/api/services/telephony/providers/exotel/routes.py +++ b/api/services/telephony/providers/exotel/routes.py @@ -1,70 +1,172 @@ """Exotel telephony routes. Mounted under /api/v1/telephony by api.routes.telephony via importlib. + +Architecture (App Bazaar flow): + 1. User configures an Exotel App Bazaar flow with WebSocket URL: + wss://{BACKEND}/api/v1/telephony/exotel/stream + 2. When initiating an outbound call, we pass the App Bazaar URL as `Url`. + 3. Exotel calls the number, answers, then connects to our fixed WebSocket. + 4. We identify the call by CallSid (stored on workflow_run at initiation). """ import json -from typing import Optional -from fastapi import APIRouter, Request +from fastapi import APIRouter, WebSocket, WebSocketDisconnect from loguru import logger from pipecat.utils.run_context import set_current_run_id -from starlette.responses import HTMLResponse +from sqlalchemy import select, text from api.db import db_client -from api.services.telephony.factory import get_telephony_provider_for_run +from api.db.models import WorkflowRun from api.services.telephony.status_processor import ( StatusCallbackRequest, _process_status_update, ) -from api.utils.common import get_backend_endpoints router = APIRouter() # --------------------------------------------------------------------------- -# Answer webhook — called by Exotel when the outbound call is answered. -# Returns ExoML that opens the bidirectional Stream. +# Fixed WebSocket stream endpoint — URL configured in Exotel App Bazaar. +# Exotel connects here when a call is answered (outbound or inbound). # --------------------------------------------------------------------------- -@router.post("/exotel-xml", include_in_schema=False) -async def handle_exotel_xml_webhook( - workflow_id: int, - user_id: int, - workflow_run_id: int, - organization_id: int, - request: Request, -): +@router.websocket("/exotel/stream") +async def exotel_stream(websocket: WebSocket): """ - Handle initial webhook from Exotel when an outbound call is answered. - Returns ExoML . + Fixed WebSocket endpoint pre-configured in the Exotel App Bazaar flow. + + Exotel sends a 'start' event with CallSid. We look up the workflow_run + that has that CallSid (stored at call-initiation time) and run the + pipecat pipeline for it. """ - set_current_run_id(workflow_run_id) - workflow_run = await db_client.get_workflow_run_by_id(workflow_run_id) - provider = await get_telephony_provider_for_run(workflow_run, organization_id) + await websocket.accept() - form_data = await request.form() - callback_data = dict(form_data) - logger.info( - f"[run {workflow_run_id}] Exotel answer webhook: " - f"{json.dumps(callback_data)}" - ) + # ------------------------------------------------------------------ + # 1. Read the start event + # ------------------------------------------------------------------ + try: + raw = await websocket.receive_text() + except WebSocketDisconnect: + logger.warning("[Exotel stream] WebSocket disconnected before start event") + return - # Exotel sends CallSid in the answer webhook — persist it on the run so - # handle_websocket can resolve it later. - call_id = callback_data.get("CallSid") - if call_id: - gathered_context = dict(workflow_run.gathered_context or {}) - gathered_context["call_id"] = call_id - await db_client.update_workflow_run( - run_id=workflow_run_id, gathered_context=gathered_context + logger.info(f"[Exotel stream] Start message raw: {raw}") + + try: + start_msg = json.loads(raw) + except json.JSONDecodeError: + logger.error(f"[Exotel stream] Non-JSON start message: {raw}") + await websocket.close(code=4400, reason="Expected JSON start event") + return + + event_type = (start_msg.get("event") or start_msg.get("Event") or "").lower() + if event_type != "start": + logger.error( + f"[Exotel stream] Expected 'start' event, got: {event_type!r}. " + f"Full message: {start_msg}" ) + await websocket.close(code=4400, reason="Expected start event") + return - response_content = await provider.get_webhook_response( - workflow_id, user_id, workflow_run_id + # Exotel may nest stream metadata under 'start' or at top level. + start_data = start_msg.get("start") or start_msg + call_sid = ( + start_data.get("callSid") + or start_data.get("CallSid") + or start_data.get("call_sid") + or start_msg.get("callSid") + or start_msg.get("CallSid") ) - return HTMLResponse(content=response_content, media_type="application/xml") + stream_id = ( + start_data.get("streamId") + or start_data.get("StreamId") + or start_msg.get("streamId") + or "" + ) + + logger.info( + f"[Exotel stream] callSid={call_sid!r} streamId={stream_id!r}" + ) + + if not call_sid: + logger.error( + f"[Exotel stream] Missing callSid in start event. Full msg: {start_msg}" + ) + await websocket.close(code=4400, reason="Missing callSid") + return + + # ------------------------------------------------------------------ + # 2. Look up the workflow_run by callSid stored in gathered_context + # ------------------------------------------------------------------ + workflow_run = await _find_workflow_run_by_call_sid(call_sid) + + if not workflow_run: + logger.error( + f"[Exotel stream] No workflow_run found for callSid={call_sid}. " + "Ensure the call was initiated via Dograh before Exotel connects." + ) + await websocket.close(code=4404, reason="Workflow run not found for call") + return + + workflow_run_id = workflow_run.id + workflow_id = workflow_run.workflow_id + set_current_run_id(workflow_run_id) + + # Resolve user_id from the workflow + workflow = await db_client.get_workflow_by_id(workflow_id) + if not workflow: + logger.error(f"[Exotel stream] Workflow {workflow_id} not found") + await websocket.close(code=4404, reason="Workflow not found") + return + + user_id = workflow.user_id + organization_id = workflow.organization_id + + logger.info( + f"[Exotel stream] Matched callSid={call_sid} → " + f"workflow_run_id={workflow_run_id} workflow_id={workflow_id}" + ) + + # ------------------------------------------------------------------ + # 3. Run the pipeline — same as other telephony providers + # ------------------------------------------------------------------ + from api.services.pipecat.run_pipeline import run_pipeline_telephony + + await run_pipeline_telephony( + websocket, + provider_name="exotel", + workflow_id=workflow_id, + workflow_run_id=workflow_run_id, + user_id=user_id, + call_id=call_sid, + transport_kwargs={"stream_id": stream_id, "call_id": call_sid}, + ) + + +async def _find_workflow_run_by_call_sid(call_sid: str): + """ + Find the most recent workflow_run whose gathered_context contains + {'call_id': call_sid}. Uses a JSONB containment query (PostgreSQL). + """ + try: + async with db_client.get_session() as session: + result = await session.execute( + select(WorkflowRun) + .where( + WorkflowRun.gathered_context.op("@>")( + text(f"'{{\"call_id\": \"{call_sid}\"}}'::jsonb") + ) + ) + .order_by(WorkflowRun.created_at.desc()) + .limit(1) + ) + return result.scalar_one_or_none() + except Exception as exc: + logger.error(f"[Exotel stream] DB lookup failed for callSid={call_sid}: {exc}") + return None # --------------------------------------------------------------------------- @@ -75,9 +177,12 @@ async def handle_exotel_xml_webhook( @router.post("/exotel/status-callback/{workflow_run_id}") async def handle_exotel_status_callback( workflow_run_id: int, - request: Request, + request, ): """Handle Exotel StatusCallback POST.""" + from fastapi import Request + + request: Request set_current_run_id(workflow_run_id) form_data = await request.form() @@ -101,6 +206,8 @@ async def handle_exotel_status_callback( ) return {"status": "ignored", "reason": "workflow_not_found"} + from api.services.telephony.factory import get_telephony_provider_for_run + provider = await get_telephony_provider_for_run( workflow_run, workflow.organization_id )