This commit is contained in:
Abhishek Kumar 2026-03-06 16:49:21 +05:30
commit 819f8c64c6
3 changed files with 162 additions and 7 deletions

View file

@ -17,12 +17,19 @@ from fastapi import (
WebSocket,
)
from loguru import logger
from pipecat.utils.run_context import set_current_run_id
from pydantic import BaseModel, field_validator
from sqlalchemy import text
from sqlalchemy.future import select
from starlette.responses import HTMLResponse
from starlette.websockets import WebSocketDisconnect
from api.constants import (
AMD_FINAL_RESULTS,
AMD_HANGUP_ENABLED,
MACHINE_END_OTHER,
MACHINE_END_SILENCE,
)
from api.db import db_client
from api.db.models import OrganizationConfigurationModel, UserModel
from api.db.workflow_client import WorkflowClient
@ -50,7 +57,6 @@ from api.utils.telephony_helper import (
numbers_match,
parse_webhook_request,
)
from pipecat.utils.run_context import set_current_run_id
router = APIRouter(prefix="/telephony")
@ -1218,6 +1224,7 @@ async def handle_cloudonix_amd_callback(
):
"""Handle Cloudonix-specific Answering Machine Detection(AMD) callbacks.
Cloudonix sends AMD updates to the callback URL specified during call initiation.
Final results - 'machine_end_silence', 'machine_end_other', 'human', 'unknown'
"""
set_current_run_id(workflow_run_id)
@ -1236,6 +1243,58 @@ async def handle_cloudonix_amd_callback(
f"[run {workflow_run_id}] Received Cloudonix AMD status callback with answered-by {answered_by} for call ID {call_id}: {json.dumps(callback_data)}"
)
if answered_by in AMD_FINAL_RESULTS:
try:
await db_client.update_workflow_run(
run_id=workflow_run_id,
gathered_context={"answered_by": answered_by},
)
logger.info(
f"[run {workflow_run_id}] AMD final result '{answered_by}' stored in gathered context"
)
is_machine = (
answered_by == MACHINE_END_SILENCE or answered_by == MACHINE_END_OTHER
)
if is_machine and AMD_HANGUP_ENABLED:
workflow_run = await db_client.get_workflow_run_by_id(workflow_run_id)
if not workflow_run:
logger.warning(
f"[run {workflow_run_id}] Workflow run not found, skipping AMD hangup"
)
return {"status": "success"}
workflow_run_call_id = (workflow_run.gathered_context or {}).get(
"call_id"
)
if workflow_run_call_id != call_id:
logger.warning(
f"[run {workflow_run_id}] AMD callback call_id '{call_id}' does not match "
f"workflow run call_id '{workflow_run_call_id}', skipping AMD hangup"
)
return {"status": "success"}
if not workflow_run.workflow:
logger.warning(
f"[run {workflow_run_id}] Workflow not found, skipping AMD hangup"
)
return {"status": "success"}
provider = await get_telephony_provider(
workflow_run.workflow.organization_id
)
await provider.hangup_machine_answered_call(call_id)
logger.info(
f"[run {workflow_run_id}] AMD hangup executed for machine call {call_id}"
)
return {"status": "success"}
except Exception as e:
logger.error(
f"[run {workflow_run_id}] Failed to process AMD final result '{answered_by}': {e}"
)
return {"status": answered_by}