feat: add amd callback

This commit is contained in:
Sabiha Khan 2026-03-03 20:12:04 +05:30
parent aed5a782fb
commit fb08f56524
11 changed files with 308 additions and 137 deletions

View file

@ -6,8 +6,7 @@ Consolidated from split modules for easier maintenance.
import json import json
import uuid import uuid
from datetime import UTC, datetime from datetime import UTC, datetime
from typing import Optional from typing import Optional, Any, Dict
from fastapi import ( from fastapi import (
APIRouter, APIRouter,
Depends, Depends,
@ -49,6 +48,7 @@ from api.utils.telephony_helper import (
normalize_webhook_data, normalize_webhook_data,
numbers_match, numbers_match,
parse_webhook_request, parse_webhook_request,
parse_cloudonix_amd_callback
) )
from pipecat.utils.run_context import set_current_run_id from pipecat.utils.run_context import set_current_run_id
@ -1182,13 +1182,12 @@ async def handle_cloudonix_status_callback(
logger.warning(f"Workflow run {workflow_run_id} not found for status callback") logger.warning(f"Workflow run {workflow_run_id} not found for status callback")
return {"status": "ignored", "reason": "workflow_run_not_found"} return {"status": "ignored", "reason": "workflow_run_not_found"}
# Get workflow and provider # Get workflow from workflow_run and get the provider
workflow = await db_client.get_workflow_by_id(workflow_run.workflow_id) if not workflow_run.workflow:
if not workflow:
logger.warning(f"Workflow {workflow_run.workflow_id} not found") logger.warning(f"Workflow {workflow_run.workflow_id} not found")
return {"status": "ignored", "reason": "workflow_not_found"} return {"status": "ignored", "reason": "workflow_not_found"}
provider = await get_telephony_provider(workflow.organization_id) provider = await get_telephony_provider(workflow_run.workflow.organization_id)
# Parse the callback data into generic format # Parse the callback data into generic format
parsed_data = provider.parse_status_callback(callback_data) parsed_data = provider.parse_status_callback(callback_data)
@ -1209,6 +1208,71 @@ async def handle_cloudonix_status_callback(
return {"status": "success"} return {"status": "success"}
@router.post("/cloudonix/amd-callback/{workflow_run_id}")
async def handle_cloudonix_amd_callback(
workflow_run_id: int,
request: Request,
):
"""Handle Cloudonix-specific status callbacks.
Cloudonix sends call status updates to the callback URL specified during call initiation.
"""
set_current_run_id(workflow_run_id)
# Parse callback data - determine if JSON or form data
content_type = request.headers.get("content-type", "")
if "application/json" in content_type:
callback_data = await request.json()
else:
# Assume form data (like Twilio)
form_data = await request.form()
callback_data = dict(form_data)
logger.info(
f"[run {workflow_run_id}] Received Cloudonix AMD status callback: {json.dumps(callback_data)}"
)
call_id = callback_data["CallSid"]
answered_by = callback_data["AnsweredBy"]
logger.info(
f"[run {workflow_run_id}] returning from the AMD callback for call answered-by: {answered_by}"
)
if answered_by in ["human", "unknown"]:
logger.info(
f"[run {workflow_run_id}] returning from the AMD callback for call answered-by: {answered_by}"
)
return {"status": answered_by}
logger.info(
f"[run {workflow_run_id}] proceeding with call hang up and workflow run update for call answered-by: {answered_by}"
)
workflow_run = await db_client.get_workflow_run_by_call_id(call_id)
if not workflow_run:
logger.warning(f"No workflow run found for Cloudonix call_id: {call_id}")
return {"status": "ignored", "reason": "workflow_run_not_found"}
organization_id = workflow_run.workflow.organization_id
provider = await get_telephony_provider(organization_id)
# Parse the AMD callback data to persist in workflow_run
parsed_data = parse_cloudonix_amd_callback(callback_data)
# Create status update for AMD detection
status_update = StatusCallbackRequest(
call_id=parsed_data["call_id"],
status=f"amd_{parsed_data['answered_by']}",
extra_data=parsed_data,
)
# Process the status update
await _process_status_update(workflow_run_id, status_update)
logger.info(
f"[run {workflow_run_id}] Call answered by: {answered_by}, hang up call with call_id: {call_id}"
)
await provider.hang_up(call_id)
return {"status": answered_by}
@router.post("/vobiz/hangup-callback/workflow/{workflow_id}") @router.post("/vobiz/hangup-callback/workflow/{workflow_id}")
async def handle_vobiz_hangup_callback_by_workflow( async def handle_vobiz_hangup_callback_by_workflow(

View file

@ -352,3 +352,17 @@ class TelephonyProvider(ABC):
True if provider supports call transfers, False otherwise True if provider supports call transfers, False otherwise
""" """
pass pass
@abstractmethod
async def hang_up(self, call_id: str, **kwargs: Any) -> Dict[str, Any]:
"""
Terminate an active call.
Args:
call_id: Provider-specific call identifier
**kwargs: Provider-specific additional parameters
Returns:
Dict containing hangup response (format varies by provider)
"""
pass

View file

@ -418,3 +418,8 @@ class ARIProvider(TelephonyProvider):
f"&app={self.app_name}" f"&app={self.app_name}"
f"&subscribeAll=true" f"&subscribeAll=true"
) )
async def hang_up(self, call_id: str, **kwargs: Any) -> Dict[str, Any]:
"""Terminate an ARI call."""
# TODO: Implement ARI call termination
return {"status": "not_implemented"}

View file

@ -106,6 +106,16 @@ class CloudonixProvider(TelephonyProvider):
"caller-id": from_number, # Required field "caller-id": from_number, # Required field
} }
# Enable and process AMD
data["machineDetection"] = "DetectMessageEnd"
data["asyncAmd"] = True
data["machineDetectionTimeout"] = 30
data["machineDetectionSpeechThreshold"] = 2500
data["machineDetectionSpeechEndThreshold"] = 6000
data["machineDetectionSilenceTimeout"] = 2500
data["asyncAmdStatusCallback"] = f"{backend_endpoint}/api/v1/telephony/cloudonix/amd-callback/{workflow_run_id}"
data["asyncAmdStatusCallbackMethod"]= "POST"
# TODO: Cloudonix status callbacks are spammy, so commenting it out. Can send it to # TODO: Cloudonix status callbacks are spammy, so commenting it out. Can send it to
# some persistent logging system instead of transcational database. # some persistent logging system instead of transcational database.
# Add status callback if workflow_run_id provided # Add status callback if workflow_run_id provided
@ -682,6 +692,42 @@ class CloudonixProvider(TelephonyProvider):
return Response(content=twiml, media_type="application/xml"), "application/xml" return Response(content=twiml, media_type="application/xml"), "application/xml"
async def hang_up(self, call_id: str, **kwargs: Any) -> Dict[str, Any]:
# Construct the DELETE session endpoint
# Using "self" as customer-id as per Cloudonix documentation
endpoint = (
f"{self.base_url}/customers/self/domains/{self.domain_id}/sessions/{call_id}"
)
# Prepare headers with Bearer token authentication
headers = {
"Authorization": f"Bearer {self.bearer_token}",
"Content-Type": "application/json",
}
logger.info(f"Terminating Cloudonix call {call_id} via DELETE {endpoint}")
# Make the DELETE request to terminate the session
async with aiohttp.ClientSession() as session:
async with session.delete(endpoint, headers=headers) as response:
status = response.status
response_text = await response.text()
if status in (200, 204, 404):
# 200/204: Success
# 404: Session already terminated (acceptable)
logger.info(
f"Successfully terminated Cloudonix session {call_id} "
f"(HTTP {status}), Response: {response_text}"
)
else:
logger.warning(
f"Unexpected response terminating Cloudonix session {call_id}: "
f"HTTP {status}, Response: {response_text}"
)
return {"status": "success"}
# ======== CALL TRANSFER METHODS ======== # ======== CALL TRANSFER METHODS ========
async def transfer_call( async def transfer_call(

View file

@ -587,3 +587,8 @@ class TwilioProvider(TelephonyProvider):
True - Twilio provider supports call transfers True - Twilio provider supports call transfers
""" """
return True return True
async def hang_up(self, call_id: str, **kwargs: Any) -> Dict[str, Any]:
"""Terminate a Twilio call."""
# TODO: Implement Twilio call termination
return {"status": "not_implemented"}

View file

@ -560,3 +560,8 @@ class VobizProvider(TelephonyProvider):
False - Vobiz provider does not support call transfers False - Vobiz provider does not support call transfers
""" """
return False return False
async def hang_up(self, call_id: str, **kwargs: Any) -> Dict[str, Any]:
"""Terminate a Vobiz call."""
# TODO: Implement Vobiz call termination
return {"status": "not_implemented"}

View file

@ -511,3 +511,8 @@ class VonageProvider(TelephonyProvider):
False - Vonage provider does not support call transfers False - Vonage provider does not support call transfers
""" """
return False return False
async def hang_up(self, call_id: str, **kwargs: Any) -> Dict[str, Any]:
"""Terminate a Vonage call."""
# TODO: Implement Vonage call termination
return {"status": "not_implemented"}

View file

@ -231,3 +231,30 @@ def get_countries_for_code(dialing_code: str) -> list[str]:
return [] return []
return [country for country, code in COUNTRY_CODES.items() if code == dialing_code] return [country for country, code in COUNTRY_CODES.items() if code == dialing_code]
def parse_cloudonix_amd_callback(data: dict) -> dict:
"""
Parse Cloudonix AMD callback data into generic format.
Note: This is Cloudonix-specific and not part of the generic provider interface
as AMD callbacks are currently only supported by Cloudonix.
Args:
data: Raw AMD callback data from Cloudonix
Returns:
Dict with parsed AMD information
"""
return {
"call_id": data.get("CallSid", ""),
"session": data.get("Session", ""),
"answered_by": data.get("AnsweredBy", ""),
"from_number": data.get("From", ""),
"to_number": data.get("To", ""),
"call_status": data.get("CallStatus", ""),
"domain": data.get("Domain", ""),
"direction": data.get("Direction", ""),
"account_sid": data.get("AccountSid", ""),
"api_version": data.get("ApiVersion", "")
}

View file

@ -59,155 +59,155 @@ services:
networks: networks:
- app-network - app-network
nginx: # nginx:
image: nginx:alpine # image: nginx:alpine
container_name: nginx_https # container_name: nginx_https
profiles: ["remote"] # profiles: ["remote"]
depends_on: # depends_on:
- ui # - ui
ports: # ports:
- "80:80" # - "80:80"
- "443:443" # - "443:443"
volumes: # volumes:
- ./nginx.conf:/etc/nginx/conf.d/default.conf:ro # - ./nginx.conf:/etc/nginx/conf.d/default.conf:ro
- ./certs:/etc/nginx/certs:ro # - ./certs:/etc/nginx/certs:ro
networks: # networks:
- app-network # - app-network
api: # api:
image: ${REGISTRY:-dograhai}/dograh-api:latest # image: ${REGISTRY:-dograhai}/dograh-api:latest
volumes: # volumes:
- shared-tmp:/tmp # - shared-tmp:/tmp
environment: # environment:
# Core application config # # Core application config
ENVIRONMENT: "local" # ENVIRONMENT: "local"
LOG_LEVEL: "INFO" # LOG_LEVEL: "INFO"
# Replace this environment variable if you are using a custom # # Replace this environment variable if you are using a custom
# domain to host the stack # # domain to host the stack
BACKEND_API_ENDPOINT: "${BACKEND_API_ENDPOINT:-http://localhost:8000}" # BACKEND_API_ENDPOINT: "${BACKEND_API_ENDPOINT:-http://localhost:8000}"
# Database configuration (using containerized postgres) # # Database configuration (using containerized postgres)
DATABASE_URL: "postgresql+asyncpg://postgres:postgres@postgres:5432/postgres" # DATABASE_URL: "postgresql+asyncpg://postgres:postgres@postgres:5432/postgres"
# Redis configuration (using containerized redis) # # Redis configuration (using containerized redis)
REDIS_URL: "redis://:redissecret@redis:6379" # REDIS_URL: "redis://:redissecret@redis:6379"
# Storage configuration - using local MinIO # # Storage configuration - using local MinIO
ENABLE_AWS_S3: "false" # ENABLE_AWS_S3: "false"
# MinIO # # MinIO
MINIO_ENDPOINT: "minio:9000" # MINIO_ENDPOINT: "minio:9000"
MINIO_ACCESS_KEY: "minioadmin" # MINIO_ACCESS_KEY: "minioadmin"
MINIO_SECRET_KEY: "minioadmin" # MINIO_SECRET_KEY: "minioadmin"
MINIO_BUCKET: "voice-audio" # MINIO_BUCKET: "voice-audio"
MINIO_SECURE: "false" # MINIO_SECURE: "false"
# FastAPI workers count # # FastAPI workers count
FASTAPI_WORKERS: 1 # FASTAPI_WORKERS: 1
# Langfuse # # Langfuse
ENABLE_TRACING: "false" # ENABLE_TRACING: "false"
# LANGFUSE_SECRET_KEY: "" # # LANGFUSE_SECRET_KEY: ""
# LANGFUSE_PUBLIC_KEY: "" # # LANGFUSE_PUBLIC_KEY: ""
# LANGFUSE_HOST: "" # # LANGFUSE_HOST: ""
# TURN server configuration (for WebRTC NAT traversal in remote server) # # TURN server configuration (for WebRTC NAT traversal in remote server)
# Uses time-limited credentials via TURN REST API (HMAC-SHA1) # # Uses time-limited credentials via TURN REST API (HMAC-SHA1)
TURN_HOST: "${TURN_HOST:-}" # TURN_HOST: "${TURN_HOST:-}"
TURN_SECRET: "${TURN_SECRET:-}" # TURN_SECRET: "${TURN_SECRET:-}"
OSS_JWT_SECRET: "${OSS_JWT_SECRET:-ChangeMeInProduction}" # OSS_JWT_SECRET: "${OSS_JWT_SECRET:-ChangeMeInProduction}"
ports: # ports:
- "8000:8000" # - "8000:8000"
depends_on: # depends_on:
postgres: # postgres:
condition: service_healthy # condition: service_healthy
redis: # redis:
condition: service_healthy # condition: service_healthy
minio: # minio:
condition: service_healthy # condition: service_healthy
cloudflared: # cloudflared:
condition: service_started # condition: service_started
healthcheck: # healthcheck:
test: # test:
[ # [
"CMD-SHELL", # "CMD-SHELL",
'python -c "import urllib.request; urllib.request.urlopen(''http://localhost:8000/api/v1/health'').read()"', # 'python -c "import urllib.request; urllib.request.urlopen(''http://localhost:8000/api/v1/health'').read()"',
] # ]
interval: 30s # interval: 30s
timeout: 10s # timeout: 10s
retries: 3 # retries: 3
start_period: 60s # start_period: 60s
networks: # networks:
- app-network # - app-network
ui: # ui:
image: ${REGISTRY:-dograhai}/dograh-ui:latest # image: ${REGISTRY:-dograhai}/dograh-ui:latest
environment: # environment:
# Server-side URL (SSR, internal Docker network) # # Server-side URL (SSR, internal Docker network)
BACKEND_URL: "${BACKEND_URL:-http://api:8000}" # BACKEND_URL: "${BACKEND_URL:-http://api:8000}"
NODE_ENV: "oss" # NODE_ENV: "oss"
# Flag to enable/ disable posthog # # Flag to enable/ disable posthog
ENABLE_TELEMETRY: "${ENABLE_TELEMETRY:-true}" # ENABLE_TELEMETRY: "${ENABLE_TELEMETRY:-true}"
# Posthog # # Posthog
POSTHOG_KEY: "phc_ItizB1dP6yv7ZYobbcqrpxTdbomDA8hJFSEmAMdYvIr" # POSTHOG_KEY: "phc_ItizB1dP6yv7ZYobbcqrpxTdbomDA8hJFSEmAMdYvIr"
POSTHOG_HOST: "https://us.posthog.com" # POSTHOG_HOST: "https://us.posthog.com"
ports: # ports:
- "3010:3010" # - "3010:3010"
depends_on: # depends_on:
api: # api:
condition: service_healthy # condition: service_healthy
healthcheck: # healthcheck:
test: # test:
[ # [
"CMD-SHELL", # "CMD-SHELL",
"wget --no-verbose --tries=1 --spider http://localhost:3010 || exit 1", # "wget --no-verbose --tries=1 --spider http://localhost:3010 || exit 1",
] # ]
interval: 30s # interval: 30s
timeout: 10s # timeout: 10s
retries: 3 # retries: 3
start_period: 30s # start_period: 30s
networks: # networks:
- app-network # - app-network
cloudflared: # cloudflared:
image: cloudflare/cloudflared:latest # image: cloudflare/cloudflared:latest
container_name: cloudflared-tunnel # container_name: cloudflared-tunnel
command: tunnel --no-autoupdate --url http://api:8000 --metrics 0.0.0.0:2000 # command: tunnel --no-autoupdate --url http://api:8000 --metrics 0.0.0.0:2000
ports: # ports:
- "2000:2000" # Expose metrics endpoint # - "2000:2000" # Expose metrics endpoint
networks: # networks:
- app-network # - app-network
coturn: # coturn:
image: coturn/coturn:4.8.0 # image: coturn/coturn:4.8.0
container_name: coturn # container_name: coturn
restart: unless-stopped # restart: unless-stopped
profiles: ["remote"] # profiles: ["remote"]
ports: # ports:
- "3478:3478/udp" # - "3478:3478/udp"
- "3478:3478/tcp" # - "3478:3478/tcp"
- "5349:5349/udp" # - "5349:5349/udp"
- "5349:5349/tcp" # - "5349:5349/tcp"
- "49152-49200:49152-49200/udp" # - "49152-49200:49152-49200/udp"
volumes: # volumes:
- ./turnserver.conf:/etc/coturn/turnserver.conf:ro # - ./turnserver.conf:/etc/coturn/turnserver.conf:ro
command: # command:
- -c # - -c
- /etc/coturn/turnserver.conf # - /etc/coturn/turnserver.conf
networks: # networks:
- app-network # - app-network
volumes: volumes:
postgres_data: postgres_data:
redis_data: redis_data:
minio-data: minio-data:
driver: local driver: local
shared-tmp: # shared-tmp:
driver: local # driver: local
networks: networks:
app-network: app-network:

@ -1 +1 @@
Subproject commit 6aa0502a9834d536aba9589cec87d827e66f2fad Subproject commit 3559c6a8eb9cc82b400d745ab62ed037a420b42f

4
ui/package-lock.json generated
View file

@ -1,12 +1,12 @@
{ {
"name": "ui", "name": "ui",
"version": "1.13.0", "version": "1.15.0",
"lockfileVersion": 3, "lockfileVersion": 3,
"requires": true, "requires": true,
"packages": { "packages": {
"": { "": {
"name": "ui", "name": "ui",
"version": "1.13.0", "version": "1.15.0",
"dependencies": { "dependencies": {
"@dagrejs/dagre": "^1.1.4", "@dagrejs/dagre": "^1.1.4",
"@hey-api/client-fetch": "^0.10.0", "@hey-api/client-fetch": "^0.10.0",