feat: hang up cloudonix machine answered call if feature flag enabled

This commit is contained in:
Sabiha Khan 2026-03-06 12:34:57 +05:30
parent 7b77721964
commit b0a724f8c1
3 changed files with 162 additions and 7 deletions

View file

@ -13,6 +13,50 @@ FILLER_SOUND_PROBABILITY = 0.0
VOICEMAIL_RECORDING_DURATION = 5.0
# Cloudonix Answering Machine Detection (AMD) constants
# Ref: https://developers.cloudonix.com/Documentation/apiWorkflow/callControlAndSessionManagement#answering-machine-detection-results
# Enables AMD and waits for the full voicemail greeting to finish before reporting a result.
# Alternative: "Enable" — ends detection immediately upon determination.
AMD_MACHINE_DETECTION = "DetectMessageEnd"
# Runs AMD in the background while the call continues, posting results to asyncAmdStatusCallback.
# Default: disabled (False).
AMD_ASYNC = True
# HTTP method used when Cloudonix posts the AMD result to the callback URL.
# Allowed: "POST" or "GET". Default: "POST".
AMD_CALLBACK_METHOD = "POST"
# Maximum seconds to wait for a determination before returning "unknown".
# Range: 359 seconds. Default: 30.
AMD_MACHINE_DETECTION_TIMEOUT = 30
# Minimum greeting duration (ms) expected from an answering machine.
# Range: 10006000 ms. Default: 2400.
AMD_MACHINE_DETECTION_SPEECH_THRESHOLD = 2400
# Duration of silence (ms) after speech that confirms the greeting has ended.
# Range: 5005000 ms. Default: 1200.
AMD_MACHINE_DETECTION_SPEECH_END_THRESHOLD = 1200
# Maximum wait (ms) for any audio after the call is answered before timing out.
# Range: 200010000 ms. Default: 5000.
AMD_MACHINE_DETECTION_SILENCE_TIMEOUT = 5000
# Cloudonix AMD final result values
MACHINE_END_SILENCE = "machine_end_silence"
MACHINE_END_OTHER = "machine_end_other"
HUMAN = "human"
UNKNOWN = "unknown"
# Final (non-interim) AMD result values — only these are stored in gathered context.
AMD_FINAL_RESULTS = {MACHINE_END_SILENCE, MACHINE_END_OTHER, HUMAN, UNKNOWN}
# When enabled, Cloudonix calls answered by an answering machine are automatically hung up.
AMD_HANGUP_ENABLED = os.getenv("AMD_HANGUP_ENABLED", "false").lower() == "true"
# Configuration constants
ENABLE_TRACING = os.getenv("ENABLE_TRACING", "false").lower() == "true"

View file

@ -17,12 +17,19 @@ from fastapi import (
WebSocket,
)
from loguru import logger
from pipecat.utils.run_context import set_current_run_id
from pydantic import BaseModel, field_validator
from sqlalchemy import text
from sqlalchemy.future import select
from starlette.responses import HTMLResponse
from starlette.websockets import WebSocketDisconnect
from api.constants import (
AMD_FINAL_RESULTS,
AMD_HANGUP_ENABLED,
MACHINE_END_OTHER,
MACHINE_END_SILENCE,
)
from api.db import db_client
from api.db.models import OrganizationConfigurationModel, UserModel
from api.db.workflow_client import WorkflowClient
@ -50,7 +57,6 @@ from api.utils.telephony_helper import (
numbers_match,
parse_webhook_request,
)
from pipecat.utils.run_context import set_current_run_id
router = APIRouter(prefix="/telephony")
@ -1218,6 +1224,7 @@ async def handle_cloudonix_amd_callback(
):
"""Handle Cloudonix-specific Answering Machine Detection(AMD) callbacks.
Cloudonix sends AMD updates to the callback URL specified during call initiation.
Final results - 'machine_end_silence', 'machine_end_other', 'human', 'unknown'
"""
set_current_run_id(workflow_run_id)
@ -1236,6 +1243,58 @@ async def handle_cloudonix_amd_callback(
f"[run {workflow_run_id}] Received Cloudonix AMD status callback with answered-by {answered_by} for call ID {call_id}: {json.dumps(callback_data)}"
)
if answered_by in AMD_FINAL_RESULTS:
try:
await db_client.update_workflow_run(
run_id=workflow_run_id,
gathered_context={"answered_by": answered_by},
)
logger.info(
f"[run {workflow_run_id}] AMD final result '{answered_by}' stored in gathered context"
)
is_machine = (
answered_by == MACHINE_END_SILENCE or answered_by == MACHINE_END_OTHER
)
if is_machine and AMD_HANGUP_ENABLED:
workflow_run = await db_client.get_workflow_run_by_id(workflow_run_id)
if not workflow_run:
logger.warning(
f"[run {workflow_run_id}] Workflow run not found, skipping AMD hangup"
)
return {"status": "success"}
workflow_run_call_id = (workflow_run.gathered_context or {}).get(
"call_id"
)
if workflow_run_call_id != call_id:
logger.warning(
f"[run {workflow_run_id}] AMD callback call_id '{call_id}' does not match "
f"workflow run call_id '{workflow_run_call_id}', skipping AMD hangup"
)
return {"status": "success"}
if not workflow_run.workflow:
logger.warning(
f"[run {workflow_run_id}] Workflow not found, skipping AMD hangup"
)
return {"status": "success"}
provider = await get_telephony_provider(
workflow_run.workflow.organization_id
)
await provider.hangup_machine_answered_call(call_id)
logger.info(
f"[run {workflow_run_id}] AMD hangup executed for machine call {call_id}"
)
return {"status": "success"}
except Exception as e:
logger.error(
f"[run {workflow_run_id}] Failed to process AMD final result '{answered_by}': {e}"
)
return {"status": answered_by}

View file

@ -10,6 +10,15 @@ import aiohttp
from fastapi import HTTPException
from loguru import logger
from api.constants import (
AMD_ASYNC,
AMD_CALLBACK_METHOD,
AMD_MACHINE_DETECTION,
AMD_MACHINE_DETECTION_SILENCE_TIMEOUT,
AMD_MACHINE_DETECTION_SPEECH_END_THRESHOLD,
AMD_MACHINE_DETECTION_SPEECH_THRESHOLD,
AMD_MACHINE_DETECTION_TIMEOUT,
)
from api.enums import WorkflowRunMode
from api.services.telephony.base import (
CallInitiationResult,
@ -58,6 +67,29 @@ class CloudonixProvider(TelephonyProvider):
"Content-Type": "application/json",
}
def _get_amd_config(
self, backend_endpoint: str, workflow_run_id: Optional[int]
) -> Dict[str, Any]:
"""Build the Answering Machine Detection configuration for an outbound call.
Args:
backend_endpoint: The backend base URL for the AMD status callback.
workflow_run_id: The workflow run ID used to route the callback.
Returns:
Dict containing AMD-related fields to merge into the call payload.
"""
return {
"machineDetection": AMD_MACHINE_DETECTION,
"asyncAmd": AMD_ASYNC,
"asyncAmdStatusCallback": f"{backend_endpoint}/api/v1/telephony/cloudonix/amd-callback/{workflow_run_id}",
"asyncAmdStatusCallbackMethod": AMD_CALLBACK_METHOD,
"machineDetectionTimeout": AMD_MACHINE_DETECTION_TIMEOUT,
"machineDetectionSpeechThreshold": AMD_MACHINE_DETECTION_SPEECH_THRESHOLD,
"machineDetectionSpeechEndThreshold": AMD_MACHINE_DETECTION_SPEECH_END_THRESHOLD,
"machineDetectionSilenceTimeout": AMD_MACHINE_DETECTION_SILENCE_TIMEOUT,
}
async def initiate_call(
self,
to_number: str,
@ -105,12 +137,8 @@ class CloudonixProvider(TelephonyProvider):
</Response>""",
"caller-id": from_number, # Required field
}
data["machineDetection"] = "DetectMessageEnd"
data["asyncAmd"] = True
data["asyncAmdStatusCallback"] = (
f"{backend_endpoint}/api/v1/telephony/cloudonix/amd-callback/{workflow_run_id}"
)
data["asyncAmdStatusCallbackMethod"] = "POST"
data.update(self._get_amd_config(backend_endpoint, workflow_run_id))
# TODO: Cloudonix status callbacks are spammy, so commenting it out. Can send it to
# some persistent logging system instead of transcational database.
@ -688,6 +716,30 @@ class CloudonixProvider(TelephonyProvider):
return Response(content=twiml, media_type="application/xml"), "application/xml"
# ======== CALL CONTROL METHODS ========
async def hangup_machine_answered_call(self, call_id: str) -> bool:
"""Hang up a call that was answered by an answering machine.
Args:
call_id: The Cloudonix session token / call ID to terminate.
Returns:
True if the call was successfully terminated, False otherwise.
"""
from api.services.telephony.providers.cloudonix_call_strategies import (
CloudonixHangupStrategy,
)
strategy = CloudonixHangupStrategy()
return await strategy.execute_hangup(
{
"call_id": call_id,
"domain_id": self.domain_id,
"bearer_token": self.bearer_token,
}
)
# ======== CALL TRANSFER METHODS ========
async def transfer_call(