dograh/api/routes/twilio.py

259 lines
9.2 KiB
Python
Raw Permalink Normal View History

2025-09-09 14:37:32 +05:30
import json
import random
from datetime import UTC, datetime
from typing import Annotated, Optional
from fastapi import APIRouter, Depends, Form, Header, HTTPException, Request, WebSocket
from loguru import logger
from pipecat.utils.context import set_current_run_id
from pydantic import BaseModel
from starlette.responses import HTMLResponse
from api.db import db_client
from api.db.models import UserModel
from api.enums import OrganizationConfigurationKey, WorkflowRunMode
from api.services.auth.depends import get_user
from api.services.campaign.call_dispatcher import campaign_call_dispatcher
from api.services.campaign.campaign_event_publisher import (
get_campaign_event_publisher,
)
from api.services.pipecat.run_pipeline import run_pipeline_twilio
from api.services.telephony.twilio import TwilioService
router = APIRouter(prefix="/twilio")
class InitiateCallRequest(BaseModel):
workflow_id: int
workflow_run_id: int | None = None
class TwilioStatusCallbackRequest(BaseModel):
CallSid: str
CallStatus: str
From: Optional[str] = None
To: Optional[str] = None
Direction: Optional[str] = None
Duration: Optional[str] = None
CallDuration: Optional[str] = None
RecordingUrl: Optional[str] = None
RecordingSid: Optional[str] = None
Timestamp: Optional[str] = None
@router.post("/initiate-call")
async def initiate_call(
request: InitiateCallRequest, user: UserModel = Depends(get_user)
):
# Check if organization has TWILIO_PHONE_NUMBERS configured
twilio_config = await db_client.get_configuration(
user.selected_organization_id,
OrganizationConfigurationKey.TWILIO_PHONE_NUMBERS.value,
)
if (
not twilio_config
or not twilio_config.value
or not twilio_config.value.get("value")
):
raise HTTPException(
status_code=401,
detail="Your organisation is not allowed to make phone call. Contact founders@dograh.com for further support.",
)
user_configuration = await db_client.get_user_configurations(user.id)
workflow_run_id = request.workflow_run_id
if not workflow_run_id:
workflow_run_name = f"WR-TEL-{random.randint(1000, 9999)}"
workflow_run = await db_client.create_workflow_run(
workflow_run_name,
request.workflow_id,
WorkflowRunMode.TWILIO.value,
initial_context={
"phone_number": user_configuration.test_phone_number,
},
user_id=user.id,
)
workflow_run_id = workflow_run.id
else:
workflow_run = await db_client.get_workflow_run(workflow_run_id, user.id)
if not workflow_run:
raise HTTPException(status_code=400, detail="Workflow run not found")
workflow_run_name = workflow_run.name
if user_configuration.test_phone_number:
await TwilioService().initiate_call(
to_number=user_configuration.test_phone_number,
url_args={
"workflow_id": request.workflow_id,
"user_id": user.id,
"workflow_run_id": workflow_run_id,
},
workflow_run_id=workflow_run_id,
organization_id=user.selected_organization_id,
)
return {
"message": f"Call initiated successfully with run name {workflow_run_name}"
}
else:
raise HTTPException(status_code=400, detail="Test phone number not set")
@router.post("/twiml", include_in_schema=False)
async def start_call(workflow_id: int, user_id: int, workflow_run_id: int):
twiml_content = await TwilioService().get_start_call_twiml(
workflow_id, user_id, workflow_run_id
)
return HTMLResponse(content=twiml_content, media_type="application/xml")
@router.websocket("/ws/{workflow_id}/{user_id}/{workflow_run_id}")
async def websocket_endpoint(
websocket: WebSocket, workflow_id: int, user_id: int, workflow_run_id: int
):
await websocket.accept()
try:
# "connected" (ignore)
msg = json.loads(await websocket.receive_text())
if msg.get("event") != "connected":
raise RuntimeError("Expected connected message first")
# "start" this has everything we need
start_msg = await websocket.receive_text()
# set the run context
set_current_run_id(workflow_run_id)
logger.debug(f"Received start message: {start_msg}")
start_msg = json.loads(start_msg)
if start_msg.get("event") != "start":
raise RuntimeError("Expected start message second")
try:
stream_sid = start_msg["start"]["streamSid"]
call_sid = start_msg["start"]["callSid"]
except KeyError:
logger.error(
"Missing callSID and streamSID in start message. Closing connection."
)
await websocket.close(code=4400, reason="Missing or bad start message")
return
# Run your Pipecat bot
await run_pipeline_twilio(
websocket, stream_sid, call_sid, workflow_id, workflow_run_id, user_id
)
except Exception as e:
logger.error(f"Error in Twilio WebSocket connection: {e}")
await websocket.close(1011, "Internal server error")
@router.post("/status-callback/{workflow_run_id}", include_in_schema=False)
async def status_callback(
request: Request,
workflow_run_id: int,
x_twilio_signature: Annotated[
Optional[str], Header(alias="X-Twilio-Signature")
] = None,
CallSid: str = Form(...),
CallStatus: str = Form(...),
From: Optional[str] = Form(None),
To: Optional[str] = Form(None),
Direction: Optional[str] = Form(None),
Duration: Optional[str] = Form(None),
CallDuration: Optional[str] = Form(None),
RecordingUrl: Optional[str] = Form(None),
RecordingSid: Optional[str] = Form(None),
Timestamp: Optional[str] = Form(None),
):
"""Handle Twilio status callbacks for call lifecycle events."""
try:
# TODO: Implement Twilio signature verification
# Create callback data object
callback_data = {
"CallSid": CallSid,
"CallStatus": CallStatus,
"From": From,
"To": To,
"Direction": Direction,
"Duration": Duration,
"CallDuration": CallDuration,
"RecordingUrl": RecordingUrl,
"RecordingSid": RecordingSid,
"Timestamp": Timestamp,
}
# Remove None values for cleaner logging
callback_data = {k: v for k, v in callback_data.items() if v is not None}
logger.info(
f"Received Twilio status callback for workflow_run_id {workflow_run_id}: {CallStatus}"
)
# Get the current workflow run
workflow_run = await db_client.get_workflow_run_by_id(workflow_run_id)
if not workflow_run:
logger.error(f"Workflow run {workflow_run_id} not found for callback")
return {"status": "error", "message": "Workflow run not found"}
callback_logs = workflow_run.logs.get("twilio_status_callbacks", [])
# Add new callback log entry to logs
callback_log = {
"status": CallStatus,
"timestamp": datetime.now(UTC).isoformat(),
"data": callback_data,
}
callback_logs.append(callback_log)
# Update the workflow run with the new logs
await db_client.update_workflow_run(
run_id=workflow_run_id, logs={"twilio_status_callbacks": callback_logs}
)
# Release concurrent slot when call ends (for any terminal status)
terminal_statuses = ["completed", "busy", "no-answer", "failed", "canceled"]
if CallStatus.lower() in terminal_statuses and workflow_run.campaign_id:
# Release the concurrent slot for this call
await campaign_call_dispatcher.release_call_slot(workflow_run_id)
# Check if retry is needed for campaign calls
if (
CallStatus.lower() in ["busy", "no-answer", "failed"]
and workflow_run.campaign_id
):
# Lets retry for busy and no-answer
if CallStatus.lower() in ["busy", "no-answer"]:
publisher = await get_campaign_event_publisher()
await publisher.publish_retry_needed(
workflow_run_id=workflow_run_id,
reason=CallStatus.lower().replace(
"-", "_"
), # Convert no-answer to no_answer
campaign_id=workflow_run.campaign_id,
queued_run_id=workflow_run.queued_run_id,
)
# Update workflow run with appropriate tags
call_tags = workflow_run.gathered_context.get("call_tags", [])
call_tags.extend(["not_connected", f"twilio_{CallStatus.lower()}"])
await db_client.update_workflow_run(
run_id=workflow_run_id,
is_completed=True,
gathered_context={
"call_tags": call_tags,
},
)
return {"status": "success", "message": "Callback processed"}
except Exception as e:
logger.error(f"Error processing Twilio status callback: {e}")
return {"status": "error", "message": str(e)}