From 3c5bc688eddde55395611fd0345229c524b33301 Mon Sep 17 00:00:00 2001 From: Sabiha Khan <87858386+chewwbaka@users.noreply.github.com> Date: Fri, 6 Mar 2026 15:22:44 +0530 Subject: [PATCH] feat: hang up cloudonix machine answered call if feature flag enabled (#182) --- api/constants.py | 44 +++++++++++++ api/routes/telephony.py | 61 +++++++++++++++++- .../telephony/providers/cloudonix_provider.py | 64 +++++++++++++++++-- 3 files changed, 162 insertions(+), 7 deletions(-) diff --git a/api/constants.py b/api/constants.py index 2bc81ac..a7d82a1 100644 --- a/api/constants.py +++ b/api/constants.py @@ -13,6 +13,50 @@ FILLER_SOUND_PROBABILITY = 0.0 VOICEMAIL_RECORDING_DURATION = 5.0 +# Cloudonix Answering Machine Detection (AMD) constants +# Ref: https://developers.cloudonix.com/Documentation/apiWorkflow/callControlAndSessionManagement#answering-machine-detection-results + +# Enables AMD and waits for the full voicemail greeting to finish before reporting a result. +# Alternative: "Enable" — ends detection immediately upon determination. +AMD_MACHINE_DETECTION = "DetectMessageEnd" + +# Runs AMD in the background while the call continues, posting results to asyncAmdStatusCallback. +# Default: disabled (False). +AMD_ASYNC = True + +# HTTP method used when Cloudonix posts the AMD result to the callback URL. +# Allowed: "POST" or "GET". Default: "POST". +AMD_CALLBACK_METHOD = "POST" + +# Maximum seconds to wait for a determination before returning "unknown". +# Range: 3–59 seconds. Default: 30. +AMD_MACHINE_DETECTION_TIMEOUT = 30 + +# Minimum greeting duration (ms) expected from an answering machine. +# Range: 1000–6000 ms. Default: 2400. +AMD_MACHINE_DETECTION_SPEECH_THRESHOLD = 2400 + +# Duration of silence (ms) after speech that confirms the greeting has ended. +# Range: 500–5000 ms. Default: 1200. +AMD_MACHINE_DETECTION_SPEECH_END_THRESHOLD = 1200 + +# Maximum wait (ms) for any audio after the call is answered before timing out. +# Range: 2000–10000 ms. Default: 5000. +AMD_MACHINE_DETECTION_SILENCE_TIMEOUT = 5000 + +# Cloudonix AMD final result values +MACHINE_END_SILENCE = "machine_end_silence" +MACHINE_END_OTHER = "machine_end_other" +HUMAN = "human" +UNKNOWN = "unknown" + +# Final (non-interim) AMD result values — only these are stored in gathered context. +AMD_FINAL_RESULTS = {MACHINE_END_SILENCE, MACHINE_END_OTHER, HUMAN, UNKNOWN} + +# When enabled, Cloudonix calls answered by an answering machine are automatically hung up. +AMD_HANGUP_ENABLED = os.getenv("AMD_HANGUP_ENABLED", "false").lower() == "true" + + # Configuration constants ENABLE_TRACING = os.getenv("ENABLE_TRACING", "false").lower() == "true" diff --git a/api/routes/telephony.py b/api/routes/telephony.py index 71571bd..acf7b53 100644 --- a/api/routes/telephony.py +++ b/api/routes/telephony.py @@ -17,12 +17,19 @@ from fastapi import ( WebSocket, ) from loguru import logger +from pipecat.utils.run_context import set_current_run_id from pydantic import BaseModel, field_validator from sqlalchemy import text from sqlalchemy.future import select from starlette.responses import HTMLResponse from starlette.websockets import WebSocketDisconnect +from api.constants import ( + AMD_FINAL_RESULTS, + AMD_HANGUP_ENABLED, + MACHINE_END_OTHER, + MACHINE_END_SILENCE, +) from api.db import db_client from api.db.models import OrganizationConfigurationModel, UserModel from api.db.workflow_client import WorkflowClient @@ -50,7 +57,6 @@ from api.utils.telephony_helper import ( numbers_match, parse_webhook_request, ) -from pipecat.utils.run_context import set_current_run_id router = APIRouter(prefix="/telephony") @@ -1218,6 +1224,7 @@ async def handle_cloudonix_amd_callback( ): """Handle Cloudonix-specific Answering Machine Detection(AMD) callbacks. Cloudonix sends AMD updates to the callback URL specified during call initiation. + Final results - 'machine_end_silence', 'machine_end_other', 'human', 'unknown' """ set_current_run_id(workflow_run_id) @@ -1236,6 +1243,58 @@ async def handle_cloudonix_amd_callback( f"[run {workflow_run_id}] Received Cloudonix AMD status callback with answered-by {answered_by} for call ID {call_id}: {json.dumps(callback_data)}" ) + if answered_by in AMD_FINAL_RESULTS: + try: + await db_client.update_workflow_run( + run_id=workflow_run_id, + gathered_context={"answered_by": answered_by}, + ) + logger.info( + f"[run {workflow_run_id}] AMD final result '{answered_by}' stored in gathered context" + ) + + is_machine = ( + answered_by == MACHINE_END_SILENCE or answered_by == MACHINE_END_OTHER + ) + if is_machine and AMD_HANGUP_ENABLED: + workflow_run = await db_client.get_workflow_run_by_id(workflow_run_id) + if not workflow_run: + logger.warning( + f"[run {workflow_run_id}] Workflow run not found, skipping AMD hangup" + ) + return {"status": "success"} + + workflow_run_call_id = (workflow_run.gathered_context or {}).get( + "call_id" + ) + if workflow_run_call_id != call_id: + logger.warning( + f"[run {workflow_run_id}] AMD callback call_id '{call_id}' does not match " + f"workflow run call_id '{workflow_run_call_id}', skipping AMD hangup" + ) + return {"status": "success"} + + if not workflow_run.workflow: + logger.warning( + f"[run {workflow_run_id}] Workflow not found, skipping AMD hangup" + ) + return {"status": "success"} + + provider = await get_telephony_provider( + workflow_run.workflow.organization_id + ) + await provider.hangup_machine_answered_call(call_id) + logger.info( + f"[run {workflow_run_id}] AMD hangup executed for machine call {call_id}" + ) + + return {"status": "success"} + + except Exception as e: + logger.error( + f"[run {workflow_run_id}] Failed to process AMD final result '{answered_by}': {e}" + ) + return {"status": answered_by} diff --git a/api/services/telephony/providers/cloudonix_provider.py b/api/services/telephony/providers/cloudonix_provider.py index 8bbc668..d00630c 100644 --- a/api/services/telephony/providers/cloudonix_provider.py +++ b/api/services/telephony/providers/cloudonix_provider.py @@ -10,6 +10,15 @@ import aiohttp from fastapi import HTTPException from loguru import logger +from api.constants import ( + AMD_ASYNC, + AMD_CALLBACK_METHOD, + AMD_MACHINE_DETECTION, + AMD_MACHINE_DETECTION_SILENCE_TIMEOUT, + AMD_MACHINE_DETECTION_SPEECH_END_THRESHOLD, + AMD_MACHINE_DETECTION_SPEECH_THRESHOLD, + AMD_MACHINE_DETECTION_TIMEOUT, +) from api.enums import WorkflowRunMode from api.services.telephony.base import ( CallInitiationResult, @@ -58,6 +67,29 @@ class CloudonixProvider(TelephonyProvider): "Content-Type": "application/json", } + def _get_amd_config( + self, backend_endpoint: str, workflow_run_id: Optional[int] + ) -> Dict[str, Any]: + """Build the Answering Machine Detection configuration for an outbound call. + + Args: + backend_endpoint: The backend base URL for the AMD status callback. + workflow_run_id: The workflow run ID used to route the callback. + + Returns: + Dict containing AMD-related fields to merge into the call payload. + """ + return { + "machineDetection": AMD_MACHINE_DETECTION, + "asyncAmd": AMD_ASYNC, + "asyncAmdStatusCallback": f"{backend_endpoint}/api/v1/telephony/cloudonix/amd-callback/{workflow_run_id}", + "asyncAmdStatusCallbackMethod": AMD_CALLBACK_METHOD, + "machineDetectionTimeout": AMD_MACHINE_DETECTION_TIMEOUT, + "machineDetectionSpeechThreshold": AMD_MACHINE_DETECTION_SPEECH_THRESHOLD, + "machineDetectionSpeechEndThreshold": AMD_MACHINE_DETECTION_SPEECH_END_THRESHOLD, + "machineDetectionSilenceTimeout": AMD_MACHINE_DETECTION_SILENCE_TIMEOUT, + } + async def initiate_call( self, to_number: str, @@ -105,12 +137,8 @@ class CloudonixProvider(TelephonyProvider): """, "caller-id": from_number, # Required field } - data["machineDetection"] = "DetectMessageEnd" - data["asyncAmd"] = True - data["asyncAmdStatusCallback"] = ( - f"{backend_endpoint}/api/v1/telephony/cloudonix/amd-callback/{workflow_run_id}" - ) - data["asyncAmdStatusCallbackMethod"] = "POST" + + data.update(self._get_amd_config(backend_endpoint, workflow_run_id)) # TODO: Cloudonix status callbacks are spammy, so commenting it out. Can send it to # some persistent logging system instead of transcational database. @@ -688,6 +716,30 @@ class CloudonixProvider(TelephonyProvider): return Response(content=twiml, media_type="application/xml"), "application/xml" + # ======== CALL CONTROL METHODS ======== + + async def hangup_machine_answered_call(self, call_id: str) -> bool: + """Hang up a call that was answered by an answering machine. + + Args: + call_id: The Cloudonix session token / call ID to terminate. + + Returns: + True if the call was successfully terminated, False otherwise. + """ + from api.services.telephony.providers.cloudonix_call_strategies import ( + CloudonixHangupStrategy, + ) + + strategy = CloudonixHangupStrategy() + return await strategy.execute_hangup( + { + "call_id": call_id, + "domain_id": self.domain_id, + "bearer_token": self.bearer_token, + } + ) + # ======== CALL TRANSFER METHODS ======== async def transfer_call(