""" Telnyx implementation of the TelephonyProvider interface. Uses the Telnyx Call Control API v2 for outbound calling with inline WebSocket media streaming. """ import base64 import binascii import json import random import time from typing import TYPE_CHECKING, Any, Dict, List, Optional import aiohttp import nacl.exceptions import nacl.signing from fastapi import HTTPException, WebSocketDisconnect from loguru import logger # 5-min replay window — matches Telnyx SDKs (Python/Node/Go/Ruby/PHP); # Source: github.com/team-telnyx/telnyx-python src/telnyx/lib/webhook_verification.py TELNYX_TIMESTAMP_TOLERANCE_SECONDS = 300 # Ed25519 sizes per RFC 8032; Telnyx SDKs check these for clearer errors than PyNaCl. TELNYX_PUBLIC_KEY_BYTES = 32 TELNYX_SIGNATURE_BYTES = 64 from api.enums import WorkflowRunMode from api.services.telephony.base import ( CallInitiationResult, NormalizedInboundData, ProviderSyncResult, TelephonyProvider, ) from api.utils.common import get_backend_endpoints from api.utils.telephony_address import normalize_telephony_address if TYPE_CHECKING: from fastapi import WebSocket def normalize_event_type(event_type: str) -> str: """Telnyx delivers event types with either dots or underscores (e.g. ``streaming.started`` vs ``streaming_started``). Normalize to the dotted form so all downstream matching can use a single canonical shape. """ return (event_type or "").replace("_", ".") def _get_header(headers: Dict[str, str], name: str) -> str: for key, value in headers.items(): if key.lower() == name: return value return "" class TelnyxProvider(TelephonyProvider): """ Telnyx implementation of TelephonyProvider. Uses the Call Control API v2 with inline WebSocket streaming for audio. """ PROVIDER_NAME = WorkflowRunMode.TELNYX.value WEBHOOK_ENDPOINT = "telnyx/webhook" TELNYX_API_BASE = "https://api.telnyx.com/v2" def __init__(self, config: Dict[str, Any]): self.api_key = config.get("api_key") self.connection_id = config.get("connection_id") self.webhook_public_key = config.get("webhook_public_key") self.from_numbers = config.get("from_numbers", []) if isinstance(self.from_numbers, str): self.from_numbers = [self.from_numbers] def _headers(self) -> Dict[str, str]: return { "Content-Type": "application/json", "Accept": "application/json", "Authorization": f"Bearer {self.api_key}", } async def initiate_call( self, to_number: str, webhook_url: str, workflow_run_id: Optional[int] = None, from_number: Optional[str] = None, **kwargs: Any, ) -> CallInitiationResult: """Initiate an outbound call via Telnyx Call Control API.""" if not self.validate_config(): raise ValueError("Telnyx provider not properly configured") if from_number is None: from_number = random.choice(self.from_numbers) logger.info(f"Selected phone number {from_number} for outbound call") backend_endpoint, wss_backend_endpoint = await get_backend_endpoints() # Build the WebSocket stream URL for inline audio streaming workflow_id = kwargs.get("workflow_id") user_id = kwargs.get("user_id") stream_url = ( f"{wss_backend_endpoint}/api/v1/telephony/ws" f"/{workflow_id}/{user_id}/{workflow_run_id}" ) # Build the webhook URL for status callbacks events_url = ( f"{backend_endpoint}/api/v1/telephony/telnyx/events/{workflow_run_id}" ) # stream_bidirectional_codec controls only the Dograh → Telnyx direction. # The Telnyx → Dograh direction follows the PSTN leg and is announced via # media_format.encoding in the WebSocket start message. payload = { "connection_id": self.connection_id, "to": to_number, "from": from_number, "stream_url": stream_url, "stream_track": "inbound_track", "stream_bidirectional_mode": "rtp", "stream_bidirectional_codec": "PCMU", "webhook_url": events_url, "webhook_url_method": "POST", } logger.info( f"Telnyx dial payload: {json.dumps({k: v for k, v in payload.items() if k != 'connection_id'})}" ) endpoint = f"{self.TELNYX_API_BASE}/calls" async with aiohttp.ClientSession() as session: async with session.post( endpoint, json=payload, headers=self._headers() ) as response: if response.status != 200: error_data = await response.json() logger.error(f"Telnyx API error: {error_data}") raise HTTPException( status_code=response.status, detail=json.dumps(error_data) ) response_data = await response.json() data = response_data.get("data", {}) call_control_id = data.get("call_control_id", "") call_leg_id = data.get("call_leg_id", "") call_session_id = data.get("call_session_id", "") logger.info( f"Telnyx call initiated: call_control_id={call_control_id}, " f"call_leg_id={call_leg_id}, call_session_id={call_session_id}" ) return CallInitiationResult( call_id=call_control_id, status="initiated", caller_number=from_number, provider_metadata={ "call_id": call_control_id, "call_control_id": call_control_id, "call_leg_id": call_leg_id, "call_session_id": call_session_id, }, raw_response=response_data, ) async def get_call_status(self, call_id: str) -> Dict[str, Any]: """Get the current status of a Telnyx call.""" endpoint = f"{self.TELNYX_API_BASE}/calls/{call_id}" async with aiohttp.ClientSession() as session: async with session.get(endpoint, headers=self._headers()) as response: if response.status != 200: error_data = await response.json() raise Exception(f"Failed to get call status: {error_data}") return await response.json() async def get_available_phone_numbers(self) -> List[str]: return self.from_numbers def validate_config(self) -> bool: return bool(self.api_key and self.connection_id and self.from_numbers) async def verify_webhook_signature( self, url: str, params: Dict[str, Any], signature: str ) -> bool: """Verify a Telnyx Ed25519 webhook signature. Telnyx signs ``{timestamp}|{json_payload}`` and sends the signature in ``telnyx-signature-ed25519``. The public key is read from provider configuration, not from the request. ``url`` is unused — Telnyx does not sign the request URL; the parameter exists to satisfy the base class interface. Docs: https://developers.telnyx.com/development/api-fundamentals/webhooks/receiving-webhooks """ timestamp = params.get("telnyx_timestamp") or params.get("timestamp") raw_body = params.get("_raw_body", "") if not signature: logger.warning("Telnyx webhook missing telnyx-signature-ed25519 header") return False if not timestamp: logger.warning("Telnyx webhook missing telnyx-timestamp header") return False if not self.webhook_public_key: logger.error("Missing Telnyx webhook_public_key configuration") return False try: ts_int = int(timestamp) except (TypeError, ValueError): logger.warning(f"Invalid Telnyx webhook timestamp format: {timestamp!r}") return False if abs(time.time() - ts_int) > TELNYX_TIMESTAMP_TOLERANCE_SECONDS: logger.warning( f"Telnyx webhook timestamp outside " f"{TELNYX_TIMESTAMP_TOLERANCE_SECONDS}s tolerance: " f"timestamp={ts_int}, now={int(time.time())}" ) return False if isinstance(raw_body, bytes): raw_body = raw_body.decode("utf-8") try: signature_bytes = base64.b64decode(signature, validate=True) except (binascii.Error, ValueError) as e: logger.warning(f"Telnyx webhook signature not valid base64: {e}") return False try: public_key_bytes = base64.b64decode( self.webhook_public_key.strip(), validate=True ) except (binascii.Error, ValueError) as e: logger.error(f"Telnyx webhook_public_key not valid base64: {e}") return False if len(public_key_bytes) != TELNYX_PUBLIC_KEY_BYTES: logger.error( f"Telnyx webhook_public_key wrong length: expected " f"{TELNYX_PUBLIC_KEY_BYTES}, got {len(public_key_bytes)}" ) return False if len(signature_bytes) != TELNYX_SIGNATURE_BYTES: logger.warning( f"Telnyx webhook signature wrong length: expected " f"{TELNYX_SIGNATURE_BYTES}, got {len(signature_bytes)}" ) return False try: verify_key = nacl.signing.VerifyKey(public_key_bytes) signed_payload = f"{timestamp}|{raw_body}".encode("utf-8") verify_key.verify(signed_payload, signature_bytes) return True except nacl.exceptions.BadSignatureError: return False async def get_webhook_response( self, workflow_id: int, user_id: int, workflow_run_id: int ) -> str: """Not used for Telnyx — streaming is inline with the dial request.""" return "" async def get_call_cost(self, call_id: str) -> Dict[str, Any]: """Get cost information for a Telnyx call. Telnyx doesn't provide per-call cost via the Call Control API. Cost data is available through the billing/CDR APIs. """ return { "cost_usd": 0.0, "duration": 0, "status": "unknown", "raw_response": {}, } def parse_status_callback(self, data: Dict[str, Any]) -> Dict[str, Any]: """Parse Telnyx webhook event data into generic format.""" event_data = data.get("data", data) event_type = normalize_event_type(event_data.get("event_type", "")) payload = event_data.get("payload", {}) status = self._resolve_status(event_type, payload) duration_secs = payload.get("duration_secs") return { "call_id": payload.get("call_control_id", ""), "status": status, "from_number": payload.get("from"), "to_number": payload.get("to"), "direction": payload.get("direction"), "duration": str(duration_secs) if duration_secs else None, "extra": data, } @staticmethod def _resolve_status(event_type: str, payload: Dict[str, Any]) -> str: """Map a Telnyx event type (and hangup cause) to a normalized status.""" EVENT_STATUS = { "call.initiated": "initiated", "call.answered": "in-progress", "call.hangup": "completed", "call.machine.detection.ended": "machine-detected", "streaming.started": "streaming-started", "streaming.stopped": "streaming-stopped", } HANGUP_STATUS = { "busy": "busy", "no_answer": "no-answer", "timeout": "no-answer", "call_rejected": "failed", "unallocated_number": "failed", } status = EVENT_STATUS.get(event_type, event_type) if event_type == "call.hangup": hangup_cause = payload.get("hangup_cause", "") status = HANGUP_STATUS.get(hangup_cause, status) return status async def handle_websocket( self, websocket: "WebSocket", workflow_id: int, user_id: int, workflow_run_id: int, ) -> None: """Handle Telnyx WebSocket connection for real-time audio. Telnyx sends: 1. "connected" event on WebSocket open 2. "start" event with stream_id, call_control_id, media_format 3. "media" events with base64-encoded audio """ from api.services.pipecat.run_pipeline import run_pipeline_telephony try: # Wait for "connected" event first_msg = await websocket.receive_text() msg = json.loads(first_msg) if msg.get("event") != "connected": logger.error(f"Expected 'connected' event, got: {msg.get('event')}") await websocket.close(code=4400, reason="Expected connected event") return logger.debug( f"Telnyx WebSocket connected for workflow_run {workflow_run_id}" ) # Wait for "start" event with stream details start_msg = await websocket.receive_text() logger.debug(f"Received start message: {start_msg}") start_data = json.loads(start_msg) if start_data.get("event") != "start": logger.error("Expected 'start' event second") await websocket.close(code=4400, reason="Expected start event") return # media_format.encoding is the codec Telnyx delivers on the # inbound direction (Telnyx → Dograh); the outbound direction is # pinned to PCMU separately via stream_bidirectional_codec. try: stream_id = start_data.get("stream_id", "") start_info = start_data.get("start", {}) call_control_id = start_info.get("call_control_id", "") media_format = start_info.get("media_format") or {} encoding = media_format.get("encoding") or "PCMU" except (KeyError, AttributeError): logger.error("Missing stream_id or call_control_id in start message") await websocket.close(code=4400, reason="Missing stream identifiers") return if not stream_id or not call_control_id: logger.error( f"Empty stream identifiers: stream_id={stream_id}, " f"call_control_id={call_control_id}" ) await websocket.close(code=4400, reason="Missing stream identifiers") return logger.info( f"Telnyx stream started: stream_id={stream_id}, " f"call_control_id={call_control_id}, encoding={encoding}" ) await run_pipeline_telephony( websocket, provider_name=self.PROVIDER_NAME, workflow_id=workflow_id, workflow_run_id=workflow_run_id, user_id=user_id, call_id=call_control_id, transport_kwargs={ "stream_id": stream_id, "call_control_id": call_control_id, "encoding": encoding, }, ) except WebSocketDisconnect as e: # Telnyx opens the WebSocket during `bridging` (pre-answer) but only # sends the `start` event on `call.answered`. If the call ends before # answer (no-answer timeout, busy, declined), Telnyx closes the # socket abruptly — surface this as an expected end-of-call. logger.info( f"[run {workflow_run_id}] Telnyx WebSocket closed before stream start " f"(call ended pre-answer): code={e.code}, reason={e.reason!r}" ) except Exception as e: logger.error(f"Error in Telnyx WebSocket handler: {e}") raise async def answer_and_stream( self, call_control_id: str, stream_url: str, webhook_url: str ) -> None: """Answer an inbound Telnyx call and start WebSocket streaming inline. This is Telnyx-specific: unlike Twilio/Vobiz where you return XML in the webhook response, Telnyx requires an explicit REST API call to answer the call and set up streaming. Args: call_control_id: The call_control_id from the inbound webhook stream_url: WebSocket URL for bidirectional audio streaming webhook_url: URL for status callback events """ endpoint = f"{self.TELNYX_API_BASE}/calls/{call_control_id}/actions/answer" payload = { "stream_url": stream_url, "stream_track": "inbound_track", "stream_bidirectional_mode": "rtp", "stream_bidirectional_codec": "PCMU", "webhook_url": webhook_url, "webhook_url_method": "POST", } logger.info( f"Answering Telnyx inbound call {call_control_id} with stream_url={stream_url}" ) async with aiohttp.ClientSession() as session: async with session.post( endpoint, json=payload, headers=self._headers() ) as response: if response.status != 200: error_data = await response.text() logger.error( f"Failed to answer Telnyx call {call_control_id}: " f"status={response.status}, response={error_data}" ) raise Exception( f"Failed to answer Telnyx call: {response.status} {error_data}" ) logger.info(f"Successfully answered Telnyx call {call_control_id}") # ======== INBOUND CALL METHODS ======== @classmethod def can_handle_webhook( cls, webhook_data: Dict[str, Any], headers: Dict[str, str] ) -> bool: """Detect if a webhook is from Telnyx. Telnyx webhooks have a nested data.event_type structure and may include a telnyx-signature-ed25519 header. """ if "telnyx-signature-ed25519" in headers: return True # Check for Telnyx event structure data = webhook_data.get("data", {}) if data.get("record_type") == "event" and "event_type" in data: event_type = normalize_event_type(data.get("event_type", "")) if event_type.startswith("call."): return True return False @staticmethod def parse_inbound_webhook(webhook_data: Dict[str, Any]) -> NormalizedInboundData: """Parse Telnyx inbound webhook into normalized format.""" data = webhook_data.get("data", webhook_data) payload = data.get("payload", {}) # Telnyx uses "incoming" for inbound — normalize to "inbound" direction = payload.get("direction", "") if direction == "incoming": direction = "inbound" from_raw = payload.get("from", "") to_raw = payload.get("to", "") return NormalizedInboundData( provider=TelnyxProvider.PROVIDER_NAME, call_id=payload.get("call_control_id", ""), from_number=( normalize_telephony_address(from_raw).canonical if from_raw else "" ), to_number=normalize_telephony_address(to_raw).canonical if to_raw else "", direction=direction, call_status=normalize_event_type(data.get("event_type", "")), account_id=payload.get("connection_id"), raw_data=webhook_data, ) @staticmethod def validate_account_id(config_data: dict, webhook_account_id: str) -> bool: """Validate that the connection_id from webhook matches configuration.""" if not webhook_account_id: return False return config_data.get("connection_id") == webhook_account_id async def verify_inbound_signature( self, url: str, webhook_data: Dict[str, Any], headers: Dict[str, str], body: str = "", ) -> bool: """Verify the signature of an inbound Telnyx webhook.""" signature = _get_header(headers, "telnyx-signature-ed25519") timestamp = _get_header(headers, "telnyx-timestamp") return await self.verify_webhook_signature( url, {"telnyx_timestamp": timestamp, "_raw_body": body}, signature, ) async def configure_inbound( self, address: str, webhook_url: Optional[str] ) -> ProviderSyncResult: """Update webhook_event_url on the Telnyx Call Control Application. PATCH requires application_name even on partial updates, so we GET first to preserve whatever name the user set in the cockpit. The URL is shared across every number on the application — clearing is a no-op to avoid silently breaking inbound for sibling numbers. """ if webhook_url is None: logger.info( f"Telnyx configure_inbound clear for {address}: skipping " f"application update (webhook_event_url is shared across all " f"numbers on Call Control Application {self.connection_id})" ) return ProviderSyncResult(ok=True) if not self.validate_config(): return ProviderSyncResult( ok=False, message="Telnyx provider not properly configured" ) if not self.connection_id: return ProviderSyncResult( ok=False, message=( "Telnyx connection_id (Call Control Application ID) is " "not configured. Set it in the telephony configuration " "so inbound webhooks can be synced to the right " "application." ), ) app_endpoint = ( f"{self.TELNYX_API_BASE}/call_control_applications/{self.connection_id}" ) try: async with aiohttp.ClientSession() as session: async with session.get( app_endpoint, headers=self._headers() ) as response: if response.status != 200: body = await response.text() logger.error( f"Failed to fetch Telnyx Call Control Application " f"{self.connection_id}: {response.status} {body}" ) return ProviderSyncResult( ok=False, message=f"Telnyx API {response.status}: {body}", ) app_data = await response.json() except Exception as e: logger.error( f"Exception fetching Telnyx Call Control Application " f"{self.connection_id}: {e}" ) return ProviderSyncResult(ok=False, message=f"Telnyx lookup failed: {e}") application_name = (app_data.get("data") or {}).get("application_name") if not application_name: return ProviderSyncResult( ok=False, message=( f"Telnyx Call Control Application {self.connection_id} " f"did not return an application_name; cannot PATCH " f"without it." ), ) update_body = { "application_name": application_name, "webhook_event_url": webhook_url, } try: async with aiohttp.ClientSession() as session: async with session.patch( app_endpoint, json=update_body, headers=self._headers() ) as response: if response.status != 200: body = await response.text() logger.error( f"Telnyx Call Control Application update failed " f"for {self.connection_id}: {response.status} " f"{body}" ) return ProviderSyncResult( ok=False, message=f"Telnyx API {response.status}: {body}", ) except Exception as e: logger.error( f"Exception updating Telnyx Call Control Application " f"{self.connection_id}: {e}" ) return ProviderSyncResult(ok=False, message=f"Telnyx update failed: {e}") logger.info( f"Telnyx webhook_event_url set on Call Control Application " f"{self.connection_id} (triggered by address {address})" ) return ProviderSyncResult(ok=True) async def start_inbound_stream( self, *, websocket_url: str, workflow_run_id: int, normalized_data, backend_endpoint: str, ): """Answer the inbound Telnyx call via Call Control and start streaming. Unlike markup-response providers, Telnyx ignores webhook response bodies for call control — the call must be answered with a REST call back to Telnyx before media can flow. We do that here and return a simple acknowledgement; on failure, return the ANSWER_FAILED error response so the route stays provider-agnostic. """ events_url = ( f"{backend_endpoint}/api/v1/telephony/telnyx/events/{workflow_run_id}" ) try: await self.answer_and_stream( call_control_id=normalized_data.call_id, stream_url=websocket_url, webhook_url=events_url, ) except Exception as e: logger.error(f"Failed to answer Telnyx inbound call: {e}") return self.generate_error_response( "ANSWER_FAILED", "Failed to answer call" ) return {"status": "ok"} @staticmethod def generate_error_response(error_type: str, message: str) -> tuple: from fastapi import Response return Response( content=json.dumps({"error": error_type, "message": message}), media_type="application/json", ) @staticmethod def generate_validation_error_response(error_type) -> tuple: from fastapi import Response from api.errors.telephony_errors import TELEPHONY_ERROR_MESSAGES, TelephonyError message = TELEPHONY_ERROR_MESSAGES.get( error_type, TELEPHONY_ERROR_MESSAGES[TelephonyError.GENERAL_AUTH_FAILED] ) return Response( content=json.dumps({"error": str(error_type), "message": message}), media_type="application/json", ) # ======== CALL TRANSFER METHODS ======== async def transfer_call( self, destination: str, transfer_id: str, conference_name: str, timeout: int = 30, **kwargs: Any, ) -> Dict[str, Any]: """Dial the destination as a plain call; conference is seeded later. Webhook (``call.answered``) seeds the conference with this leg; ``TelnyxConferenceStrategy`` joins the caller on pipeline teardown. https://developers.telnyx.com/api-reference/call-commands/dial """ if not self.validate_config(): raise ValueError("Telnyx provider not properly configured") from_number = random.choice(self.from_numbers) logger.info(f"Selected phone number {from_number} for Telnyx transfer call") backend_endpoint, _ = await get_backend_endpoints() webhook_url = ( f"{backend_endpoint}/api/v1/telephony/telnyx/transfer-result/{transfer_id}" ) payload = { "connection_id": self.connection_id, "to": destination, "from": from_number, "timeout_secs": timeout, "webhook_url": webhook_url, "webhook_url_method": "POST", } payload.update(kwargs) endpoint = f"{self.TELNYX_API_BASE}/calls" logger.debug( f"Telnyx transfer dial payload: " f"{json.dumps({k: v for k, v in payload.items() if k != 'connection_id'})}" ) try: async with aiohttp.ClientSession() as session: async with session.post( endpoint, json=payload, headers=self._headers() ) as response: response_text = await response.text() if response.status != 200: logger.error( f"Telnyx transfer dial failed: " f"status={response.status} body={response_text}" ) raise Exception( f"Telnyx transfer dial failed: " f"status={response.status} body={response_text}" ) response_data = json.loads(response_text) data = response_data.get("data", {}) call_control_id = data.get("call_control_id", "") logger.info( f"Telnyx transfer dial initiated: " f"call_control_id={call_control_id}, " f"to={destination}, conference_name={conference_name}" ) return { "call_sid": call_control_id, "status": "initiated", "provider": self.PROVIDER_NAME, "from_number": from_number, "to_number": destination, "raw_response": response_data, } except Exception as e: logger.error(f"Exception during Telnyx transfer dial: {e}") raise def supports_transfers(self) -> bool: return True async def create_conference( self, seed_call_control_id: str, name: str ) -> Optional[str]: """Seed a Telnyx conference with an existing call leg. Used by the transfer flow on ``call.answered`` to put the destination leg into a conference immediately. The returned ``conference_id`` is stored on the ``TransferContext`` so the strategy can later join the caller. https://developers.telnyx.com/api-reference/conference-commands/create-conference """ if not self.api_key: logger.error("Cannot create Telnyx conference: api_key missing") return None endpoint = f"{self.TELNYX_API_BASE}/conferences" payload = { "call_control_id": seed_call_control_id, "name": name, "start_conference_on_create": True, } try: async with aiohttp.ClientSession() as session: async with session.post( endpoint, json=payload, headers=self._headers() ) as response: body = await response.text() if response.status != 200: logger.error( f"Telnyx create_conference failed: " f"status={response.status} body={body}" ) return None data = json.loads(body).get("data", {}) conference_id = data.get("id") if not conference_id: logger.error( f"Telnyx create_conference response missing id: {body}" ) return None logger.info( f"Telnyx conference {conference_id} created (name={name}, " f"seeded with {seed_call_control_id})" ) return conference_id except Exception as e: logger.error(f"Exception during Telnyx create_conference: {e}") return None