dograh/api/services/telephony/providers/telnyx/provider.py
Sabiha Khan 085ab0a7ae
fix: honor telnyxs per-call codec in bidirectional stream (#256)
Telnyx's bidirectional stream uses different codecs per direction:
  - Dograh → Telnyx: what we declare via `stream_bidirectional_codec`
  - Telnyx → Dograh: whatever the PSTN leg negotiated (PCMA for UK,
    Europe, India termination; PCMU for US), announced as
    `media_format.encoding` on the WebSocket start message.

We hardcoded both directions on the serializer to PCMU, so any call
whose PSTN leg used PCMA arrived as A-law bytes that we decoded
through a μ-law table → static for the entire call.

`handle_websocket` now extracts `media_format.encoding` from the
start message and threads it through `transport_kwargs`. The
serializer uses it for the Telnyx → Dograh direction; the
Dograh → Telnyx direction stays pinned to PCMU to match the
unchanged `stream_bidirectional_codec` in the dial/answer payloads.

Note: pipecat's `TelnyxFrameSerializer` names its params from the
call's POV, not Dograh's — `inbound_encoding` is what we send into
the call, `outbound_encoding` is what we receive. Easy to mix up.
2026-04-29 19:20:52 +05:30

611 lines
23 KiB
Python

"""
Telnyx implementation of the TelephonyProvider interface.
Uses the Telnyx Call Control API v2 for outbound calling with
inline WebSocket media streaming.
"""
import json
import random
from typing import TYPE_CHECKING, Any, Dict, List, Optional
import aiohttp
from fastapi import HTTPException
from loguru import logger
from api.enums import WorkflowRunMode
from api.services.telephony.base import (
CallInitiationResult,
NormalizedInboundData,
ProviderSyncResult,
TelephonyProvider,
)
from api.utils.common import get_backend_endpoints
if TYPE_CHECKING:
from fastapi import WebSocket
class TelnyxProvider(TelephonyProvider):
"""
Telnyx implementation of TelephonyProvider.
Uses the Call Control API v2 with inline WebSocket streaming for audio.
"""
PROVIDER_NAME = WorkflowRunMode.TELNYX.value
WEBHOOK_ENDPOINT = "telnyx/webhook"
TELNYX_API_BASE = "https://api.telnyx.com/v2"
def __init__(self, config: Dict[str, Any]):
self.api_key = config.get("api_key")
self.connection_id = config.get("connection_id")
self.from_numbers = config.get("from_numbers", [])
if isinstance(self.from_numbers, str):
self.from_numbers = [self.from_numbers]
def _headers(self) -> Dict[str, str]:
return {
"Content-Type": "application/json",
"Accept": "application/json",
"Authorization": f"Bearer {self.api_key}",
}
async def initiate_call(
self,
to_number: str,
webhook_url: str,
workflow_run_id: Optional[int] = None,
from_number: Optional[str] = None,
**kwargs: Any,
) -> CallInitiationResult:
"""Initiate an outbound call via Telnyx Call Control API."""
if not self.validate_config():
raise ValueError("Telnyx provider not properly configured")
if from_number is None:
from_number = random.choice(self.from_numbers)
logger.info(f"Selected phone number {from_number} for outbound call")
backend_endpoint, wss_backend_endpoint = await get_backend_endpoints()
# Build the WebSocket stream URL for inline audio streaming
workflow_id = kwargs.get("workflow_id")
user_id = kwargs.get("user_id")
stream_url = (
f"{wss_backend_endpoint}/api/v1/telephony/ws"
f"/{workflow_id}/{user_id}/{workflow_run_id}"
)
# Build the webhook URL for status callbacks
events_url = (
f"{backend_endpoint}/api/v1/telephony/telnyx/events/{workflow_run_id}"
)
# stream_bidirectional_codec controls only the Dograh → Telnyx direction.
# The Telnyx → Dograh direction follows the PSTN leg and is announced via
# media_format.encoding in the WebSocket start message.
payload = {
"connection_id": self.connection_id,
"to": to_number,
"from": from_number,
"stream_url": stream_url,
"stream_track": "inbound_track",
"stream_bidirectional_mode": "rtp",
"stream_bidirectional_codec": "PCMU",
"webhook_url": events_url,
"webhook_url_method": "POST",
}
logger.info(
f"Telnyx dial payload: {json.dumps({k: v for k, v in payload.items() if k != 'connection_id'})}"
)
endpoint = f"{self.TELNYX_API_BASE}/calls"
async with aiohttp.ClientSession() as session:
async with session.post(
endpoint, json=payload, headers=self._headers()
) as response:
if response.status != 200:
error_data = await response.json()
logger.error(f"Telnyx API error: {error_data}")
raise HTTPException(
status_code=response.status, detail=json.dumps(error_data)
)
response_data = await response.json()
data = response_data.get("data", {})
call_control_id = data.get("call_control_id", "")
call_leg_id = data.get("call_leg_id", "")
call_session_id = data.get("call_session_id", "")
logger.info(
f"Telnyx call initiated: call_control_id={call_control_id}, "
f"call_leg_id={call_leg_id}, call_session_id={call_session_id}"
)
return CallInitiationResult(
call_id=call_control_id,
status="initiated",
caller_number=from_number,
provider_metadata={
"call_control_id": call_control_id,
"call_leg_id": call_leg_id,
"call_session_id": call_session_id,
},
raw_response=response_data,
)
async def get_call_status(self, call_id: str) -> Dict[str, Any]:
"""Get the current status of a Telnyx call."""
endpoint = f"{self.TELNYX_API_BASE}/calls/{call_id}"
async with aiohttp.ClientSession() as session:
async with session.get(endpoint, headers=self._headers()) as response:
if response.status != 200:
error_data = await response.json()
raise Exception(f"Failed to get call status: {error_data}")
return await response.json()
async def get_available_phone_numbers(self) -> List[str]:
return self.from_numbers
def validate_config(self) -> bool:
return bool(self.api_key and self.connection_id and self.from_numbers)
async def verify_webhook_signature(
self, url: str, params: Dict[str, Any], signature: str
) -> bool:
"""Required by the abstract interface but not actively called for Telnyx.
Telnyx webhook signature verification uses Ed25519 (via the
telnyx-signature-ed25519 header). This can be implemented in the
future using the Telnyx SDK if needed.
"""
return True
async def get_webhook_response(
self, workflow_id: int, user_id: int, workflow_run_id: int
) -> str:
"""Not used for Telnyx — streaming is inline with the dial request."""
return ""
async def get_call_cost(self, call_id: str) -> Dict[str, Any]:
"""Get cost information for a Telnyx call.
Telnyx doesn't provide per-call cost via the Call Control API.
Cost data is available through the billing/CDR APIs.
"""
return {
"cost_usd": 0.0,
"duration": 0,
"status": "unknown",
"raw_response": {},
}
def parse_status_callback(self, data: Dict[str, Any]) -> Dict[str, Any]:
"""Parse Telnyx webhook event data into generic format."""
event_data = data.get("data", data)
event_type = event_data.get("event_type", "")
payload = event_data.get("payload", {})
status = self._resolve_status(event_type, payload)
duration_secs = payload.get("duration_secs")
return {
"call_id": payload.get("call_control_id", ""),
"status": status,
"from_number": payload.get("from"),
"to_number": payload.get("to"),
"direction": payload.get("direction"),
"duration": str(duration_secs) if duration_secs else None,
"extra": data,
}
@staticmethod
def _resolve_status(event_type: str, payload: Dict[str, Any]) -> str:
"""Map a Telnyx event type (and hangup cause) to a normalized status."""
EVENT_STATUS = {
"call.initiated": "initiated",
"call.answered": "in-progress",
"call.hangup": "completed",
"call.machine.detection.ended": "machine-detected",
"streaming.started": "streaming-started",
"streaming.stopped": "streaming-stopped",
}
HANGUP_STATUS = {
"busy": "busy",
"no_answer": "no-answer",
"timeout": "no-answer",
"call_rejected": "failed",
"unallocated_number": "failed",
}
status = EVENT_STATUS.get(event_type, event_type)
if event_type == "call.hangup":
hangup_cause = payload.get("hangup_cause", "")
status = HANGUP_STATUS.get(hangup_cause, status)
return status
async def handle_websocket(
self,
websocket: "WebSocket",
workflow_id: int,
user_id: int,
workflow_run_id: int,
) -> None:
"""Handle Telnyx WebSocket connection for real-time audio.
Telnyx sends:
1. "connected" event on WebSocket open
2. "start" event with stream_id, call_control_id, media_format
3. "media" events with base64-encoded audio
"""
from api.services.pipecat.run_pipeline import run_pipeline_telephony
try:
# Wait for "connected" event
first_msg = await websocket.receive_text()
msg = json.loads(first_msg)
if msg.get("event") != "connected":
logger.error(f"Expected 'connected' event, got: {msg.get('event')}")
await websocket.close(code=4400, reason="Expected connected event")
return
logger.debug(
f"Telnyx WebSocket connected for workflow_run {workflow_run_id}"
)
# Wait for "start" event with stream details
start_msg = await websocket.receive_text()
logger.debug(f"Received start message: {start_msg}")
start_data = json.loads(start_msg)
if start_data.get("event") != "start":
logger.error("Expected 'start' event second")
await websocket.close(code=4400, reason="Expected start event")
return
# media_format.encoding is the codec Telnyx delivers on the
# inbound direction (Telnyx → Dograh); the outbound direction is
# pinned to PCMU separately via stream_bidirectional_codec.
try:
stream_id = start_data.get("stream_id", "")
start_info = start_data.get("start", {})
call_control_id = start_info.get("call_control_id", "")
media_format = start_info.get("media_format") or {}
encoding = media_format.get("encoding") or "PCMU"
except (KeyError, AttributeError):
logger.error("Missing stream_id or call_control_id in start message")
await websocket.close(code=4400, reason="Missing stream identifiers")
return
if not stream_id or not call_control_id:
logger.error(
f"Empty stream identifiers: stream_id={stream_id}, "
f"call_control_id={call_control_id}"
)
await websocket.close(code=4400, reason="Missing stream identifiers")
return
logger.info(
f"Telnyx stream started: stream_id={stream_id}, "
f"call_control_id={call_control_id}, encoding={encoding}"
)
await run_pipeline_telephony(
websocket,
provider_name=self.PROVIDER_NAME,
workflow_id=workflow_id,
workflow_run_id=workflow_run_id,
user_id=user_id,
call_id=call_control_id,
transport_kwargs={
"stream_id": stream_id,
"call_control_id": call_control_id,
"encoding": encoding,
},
)
except Exception as e:
logger.error(f"Error in Telnyx WebSocket handler: {e}")
raise
async def answer_and_stream(
self, call_control_id: str, stream_url: str, webhook_url: str
) -> None:
"""Answer an inbound Telnyx call and start WebSocket streaming inline.
This is Telnyx-specific: unlike Twilio/Vobiz where you return XML in the
webhook response, Telnyx requires an explicit REST API call to answer
the call and set up streaming.
Args:
call_control_id: The call_control_id from the inbound webhook
stream_url: WebSocket URL for bidirectional audio streaming
webhook_url: URL for status callback events
"""
endpoint = f"{self.TELNYX_API_BASE}/calls/{call_control_id}/actions/answer"
payload = {
"stream_url": stream_url,
"stream_track": "inbound_track",
"stream_bidirectional_mode": "rtp",
"stream_bidirectional_codec": "PCMU",
"webhook_url": webhook_url,
"webhook_url_method": "POST",
}
logger.info(
f"Answering Telnyx inbound call {call_control_id} with stream_url={stream_url}"
)
async with aiohttp.ClientSession() as session:
async with session.post(
endpoint, json=payload, headers=self._headers()
) as response:
if response.status != 200:
error_data = await response.text()
logger.error(
f"Failed to answer Telnyx call {call_control_id}: "
f"status={response.status}, response={error_data}"
)
raise Exception(
f"Failed to answer Telnyx call: {response.status} {error_data}"
)
logger.info(f"Successfully answered Telnyx call {call_control_id}")
# ======== INBOUND CALL METHODS ========
@classmethod
def can_handle_webhook(
cls, webhook_data: Dict[str, Any], headers: Dict[str, str]
) -> bool:
"""Detect if a webhook is from Telnyx.
Telnyx webhooks have a nested data.event_type structure
and may include a telnyx-signature-ed25519 header.
"""
if "telnyx-signature-ed25519" in headers:
return True
# Check for Telnyx event structure
data = webhook_data.get("data", {})
if data.get("record_type") == "event" and "event_type" in data:
event_type = data.get("event_type", "")
if event_type.startswith("call."):
return True
return False
@staticmethod
def parse_inbound_webhook(webhook_data: Dict[str, Any]) -> NormalizedInboundData:
"""Parse Telnyx inbound webhook into normalized format."""
data = webhook_data.get("data", webhook_data)
payload = data.get("payload", {})
# Telnyx uses "incoming" for inbound — normalize to "inbound"
direction = payload.get("direction", "")
if direction == "incoming":
direction = "inbound"
return NormalizedInboundData(
provider=TelnyxProvider.PROVIDER_NAME,
call_id=payload.get("call_control_id", ""),
from_number=TelnyxProvider.normalize_phone_number(payload.get("from", "")),
to_number=TelnyxProvider.normalize_phone_number(payload.get("to", "")),
direction=direction,
call_status=data.get("event_type", ""),
account_id=payload.get("connection_id"),
raw_data=webhook_data,
)
@staticmethod
def validate_account_id(config_data: dict, webhook_account_id: str) -> bool:
"""Validate that the connection_id from webhook matches configuration."""
if not webhook_account_id:
return False
return config_data.get("connection_id") == webhook_account_id
@staticmethod
def normalize_phone_number(phone_number: str) -> str:
"""Normalize phone number to E.164 format.
Telnyx already provides numbers in E.164 format
"""
return phone_number or ""
async def verify_inbound_signature(
self,
url: str,
webhook_data: Dict[str, Any],
headers: Dict[str, str],
body: str = "",
) -> bool:
"""Required by the abstract interface. Telnyx signature verification
(Ed25519 via ``telnyx-signature-ed25519``) is not yet implemented —
accepts all inbound webhooks for now.
"""
return True
async def configure_inbound(
self, address: str, webhook_url: Optional[str]
) -> ProviderSyncResult:
"""Update webhook_event_url on the Telnyx Call Control Application.
PATCH requires application_name even on partial updates, so we GET
first to preserve whatever name the user set in the cockpit. The URL
is shared across every number on the application — clearing is a
no-op to avoid silently breaking inbound for sibling numbers.
"""
if webhook_url is None:
logger.info(
f"Telnyx configure_inbound clear for {address}: skipping "
f"application update (webhook_event_url is shared across all "
f"numbers on Call Control Application {self.connection_id})"
)
return ProviderSyncResult(ok=True)
if not self.validate_config():
return ProviderSyncResult(
ok=False, message="Telnyx provider not properly configured"
)
if not self.connection_id:
return ProviderSyncResult(
ok=False,
message=(
"Telnyx connection_id (Call Control Application ID) is "
"not configured. Set it in the telephony configuration "
"so inbound webhooks can be synced to the right "
"application."
),
)
app_endpoint = (
f"{self.TELNYX_API_BASE}/call_control_applications/{self.connection_id}"
)
try:
async with aiohttp.ClientSession() as session:
async with session.get(
app_endpoint, headers=self._headers()
) as response:
if response.status != 200:
body = await response.text()
logger.error(
f"Failed to fetch Telnyx Call Control Application "
f"{self.connection_id}: {response.status} {body}"
)
return ProviderSyncResult(
ok=False,
message=f"Telnyx API {response.status}: {body}",
)
app_data = await response.json()
except Exception as e:
logger.error(
f"Exception fetching Telnyx Call Control Application "
f"{self.connection_id}: {e}"
)
return ProviderSyncResult(ok=False, message=f"Telnyx lookup failed: {e}")
application_name = (app_data.get("data") or {}).get("application_name")
if not application_name:
return ProviderSyncResult(
ok=False,
message=(
f"Telnyx Call Control Application {self.connection_id} "
f"did not return an application_name; cannot PATCH "
f"without it."
),
)
update_body = {
"application_name": application_name,
"webhook_event_url": webhook_url,
}
try:
async with aiohttp.ClientSession() as session:
async with session.patch(
app_endpoint, json=update_body, headers=self._headers()
) as response:
if response.status != 200:
body = await response.text()
logger.error(
f"Telnyx Call Control Application update failed "
f"for {self.connection_id}: {response.status} "
f"{body}"
)
return ProviderSyncResult(
ok=False,
message=f"Telnyx API {response.status}: {body}",
)
except Exception as e:
logger.error(
f"Exception updating Telnyx Call Control Application "
f"{self.connection_id}: {e}"
)
return ProviderSyncResult(ok=False, message=f"Telnyx update failed: {e}")
logger.info(
f"Telnyx webhook_event_url set on Call Control Application "
f"{self.connection_id} (triggered by address {address})"
)
return ProviderSyncResult(ok=True)
async def start_inbound_stream(
self,
*,
websocket_url: str,
workflow_run_id: int,
normalized_data,
backend_endpoint: str,
):
"""Answer the inbound Telnyx call via Call Control and start streaming.
Unlike markup-response providers, Telnyx ignores webhook response
bodies for call control — the call must be answered with a REST
call back to Telnyx before media can flow. We do that here and
return a simple acknowledgement; on failure, return the
ANSWER_FAILED error response so the route stays provider-agnostic.
"""
events_url = (
f"{backend_endpoint}/api/v1/telephony/telnyx/events/{workflow_run_id}"
)
try:
await self.answer_and_stream(
call_control_id=normalized_data.call_id,
stream_url=websocket_url,
webhook_url=events_url,
)
except Exception as e:
logger.error(f"Failed to answer Telnyx inbound call: {e}")
return self.generate_error_response(
"ANSWER_FAILED", "Failed to answer call"
)
return {"status": "ok"}
@staticmethod
def generate_error_response(error_type: str, message: str) -> tuple:
from fastapi import Response
return Response(
content=json.dumps({"error": error_type, "message": message}),
media_type="application/json",
)
@staticmethod
def generate_validation_error_response(error_type) -> tuple:
from fastapi import Response
from api.errors.telephony_errors import TELEPHONY_ERROR_MESSAGES, TelephonyError
message = TELEPHONY_ERROR_MESSAGES.get(
error_type, TELEPHONY_ERROR_MESSAGES[TelephonyError.GENERAL_AUTH_FAILED]
)
return Response(
content=json.dumps({"error": str(error_type), "message": message}),
media_type="application/json",
)
# ======== CALL TRANSFER METHODS ========
async def transfer_call(
self,
destination: str,
transfer_id: str,
conference_name: str,
timeout: int = 30,
**kwargs: Any,
) -> Dict[str, Any]:
"""Telnyx call transfer is not yet implemented."""
raise NotImplementedError("Call transfer not yet supported for Telnyx")
def supports_transfers(self) -> bool:
return False