mirror of
https://github.com/dograh-hq/dograh.git
synced 2026-06-10 08:05:22 +02:00
259 lines
9.2 KiB
Python
259 lines
9.2 KiB
Python
|
|
import json
|
|||
|
|
import random
|
|||
|
|
from datetime import UTC, datetime
|
|||
|
|
from typing import Annotated, Optional
|
|||
|
|
|
|||
|
|
from fastapi import APIRouter, Depends, Form, Header, HTTPException, Request, WebSocket
|
|||
|
|
from loguru import logger
|
|||
|
|
from pipecat.utils.context import set_current_run_id
|
|||
|
|
from pydantic import BaseModel
|
|||
|
|
from starlette.responses import HTMLResponse
|
|||
|
|
|
|||
|
|
from api.db import db_client
|
|||
|
|
from api.db.models import UserModel
|
|||
|
|
from api.enums import OrganizationConfigurationKey, WorkflowRunMode
|
|||
|
|
from api.services.auth.depends import get_user
|
|||
|
|
from api.services.campaign.call_dispatcher import campaign_call_dispatcher
|
|||
|
|
from api.services.campaign.campaign_event_publisher import (
|
|||
|
|
get_campaign_event_publisher,
|
|||
|
|
)
|
|||
|
|
from api.services.pipecat.run_pipeline import run_pipeline_twilio
|
|||
|
|
from api.services.telephony.twilio import TwilioService
|
|||
|
|
|
|||
|
|
router = APIRouter(prefix="/twilio")
|
|||
|
|
|
|||
|
|
|
|||
|
|
class InitiateCallRequest(BaseModel):
|
|||
|
|
workflow_id: int
|
|||
|
|
workflow_run_id: int | None = None
|
|||
|
|
|
|||
|
|
|
|||
|
|
class TwilioStatusCallbackRequest(BaseModel):
|
|||
|
|
CallSid: str
|
|||
|
|
CallStatus: str
|
|||
|
|
From: Optional[str] = None
|
|||
|
|
To: Optional[str] = None
|
|||
|
|
Direction: Optional[str] = None
|
|||
|
|
Duration: Optional[str] = None
|
|||
|
|
CallDuration: Optional[str] = None
|
|||
|
|
RecordingUrl: Optional[str] = None
|
|||
|
|
RecordingSid: Optional[str] = None
|
|||
|
|
Timestamp: Optional[str] = None
|
|||
|
|
|
|||
|
|
|
|||
|
|
@router.post("/initiate-call")
|
|||
|
|
async def initiate_call(
|
|||
|
|
request: InitiateCallRequest, user: UserModel = Depends(get_user)
|
|||
|
|
):
|
|||
|
|
# Check if organization has TWILIO_PHONE_NUMBERS configured
|
|||
|
|
twilio_config = await db_client.get_configuration(
|
|||
|
|
user.selected_organization_id,
|
|||
|
|
OrganizationConfigurationKey.TWILIO_PHONE_NUMBERS.value,
|
|||
|
|
)
|
|||
|
|
|
|||
|
|
if (
|
|||
|
|
not twilio_config
|
|||
|
|
or not twilio_config.value
|
|||
|
|
or not twilio_config.value.get("value")
|
|||
|
|
):
|
|||
|
|
raise HTTPException(
|
|||
|
|
status_code=401,
|
|||
|
|
detail="Your organisation is not allowed to make phone call. Contact founders@dograh.com for further support.",
|
|||
|
|
)
|
|||
|
|
|
|||
|
|
user_configuration = await db_client.get_user_configurations(user.id)
|
|||
|
|
|
|||
|
|
workflow_run_id = request.workflow_run_id
|
|||
|
|
|
|||
|
|
if not workflow_run_id:
|
|||
|
|
workflow_run_name = f"WR-TEL-{random.randint(1000, 9999)}"
|
|||
|
|
workflow_run = await db_client.create_workflow_run(
|
|||
|
|
workflow_run_name,
|
|||
|
|
request.workflow_id,
|
|||
|
|
WorkflowRunMode.TWILIO.value,
|
|||
|
|
initial_context={
|
|||
|
|
"phone_number": user_configuration.test_phone_number,
|
|||
|
|
},
|
|||
|
|
user_id=user.id,
|
|||
|
|
)
|
|||
|
|
workflow_run_id = workflow_run.id
|
|||
|
|
else:
|
|||
|
|
workflow_run = await db_client.get_workflow_run(workflow_run_id, user.id)
|
|||
|
|
if not workflow_run:
|
|||
|
|
raise HTTPException(status_code=400, detail="Workflow run not found")
|
|||
|
|
workflow_run_name = workflow_run.name
|
|||
|
|
|
|||
|
|
if user_configuration.test_phone_number:
|
|||
|
|
await TwilioService().initiate_call(
|
|||
|
|
to_number=user_configuration.test_phone_number,
|
|||
|
|
url_args={
|
|||
|
|
"workflow_id": request.workflow_id,
|
|||
|
|
"user_id": user.id,
|
|||
|
|
"workflow_run_id": workflow_run_id,
|
|||
|
|
},
|
|||
|
|
workflow_run_id=workflow_run_id,
|
|||
|
|
organization_id=user.selected_organization_id,
|
|||
|
|
)
|
|||
|
|
return {
|
|||
|
|
"message": f"Call initiated successfully with run name {workflow_run_name}"
|
|||
|
|
}
|
|||
|
|
else:
|
|||
|
|
raise HTTPException(status_code=400, detail="Test phone number not set")
|
|||
|
|
|
|||
|
|
|
|||
|
|
@router.post("/twiml", include_in_schema=False)
|
|||
|
|
async def start_call(workflow_id: int, user_id: int, workflow_run_id: int):
|
|||
|
|
twiml_content = await TwilioService().get_start_call_twiml(
|
|||
|
|
workflow_id, user_id, workflow_run_id
|
|||
|
|
)
|
|||
|
|
return HTMLResponse(content=twiml_content, media_type="application/xml")
|
|||
|
|
|
|||
|
|
|
|||
|
|
@router.websocket("/ws/{workflow_id}/{user_id}/{workflow_run_id}")
|
|||
|
|
async def websocket_endpoint(
|
|||
|
|
websocket: WebSocket, workflow_id: int, user_id: int, workflow_run_id: int
|
|||
|
|
):
|
|||
|
|
await websocket.accept()
|
|||
|
|
|
|||
|
|
try:
|
|||
|
|
# "connected" (ignore)
|
|||
|
|
msg = json.loads(await websocket.receive_text())
|
|||
|
|
if msg.get("event") != "connected":
|
|||
|
|
raise RuntimeError("Expected connected message first")
|
|||
|
|
|
|||
|
|
# "start" – this has everything we need
|
|||
|
|
start_msg = await websocket.receive_text()
|
|||
|
|
|
|||
|
|
# set the run context
|
|||
|
|
set_current_run_id(workflow_run_id)
|
|||
|
|
|
|||
|
|
logger.debug(f"Received start message: {start_msg}")
|
|||
|
|
|
|||
|
|
start_msg = json.loads(start_msg)
|
|||
|
|
if start_msg.get("event") != "start":
|
|||
|
|
raise RuntimeError("Expected start message second")
|
|||
|
|
|
|||
|
|
try:
|
|||
|
|
stream_sid = start_msg["start"]["streamSid"]
|
|||
|
|
call_sid = start_msg["start"]["callSid"]
|
|||
|
|
except KeyError:
|
|||
|
|
logger.error(
|
|||
|
|
"Missing callSID and streamSID in start message. Closing connection."
|
|||
|
|
)
|
|||
|
|
await websocket.close(code=4400, reason="Missing or bad start message")
|
|||
|
|
return
|
|||
|
|
|
|||
|
|
# Run your Pipecat bot
|
|||
|
|
await run_pipeline_twilio(
|
|||
|
|
websocket, stream_sid, call_sid, workflow_id, workflow_run_id, user_id
|
|||
|
|
)
|
|||
|
|
except Exception as e:
|
|||
|
|
logger.error(f"Error in Twilio WebSocket connection: {e}")
|
|||
|
|
await websocket.close(1011, "Internal server error")
|
|||
|
|
|
|||
|
|
|
|||
|
|
@router.post("/status-callback/{workflow_run_id}", include_in_schema=False)
|
|||
|
|
async def status_callback(
|
|||
|
|
request: Request,
|
|||
|
|
workflow_run_id: int,
|
|||
|
|
x_twilio_signature: Annotated[
|
|||
|
|
Optional[str], Header(alias="X-Twilio-Signature")
|
|||
|
|
] = None,
|
|||
|
|
CallSid: str = Form(...),
|
|||
|
|
CallStatus: str = Form(...),
|
|||
|
|
From: Optional[str] = Form(None),
|
|||
|
|
To: Optional[str] = Form(None),
|
|||
|
|
Direction: Optional[str] = Form(None),
|
|||
|
|
Duration: Optional[str] = Form(None),
|
|||
|
|
CallDuration: Optional[str] = Form(None),
|
|||
|
|
RecordingUrl: Optional[str] = Form(None),
|
|||
|
|
RecordingSid: Optional[str] = Form(None),
|
|||
|
|
Timestamp: Optional[str] = Form(None),
|
|||
|
|
):
|
|||
|
|
"""Handle Twilio status callbacks for call lifecycle events."""
|
|||
|
|
try:
|
|||
|
|
# TODO: Implement Twilio signature verification
|
|||
|
|
|
|||
|
|
# Create callback data object
|
|||
|
|
callback_data = {
|
|||
|
|
"CallSid": CallSid,
|
|||
|
|
"CallStatus": CallStatus,
|
|||
|
|
"From": From,
|
|||
|
|
"To": To,
|
|||
|
|
"Direction": Direction,
|
|||
|
|
"Duration": Duration,
|
|||
|
|
"CallDuration": CallDuration,
|
|||
|
|
"RecordingUrl": RecordingUrl,
|
|||
|
|
"RecordingSid": RecordingSid,
|
|||
|
|
"Timestamp": Timestamp,
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
# Remove None values for cleaner logging
|
|||
|
|
callback_data = {k: v for k, v in callback_data.items() if v is not None}
|
|||
|
|
|
|||
|
|
logger.info(
|
|||
|
|
f"Received Twilio status callback for workflow_run_id {workflow_run_id}: {CallStatus}"
|
|||
|
|
)
|
|||
|
|
|
|||
|
|
# Get the current workflow run
|
|||
|
|
workflow_run = await db_client.get_workflow_run_by_id(workflow_run_id)
|
|||
|
|
if not workflow_run:
|
|||
|
|
logger.error(f"Workflow run {workflow_run_id} not found for callback")
|
|||
|
|
return {"status": "error", "message": "Workflow run not found"}
|
|||
|
|
|
|||
|
|
callback_logs = workflow_run.logs.get("twilio_status_callbacks", [])
|
|||
|
|
|
|||
|
|
# Add new callback log entry to logs
|
|||
|
|
callback_log = {
|
|||
|
|
"status": CallStatus,
|
|||
|
|
"timestamp": datetime.now(UTC).isoformat(),
|
|||
|
|
"data": callback_data,
|
|||
|
|
}
|
|||
|
|
callback_logs.append(callback_log)
|
|||
|
|
|
|||
|
|
# Update the workflow run with the new logs
|
|||
|
|
await db_client.update_workflow_run(
|
|||
|
|
run_id=workflow_run_id, logs={"twilio_status_callbacks": callback_logs}
|
|||
|
|
)
|
|||
|
|
|
|||
|
|
# Release concurrent slot when call ends (for any terminal status)
|
|||
|
|
terminal_statuses = ["completed", "busy", "no-answer", "failed", "canceled"]
|
|||
|
|
if CallStatus.lower() in terminal_statuses and workflow_run.campaign_id:
|
|||
|
|
# Release the concurrent slot for this call
|
|||
|
|
await campaign_call_dispatcher.release_call_slot(workflow_run_id)
|
|||
|
|
|
|||
|
|
# Check if retry is needed for campaign calls
|
|||
|
|
if (
|
|||
|
|
CallStatus.lower() in ["busy", "no-answer", "failed"]
|
|||
|
|
and workflow_run.campaign_id
|
|||
|
|
):
|
|||
|
|
# Lets retry for busy and no-answer
|
|||
|
|
if CallStatus.lower() in ["busy", "no-answer"]:
|
|||
|
|
publisher = await get_campaign_event_publisher()
|
|||
|
|
await publisher.publish_retry_needed(
|
|||
|
|
workflow_run_id=workflow_run_id,
|
|||
|
|
reason=CallStatus.lower().replace(
|
|||
|
|
"-", "_"
|
|||
|
|
), # Convert no-answer to no_answer
|
|||
|
|
campaign_id=workflow_run.campaign_id,
|
|||
|
|
queued_run_id=workflow_run.queued_run_id,
|
|||
|
|
)
|
|||
|
|
|
|||
|
|
# Update workflow run with appropriate tags
|
|||
|
|
call_tags = workflow_run.gathered_context.get("call_tags", [])
|
|||
|
|
call_tags.extend(["not_connected", f"twilio_{CallStatus.lower()}"])
|
|||
|
|
|
|||
|
|
await db_client.update_workflow_run(
|
|||
|
|
run_id=workflow_run_id,
|
|||
|
|
is_completed=True,
|
|||
|
|
gathered_context={
|
|||
|
|
"call_tags": call_tags,
|
|||
|
|
},
|
|||
|
|
)
|
|||
|
|
|
|||
|
|
return {"status": "success", "message": "Callback processed"}
|
|||
|
|
|
|||
|
|
except Exception as e:
|
|||
|
|
logger.error(f"Error processing Twilio status callback: {e}")
|
|||
|
|
return {"status": "error", "message": str(e)}
|