feat(twilio): add Answering Machine Detection (AMD) support via telephony config (#443)

* feat(twilio): add Answering Machine Detection (AMD) support via telephony config

Closes #339

* chore: regenerate OpenAPI spec to fix drift-check

The openapi.json snapshot had drifted from the FastAPI app definition
because main gained new organization endpoints (billing, credits,
context) after this branch was created. Regenerate it with
'python -m scripts.dump_docs_openapi' to bring it back in sync.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>

* feat: add provider-level AMD hooks

* fix: handle db error while persisting amd result

---------

Co-authored-by: Claude Sonnet 4.6 <noreply@anthropic.com>
Co-authored-by: Sabiha Khan <sabihak89@gmail.com>
Co-authored-by: Sabiha Khan <87858386+chewwbaka@users.noreply.github.com>
This commit is contained in:
nuthalapativarun 2026-06-25 02:15:13 -07:00 committed by GitHub
parent 29c5be298c
commit d675fd1fda
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
11 changed files with 380 additions and 66 deletions

View file

@ -56,6 +56,15 @@ class NormalizedInboundData:
raw_data: Dict[str, Any] = field(default_factory=dict) # Original webhook data
@dataclass
class AnsweringMachineDetectionResult:
"""Standardized answering-machine detection result across providers."""
call_id: str
answered_by: str
raw_data: Dict[str, Any] = field(default_factory=dict)
class TelephonyProvider(ABC):
"""
Abstract base class for telephony providers.
@ -192,6 +201,23 @@ class TelephonyProvider(ABC):
"""
pass
def supports_answering_machine_detection(self) -> bool:
"""Return whether this provider can request answering-machine detection."""
return False
def apply_answering_machine_detection_call_params(
self,
data: Dict[str, Any],
) -> Dict[str, Any]:
"""Add provider-specific AMD parameters to an outbound call request."""
return data
def parse_answering_machine_detection_result(
self, data: Dict[str, Any]
) -> Optional[AnsweringMachineDetectionResult]:
"""Parse provider-specific callback data into a normalized AMD result."""
return None
@abstractmethod
async def handle_websocket(
self,

View file

@ -20,6 +20,7 @@ def _config_loader(value: Dict[str, Any]) -> Dict[str, Any]:
"account_sid": value.get("account_sid"),
"auth_token": value.get("auth_token"),
"from_numbers": value.get("from_numbers", []),
"amd_enabled": value.get("amd_enabled", False),
}
@ -47,6 +48,15 @@ _UI_METADATA = ProviderUIMetadata(
type="string-array",
description="E.164-formatted Twilio phone numbers used for outbound calls",
),
ProviderUIField(
name="amd_enabled",
label="Answering Machine Detection",
type="boolean",
description=(
"Detect whether outbound calls are answered by a person or "
"machine. Twilio may bill AMD as an additional per-call feature."
),
),
],
)

View file

@ -16,6 +16,13 @@ class TwilioConfigurationRequest(BaseModel):
from_numbers: List[str] = Field(
default_factory=list, description="List of Twilio phone numbers"
)
amd_enabled: bool = Field(
default=False,
description=(
"Detect whether outbound calls are answered by a person or machine. "
"Twilio may bill AMD as an additional per-call feature."
),
)
class TwilioConfigurationResponse(BaseModel):
@ -25,3 +32,4 @@ class TwilioConfigurationResponse(BaseModel):
account_sid: str # Masked (e.g., "****************def0")
auth_token: str # Masked (e.g., "****************abc1")
from_numbers: List[str]
amd_enabled: bool = False

View file

@ -13,6 +13,7 @@ from twilio.request_validator import RequestValidator
from api.enums import TelephonyCallStatus, WorkflowRunMode
from api.services.telephony.base import (
AnsweringMachineDetectionResult,
CallInitiationResult,
NormalizedInboundData,
ProviderSyncResult,
@ -47,6 +48,7 @@ class TwilioProvider(TelephonyProvider):
self.account_sid = config.get("account_sid")
self.auth_token = config.get("auth_token")
self.from_numbers = config.get("from_numbers", [])
self.amd_enabled: bool = bool(config.get("amd_enabled", False))
# Handle both single number (string) and multiple numbers (list)
if isinstance(self.from_numbers, str):
@ -96,6 +98,8 @@ class TwilioProvider(TelephonyProvider):
}
)
data = self.apply_answering_machine_detection_call_params(data)
data.update(kwargs)
# Make the API request
@ -241,6 +245,31 @@ class TwilioProvider(TelephonyProvider):
"extra": data, # Include all original data
}
def supports_answering_machine_detection(self) -> bool:
"""Twilio supports AMD through the Voice Calls API."""
return True
def apply_answering_machine_detection_call_params(
self,
data: Dict[str, Any],
) -> Dict[str, Any]:
if self.amd_enabled:
data["MachineDetection"] = "Enable"
return data
def parse_answering_machine_detection_result(
self, data: Dict[str, Any]
) -> Optional[AnsweringMachineDetectionResult]:
answered_by = data.get("AnsweredBy")
if not answered_by:
return None
return AnsweringMachineDetectionResult(
call_id=data.get("CallSid", ""),
answered_by=answered_by,
raw_data=data,
)
async def handle_websocket(
self,
websocket: "WebSocket",

View file

@ -12,6 +12,7 @@ from pipecat.utils.run_context import set_current_run_id
from starlette.responses import HTMLResponse
from api.db import db_client
from api.services.telephony.base import TelephonyProvider
from api.services.telephony.factory import get_telephony_provider_for_run
from api.services.telephony.status_processor import (
StatusCallbackRequest,
@ -21,6 +22,30 @@ from api.services.telephony.status_processor import (
router = APIRouter()
async def _persist_amd_result_if_present(
*,
provider: TelephonyProvider,
workflow_run_id: int,
callback_data: dict,
) -> None:
amd_result = provider.parse_answering_machine_detection_result(callback_data)
if not amd_result:
return
try:
logger.info(
f"[run {workflow_run_id}] AMD result: AnsweredBy={amd_result.answered_by}"
)
await db_client.update_workflow_run(
run_id=workflow_run_id,
gathered_context={"answered_by": amd_result.answered_by},
)
except Exception as exc:
logger.warning(
f"[run {workflow_run_id}] Failed to persist AMD result: {exc}"
)
@router.post("/twiml", include_in_schema=False)
async def handle_twiml_webhook(
workflow_id: int,
@ -49,6 +74,12 @@ async def handle_twiml_webhook(
)
raise HTTPException(status_code=401, detail="Invalid webhook signature")
await _persist_amd_result_if_present(
provider=provider,
workflow_run_id=workflow_run_id,
callback_data=callback_data,
)
response_content = await provider.get_webhook_response(
workflow_id, user_id, workflow_run_id
)
@ -111,6 +142,12 @@ async def handle_twilio_status_callback(
extra=parsed_data.get("extra", {}),
)
await _persist_amd_result_if_present(
provider=provider,
workflow_run_id=workflow_run_id,
callback_data=callback_data,
)
# Process the status update
await _process_status_update(workflow_run_id, status_update)

View file

@ -40,7 +40,8 @@ class ProviderUIField:
name: str # Must match the Pydantic field name on config_request_cls
label: str
type: str # "text" | "password" | "textarea" | "string-array" | "number"
# "text" | "password" | "textarea" | "string-array" | "number" | "boolean"
type: str
required: bool = True
sensitive: bool = False # If true, mask when displaying stored value
description: Optional[str] = None

View file

@ -76,6 +76,34 @@ def _signature(
return validator.compute_signature(url, form_data)
def test_twilio_provider_applies_answering_machine_detection_params():
provider = TwilioProvider(
{
"account_sid": "AC123",
"auth_token": "twilio-auth-token",
"from_numbers": ["+15551230002"],
"amd_enabled": True,
}
)
data = provider.apply_answering_machine_detection_call_params({"To": "+1555"})
assert provider.supports_answering_machine_detection() is True
assert data["MachineDetection"] == "Enable"
def test_twilio_provider_parses_answering_machine_detection_result():
provider = _provider()
result = provider.parse_answering_machine_detection_result(
{"CallSid": "CA123", "AnsweredBy": "machine_start"}
)
assert result is not None
assert result.call_id == "CA123"
assert result.answered_by == "machine_start"
@pytest.mark.asyncio
async def test_twiml_route_accepts_valid_signature_with_extra_query_param():
provider = _provider()
@ -251,3 +279,106 @@ async def test_twilio_status_callback_accepts_valid_signature():
assert result == {"status": "success"}
process_status.assert_awaited_once()
@pytest.mark.asyncio
async def test_twilio_status_callback_persists_answering_machine_detection_result():
provider = _provider()
form_data = {
"CallSid": "CA123",
"CallStatus": "completed",
"AnsweredBy": "machine_start",
}
request = _request(
path="/api/v1/telephony/twilio/status-callback/123",
query={},
form_data=form_data,
headers={
"x-twilio-signature": _signature(
provider,
path="/api/v1/telephony/twilio/status-callback/123",
query={},
form_data=form_data,
)
},
)
with (
patch("api.services.telephony.providers.twilio.routes.db_client") as db_client,
patch(
"api.services.telephony.providers.twilio.routes.get_telephony_provider_for_run",
new_callable=AsyncMock,
return_value=provider,
),
patch(
"api.services.telephony.providers.twilio.routes._process_status_update",
new_callable=AsyncMock,
),
):
db_client.get_workflow_run_by_id = AsyncMock(
return_value=SimpleNamespace(workflow_id=7)
)
db_client.get_workflow_by_id = AsyncMock(
return_value=SimpleNamespace(organization_id=11)
)
db_client.update_workflow_run = AsyncMock()
result = await handle_twilio_status_callback(
workflow_run_id=123, request=request
)
assert result == {"status": "success"}
db_client.update_workflow_run.assert_awaited_once_with(
run_id=123,
gathered_context={"answered_by": "machine_start"},
)
@pytest.mark.asyncio
async def test_twilio_status_callback_continues_when_amd_persistence_fails():
provider = _provider()
form_data = {
"CallSid": "CA123",
"CallStatus": "completed",
"AnsweredBy": "machine_start",
}
request = _request(
path="/api/v1/telephony/twilio/status-callback/123",
query={},
form_data=form_data,
headers={
"x-twilio-signature": _signature(
provider,
path="/api/v1/telephony/twilio/status-callback/123",
query={},
form_data=form_data,
)
},
)
with (
patch("api.services.telephony.providers.twilio.routes.db_client") as db_client,
patch(
"api.services.telephony.providers.twilio.routes.get_telephony_provider_for_run",
new_callable=AsyncMock,
return_value=provider,
),
patch(
"api.services.telephony.providers.twilio.routes._process_status_update",
new_callable=AsyncMock,
) as process_status,
):
db_client.get_workflow_run_by_id = AsyncMock(
return_value=SimpleNamespace(workflow_id=7)
)
db_client.get_workflow_by_id = AsyncMock(
return_value=SimpleNamespace(organization_id=11)
)
db_client.update_workflow_run = AsyncMock(side_effect=RuntimeError("db down"))
result = await handle_twilio_status_callback(
workflow_run_id=123, request=request
)
assert result == {"status": "success"}
process_status.assert_awaited_once()