feat: integrate Telnyx telephony for outbound and inbound calling (#206)

* feat: integrate Telnyx telephony for outbound and inbound calling

* chore: remove redundant code

---------

Co-authored-by: Abhishek <abhishek@a6k.me>
This commit is contained in:
Sabiha Khan 2026-03-25 18:01:41 +05:30 committed by GitHub
parent dc800bdd63
commit 5b820cb0ba
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
15 changed files with 1050 additions and 12 deletions

View file

@ -0,0 +1,69 @@
"""add telnyx mode
Revision ID: b3a1c7e94f12
Revises: e54ddb048535
Create Date: 2026-03-24 12:00:00.000000
"""
from typing import Sequence, Union
from alembic import op
from alembic_postgresql_enum import TableReference
# revision identifiers, used by Alembic.
revision: str = "b3a1c7e94f12"
down_revision: Union[str, None] = "e54ddb048535"
branch_labels: Union[str, Sequence[str], None] = None
depends_on: Union[str, Sequence[str], None] = None
def upgrade() -> None:
op.sync_enum_values(
enum_schema="public",
enum_name="workflow_run_mode",
new_values=[
"ari",
"twilio",
"vonage",
"vobiz",
"cloudonix",
"telnyx",
"webrtc",
"smallwebrtc",
"stasis",
"VOICE",
"CHAT",
],
affected_columns=[
TableReference(
table_schema="public", table_name="workflow_runs", column_name="mode"
)
],
enum_values_to_rename=[],
)
def downgrade() -> None:
op.sync_enum_values(
enum_schema="public",
enum_name="workflow_run_mode",
new_values=[
"ari",
"twilio",
"vonage",
"vobiz",
"cloudonix",
"webrtc",
"smallwebrtc",
"stasis",
"VOICE",
"CHAT",
],
affected_columns=[
TableReference(
table_schema="public", table_name="workflow_runs", column_name="mode"
)
],
enum_values_to_rename=[],
)

View file

@ -23,6 +23,7 @@ class WorkflowRunMode(Enum):
VONAGE = "vonage"
VOBIZ = "vobiz"
CLOUDONIX = "cloudonix"
TELNYX = "telnyx"
WEBRTC = "webrtc"
SMALLWEBRTC = "smallwebrtc"

View file

@ -13,6 +13,8 @@ from api.schemas.telephony_config import (
CloudonixConfigurationRequest,
CloudonixConfigurationResponse,
TelephonyConfigurationResponse,
TelnyxConfigurationRequest,
TelnyxConfigurationResponse,
TwilioConfigurationRequest,
TwilioConfigurationResponse,
VobizConfigurationRequest,
@ -33,6 +35,7 @@ PROVIDER_MASKED_FIELDS = {
"vobiz": ["auth_id", "auth_token"],
"cloudonix": ["bearer_token"],
"ari": ["app_password"],
"telnyx": ["api_key"],
}
@ -149,6 +152,19 @@ async def get_telephony_configuration(user: UserModel = Depends(get_user)):
from_numbers=from_numbers,
),
)
elif stored_provider == "telnyx":
api_key = config.value.get("api_key", "")
connection_id = config.value.get("connection_id", "")
from_numbers = config.value.get("from_numbers", [])
return TelephonyConfigurationResponse(
telnyx=TelnyxConfigurationResponse(
provider="telnyx",
api_key=mask_key(api_key) if api_key else "",
connection_id=connection_id,
from_numbers=from_numbers,
),
)
else:
return TelephonyConfigurationResponse()
@ -161,6 +177,7 @@ async def save_telephony_configuration(
VobizConfigurationRequest,
CloudonixConfigurationRequest,
ARIConfigurationRequest,
TelnyxConfigurationRequest,
],
user: UserModel = Depends(get_user),
):
@ -205,6 +222,13 @@ async def save_telephony_configuration(
"domain_id": request.domain_id,
"from_numbers": request.from_numbers,
}
elif request.provider == "telnyx":
config_value = {
"provider": "telnyx",
"api_key": request.api_key,
"connection_id": request.connection_id,
"from_numbers": request.from_numbers,
}
elif request.provider == "ari":
config_value = {
"provider": "ari",

View file

@ -318,6 +318,7 @@ async def _validate_inbound_request(
x_vobiz_signature: str = None,
x_vobiz_timestamp: str = None,
x_cx_apikey: str = None,
telnyx_signature: str = None,
) -> tuple[bool, TelephonyError, dict, object]:
"""
Validate all aspects of inbound request.
@ -351,7 +352,7 @@ async def _validate_inbound_request(
# Verify webhook signature/API key if provided
provider_instance = None
if x_twilio_signature or x_vobiz_signature or x_cx_apikey:
if x_twilio_signature or x_vobiz_signature or x_cx_apikey or telnyx_signature:
backend_endpoint, _ = await get_backend_endpoints()
webhook_url = f"{backend_endpoint}/api/v1/telephony/inbound/{workflow_id}"
@ -377,6 +378,11 @@ async def _validate_inbound_request(
signature_valid = await provider_instance.verify_inbound_signature(
webhook_url, webhook_data, x_cx_apikey
)
elif provider_class.PROVIDER_NAME == "telnyx" and telnyx_signature:
logger.info(f"Verifying Telnyx signature for URL: {webhook_url}")
signature_valid = await provider_instance.verify_inbound_signature(
webhook_url, webhook_data, telnyx_signature
)
else:
logger.warning(
f"No signature/API key validation for provider {provider_class.PROVIDER_NAME}"
@ -818,6 +824,63 @@ async def _process_status_update(workflow_run_id: int, status: StatusCallbackReq
)
@router.post("/telnyx/events/{workflow_run_id}")
async def handle_telnyx_events(
request: Request,
workflow_run_id: int,
):
"""Handle Telnyx Call Control webhook events.
Telnyx sends all call lifecycle events (call.initiated, call.answered,
call.hangup, streaming.started, streaming.stopped) as JSON POST requests.
"""
set_current_run_id(workflow_run_id)
event_data = await request.json()
logger.info(
f"[run {workflow_run_id}] Received Telnyx event: {json.dumps(event_data)}"
)
# Extract event type from Telnyx envelope
data = event_data.get("data", {})
event_type = data.get("event_type", "")
# Skip streaming events — they're informational only
if event_type in ("streaming.started", "streaming.stopped"):
logger.debug(f"[run {workflow_run_id}] Telnyx streaming event: {event_type}")
return {"status": "success"}
# Get workflow run and provider
workflow_run = await db_client.get_workflow_run_by_id(workflow_run_id)
if not workflow_run:
logger.warning(f"Workflow run {workflow_run_id} not found for Telnyx event")
return {"status": "ignored", "reason": "workflow_run_not_found"}
workflow = await db_client.get_workflow_by_id(workflow_run.workflow_id)
if not workflow:
logger.warning(f"Workflow {workflow_run.workflow_id} not found")
return {"status": "ignored", "reason": "workflow_not_found"}
provider = await get_telephony_provider(workflow.organization_id)
# Parse the callback data into generic format
parsed_data = provider.parse_status_callback(event_data)
status_update = StatusCallbackRequest(
call_id=parsed_data["call_id"],
status=parsed_data["status"],
from_number=parsed_data.get("from_number"),
to_number=parsed_data.get("to_number"),
direction=parsed_data.get("direction"),
duration=parsed_data.get("duration"),
extra=parsed_data.get("extra", {}),
)
await _process_status_update(workflow_run_id, status_update)
return {"status": "success"}
@router.post("/vonage/events/{workflow_run_id}")
async def handle_vonage_events(
request: Request,
@ -1355,6 +1418,7 @@ async def handle_inbound_telephony(
x_vobiz_signature: Optional[str] = Header(None),
x_vobiz_timestamp: Optional[str] = Header(None),
x_cx_apikey: Optional[str] = Header(None),
telnyx_signature: Optional[str] = Header(None, alias="telnyx-signature-ed25519"),
):
"""Handle inbound telephony calls from any supported provider with common processing"""
logger.info(f"Inbound call received for workflow_id: {workflow_id}")
@ -1409,6 +1473,7 @@ async def handle_inbound_telephony(
x_vobiz_signature,
x_vobiz_timestamp,
x_cx_apikey,
telnyx_signature,
)
if not is_valid:
@ -1436,8 +1501,38 @@ async def handle_inbound_telephony(
)
# Generate response URLs
_, wss_backend_endpoint = await get_backend_endpoints()
backend_endpoint, wss_backend_endpoint = await get_backend_endpoints()
websocket_url = f"{wss_backend_endpoint}/api/v1/telephony/ws/{workflow_id}/{workflow_context['user_id']}/{workflow_run_id}"
# Telnyx requires answering the call via REST API (not via webhook response)
if provider_class.PROVIDER_NAME == "telnyx":
# Get provider instance with credentials if not already loaded
if not provider_instance:
provider_instance = await get_telephony_provider(
workflow_context["organization_id"]
)
events_url = (
f"{backend_endpoint}/api/v1/telephony/telnyx/events/{workflow_run_id}"
)
try:
await provider_instance.answer_and_stream(
call_control_id=normalized_data.call_id,
stream_url=websocket_url,
webhook_url=events_url,
)
except Exception as e:
logger.error(f"Failed to answer Telnyx inbound call: {e}")
return provider_class.generate_error_response(
"ANSWER_FAILED", "Failed to answer call"
)
logger.info(
f"Answered Telnyx inbound call {normalized_data.call_id} for workflow_run {workflow_run_id}"
)
return {"status": "ok"}
response = await provider_class.generate_inbound_response(
websocket_url, workflow_run_id
)

View file

@ -125,6 +125,28 @@ class ARIConfigurationResponse(BaseModel):
from_numbers: List[str]
class TelnyxConfigurationRequest(BaseModel):
"""Request schema for Telnyx configuration."""
provider: str = Field(default="telnyx")
api_key: str = Field(..., description="Telnyx API Key")
connection_id: str = Field(
..., description="Telnyx Call Control Application ID (connection_id)"
)
from_numbers: List[str] = Field(
..., min_length=1, description="List of Telnyx phone numbers (E.164 format)"
)
class TelnyxConfigurationResponse(BaseModel):
"""Response schema for Telnyx configuration with masked sensitive fields."""
provider: str
api_key: str # Masked
connection_id: str
from_numbers: List[str]
class TelephonyConfigurationResponse(BaseModel):
"""Top-level telephony configuration response."""
@ -133,3 +155,4 @@ class TelephonyConfigurationResponse(BaseModel):
vobiz: Optional[VobizConfigurationResponse] = None
cloudonix: Optional[CloudonixConfigurationResponse] = None
ari: Optional[ARIConfigurationResponse] = None
telnyx: Optional[TelnyxConfigurationResponse] = None

View file

@ -97,8 +97,9 @@ def create_audio_config(transport_type: str) -> AudioConfig:
WorkflowRunMode.VOBIZ.value,
WorkflowRunMode.CLOUDONIX.value,
WorkflowRunMode.ARI.value,
WorkflowRunMode.TELNYX.value,
):
# Twilio, Cloudonix, Vobiz, and ARI use MULAW at 8kHz
# Twilio, Cloudonix, Vobiz, Telnyx, and ARI use MULAW at 8kHz
return AudioConfig(
transport_in_sample_rate=8000,
transport_out_sample_rate=8000,

View file

@ -44,6 +44,7 @@ from api.services.pipecat.tracing_config import (
from api.services.pipecat.transport_setup import (
create_ari_transport,
create_cloudonix_transport,
create_telnyx_transport,
create_twilio_transport,
create_vobiz_transport,
create_vonage_transport,
@ -344,6 +345,74 @@ async def run_pipeline_vobiz(
raise
async def run_pipeline_telnyx(
websocket_client: WebSocket,
stream_id: str,
call_control_id: str,
workflow_id: int,
workflow_run_id: int,
user_id: int,
) -> None:
"""Run pipeline for Telnyx Call Control WebSocket connections.
Telnyx uses PCMU at 8kHz over WebSocket with base64-encoded media events,
similar to Twilio's protocol.
"""
logger.info(
f"[run {workflow_run_id}] Starting Telnyx pipeline - "
f"stream_id={stream_id}, call_control_id={call_control_id}, "
f"workflow_id={workflow_id}"
)
set_current_run_id(workflow_run_id)
cost_info = {"call_id": call_control_id}
await db_client.update_workflow_run(workflow_run_id, cost_info=cost_info)
workflow = await db_client.get_workflow(workflow_id, user_id)
if workflow:
set_current_org_id(workflow.organization_id)
vad_config = None
ambient_noise_config = None
if workflow and workflow.workflow_configurations:
if "vad_configuration" in workflow.workflow_configurations:
vad_config = workflow.workflow_configurations["vad_configuration"]
if "ambient_noise_configuration" in workflow.workflow_configurations:
ambient_noise_config = workflow.workflow_configurations[
"ambient_noise_configuration"
]
try:
audio_config = create_audio_config(WorkflowRunMode.TELNYX.value)
transport = await create_telnyx_transport(
websocket_client,
stream_id,
call_control_id,
workflow_run_id,
audio_config,
workflow.organization_id,
vad_config,
ambient_noise_config,
)
await _run_pipeline(
transport,
workflow_id,
workflow_run_id,
user_id,
audio_config=audio_config,
)
logger.info(f"[run {workflow_run_id}] Telnyx pipeline completed successfully")
except Exception as e:
logger.error(
f"[run {workflow_run_id}] Error in Telnyx pipeline: {e}", exc_info=True
)
raise
async def run_pipeline_cloudonix(
websocket_client: WebSocket,
stream_sid: str,

View file

@ -20,6 +20,7 @@ from api.services.telephony.providers.twilio_call_strategies import (
from pipecat.audio.mixers.silence_mixer import SilenceAudioMixer
from pipecat.audio.mixers.soundfile_mixer import SoundfileMixer
from pipecat.serializers.asterisk import AsteriskFrameSerializer
from pipecat.serializers.telnyx import TelnyxFrameSerializer
from pipecat.serializers.twilio import TwilioFrameSerializer
from pipecat.serializers.vobiz import VobizFrameSerializer
from pipecat.serializers.vonage import VonageFrameSerializer
@ -169,6 +170,70 @@ async def create_cloudonix_transport(
)
async def create_telnyx_transport(
websocket_client: WebSocket,
stream_id: str,
call_control_id: str,
workflow_run_id: int,
audio_config: AudioConfig,
organization_id: int,
vad_config: dict | None = None,
ambient_noise_config: dict | None = None,
):
"""Create a transport for Telnyx connections."""
config = await db_client.get_configuration(
organization_id, OrganizationConfigurationKey.TELEPHONY_CONFIGURATION.value
)
if not config or not config.value:
raise ValueError(
f"Telnyx credentials not configured for organization {organization_id}"
)
if config.value.get("provider") != "telnyx":
raise ValueError(
f"Expected Telnyx provider, got {config.value.get('provider')}"
)
api_key = config.value.get("api_key")
if not api_key:
raise ValueError(
f"Incomplete Telnyx configuration for organization {organization_id}"
)
serializer = TelnyxFrameSerializer(
stream_id=stream_id,
call_control_id=call_control_id,
api_key=api_key,
outbound_encoding="PCMU",
inbound_encoding="PCMU",
)
return FastAPIWebsocketTransport(
websocket=websocket_client,
params=FastAPIWebsocketParams(
audio_in_enabled=True,
audio_out_enabled=True,
audio_in_sample_rate=audio_config.transport_in_sample_rate,
audio_out_sample_rate=audio_config.transport_out_sample_rate,
audio_out_mixer=(
SoundfileMixer(
sound_files={
"office": APP_ROOT_DIR
/ "assets"
/ f"office-ambience-{audio_config.transport_out_sample_rate}-mono.wav"
},
default_sound="office",
volume=ambient_noise_config.get("volume", 0.3),
)
if ambient_noise_config and ambient_noise_config.get("enabled", False)
else SilenceAudioMixer()
),
serializer=serializer,
),
)
async def create_ari_transport(
websocket_client: WebSocket,
channel_id: str,

View file

@ -13,6 +13,7 @@ from api.enums import OrganizationConfigurationKey
from api.services.telephony.base import TelephonyProvider
from api.services.telephony.providers.ari_provider import ARIProvider
from api.services.telephony.providers.cloudonix_provider import CloudonixProvider
from api.services.telephony.providers.telnyx_provider import TelnyxProvider
from api.services.telephony.providers.twilio_provider import TwilioProvider
from api.services.telephony.providers.vobiz_provider import VobizProvider
from api.services.telephony.providers.vonage_provider import VonageProvider
@ -76,6 +77,13 @@ async def load_telephony_config(organization_id: int) -> Dict[str, Any]:
"domain_id": config.value.get("domain_id"),
"from_numbers": config.value.get("from_numbers", []),
}
elif provider == "telnyx":
return {
"provider": "telnyx",
"api_key": config.value.get("api_key"),
"connection_id": config.value.get("connection_id"),
"from_numbers": config.value.get("from_numbers", []),
}
elif provider == "ari":
return {
"provider": "ari",
@ -125,6 +133,9 @@ async def get_telephony_provider(organization_id: int) -> TelephonyProvider:
elif provider_type == "cloudonix":
return CloudonixProvider(config)
elif provider_type == "telnyx":
return TelnyxProvider(config)
elif provider_type == "ari":
return ARIProvider(config)
@ -143,6 +154,7 @@ async def get_all_telephony_providers() -> List[Type[TelephonyProvider]]:
return [
ARIProvider,
CloudonixProvider,
TelnyxProvider,
TwilioProvider,
VobizProvider,
VonageProvider,

View file

@ -0,0 +1,466 @@
"""
Telnyx implementation of the TelephonyProvider interface.
Uses the Telnyx Call Control API v2 for outbound calling with
inline WebSocket media streaming.
"""
import json
import random
from typing import TYPE_CHECKING, Any, Dict, List, Optional
import aiohttp
from fastapi import HTTPException
from loguru import logger
from api.enums import WorkflowRunMode
from api.services.telephony.base import (
CallInitiationResult,
NormalizedInboundData,
TelephonyProvider,
)
from api.utils.common import get_backend_endpoints
if TYPE_CHECKING:
from fastapi import WebSocket
class TelnyxProvider(TelephonyProvider):
"""
Telnyx implementation of TelephonyProvider.
Uses the Call Control API v2 with inline WebSocket streaming for audio.
"""
PROVIDER_NAME = WorkflowRunMode.TELNYX.value
WEBHOOK_ENDPOINT = "telnyx/webhook"
TELNYX_API_BASE = "https://api.telnyx.com/v2"
def __init__(self, config: Dict[str, Any]):
self.api_key = config.get("api_key")
self.connection_id = config.get("connection_id")
self.from_numbers = config.get("from_numbers", [])
if isinstance(self.from_numbers, str):
self.from_numbers = [self.from_numbers]
def _headers(self) -> Dict[str, str]:
return {
"Content-Type": "application/json",
"Accept": "application/json",
"Authorization": f"Bearer {self.api_key}",
}
async def initiate_call(
self,
to_number: str,
webhook_url: str,
workflow_run_id: Optional[int] = None,
from_number: Optional[str] = None,
**kwargs: Any,
) -> CallInitiationResult:
"""Initiate an outbound call via Telnyx Call Control API."""
if not self.validate_config():
raise ValueError("Telnyx provider not properly configured")
if from_number is None:
from_number = random.choice(self.from_numbers)
logger.info(f"Selected phone number {from_number} for outbound call")
backend_endpoint, wss_backend_endpoint = await get_backend_endpoints()
# Build the WebSocket stream URL for inline audio streaming
workflow_id = kwargs.get("workflow_id")
user_id = kwargs.get("user_id")
stream_url = (
f"{wss_backend_endpoint}/api/v1/telephony/ws"
f"/{workflow_id}/{user_id}/{workflow_run_id}"
)
# Build the webhook URL for status callbacks
events_url = (
f"{backend_endpoint}/api/v1/telephony/telnyx/events/{workflow_run_id}"
)
payload = {
"connection_id": self.connection_id,
"to": to_number,
"from": from_number,
"stream_url": stream_url,
"stream_track": "inbound_track",
"stream_bidirectional_mode": "rtp",
"stream_bidirectional_codec": "PCMU",
"webhook_url": events_url,
"webhook_url_method": "POST",
}
logger.info(
f"Telnyx dial payload: {json.dumps({k: v for k, v in payload.items() if k != 'connection_id'})}"
)
endpoint = f"{self.TELNYX_API_BASE}/calls"
async with aiohttp.ClientSession() as session:
async with session.post(
endpoint, json=payload, headers=self._headers()
) as response:
if response.status != 200:
error_data = await response.json()
logger.error(f"Telnyx API error: {error_data}")
raise HTTPException(
status_code=response.status, detail=json.dumps(error_data)
)
response_data = await response.json()
data = response_data.get("data", {})
call_control_id = data.get("call_control_id", "")
call_leg_id = data.get("call_leg_id", "")
call_session_id = data.get("call_session_id", "")
logger.info(
f"Telnyx call initiated: call_control_id={call_control_id}, "
f"call_leg_id={call_leg_id}, call_session_id={call_session_id}"
)
return CallInitiationResult(
call_id=call_control_id,
status="initiated",
provider_metadata={
"call_control_id": call_control_id,
"call_leg_id": call_leg_id,
"call_session_id": call_session_id,
},
raw_response=response_data,
)
async def get_call_status(self, call_id: str) -> Dict[str, Any]:
"""Get the current status of a Telnyx call."""
endpoint = f"{self.TELNYX_API_BASE}/calls/{call_id}"
async with aiohttp.ClientSession() as session:
async with session.get(endpoint, headers=self._headers()) as response:
if response.status != 200:
error_data = await response.json()
raise Exception(f"Failed to get call status: {error_data}")
return await response.json()
async def get_available_phone_numbers(self) -> List[str]:
return self.from_numbers
def validate_config(self) -> bool:
return bool(self.api_key and self.connection_id and self.from_numbers)
async def verify_webhook_signature(
self, url: str, params: Dict[str, Any], signature: str
) -> bool:
"""Required by the abstract interface but not actively called for Telnyx.
Telnyx webhook signature verification uses Ed25519 (via the
telnyx-signature-ed25519 header). This can be implemented in the
future using the Telnyx SDK if needed.
"""
return True
async def get_webhook_response(
self, workflow_id: int, user_id: int, workflow_run_id: int
) -> str:
"""Not used for Telnyx — streaming is inline with the dial request."""
return ""
async def get_call_cost(self, call_id: str) -> Dict[str, Any]:
"""Get cost information for a Telnyx call.
Telnyx doesn't provide per-call cost via the Call Control API.
Cost data is available through the billing/CDR APIs.
"""
return {
"cost_usd": 0.0,
"duration": 0,
"status": "unknown",
"raw_response": {},
}
def parse_status_callback(self, data: Dict[str, Any]) -> Dict[str, Any]:
"""Parse Telnyx webhook event data into generic format."""
event_data = data.get("data", data)
event_type = event_data.get("event_type", "")
payload = event_data.get("payload", {})
status = self._resolve_status(event_type, payload)
duration_secs = payload.get("duration_secs")
return {
"call_id": payload.get("call_control_id", ""),
"status": status,
"from_number": payload.get("from"),
"to_number": payload.get("to"),
"direction": payload.get("direction"),
"duration": str(duration_secs) if duration_secs else None,
"extra": data,
}
@staticmethod
def _resolve_status(event_type: str, payload: Dict[str, Any]) -> str:
"""Map a Telnyx event type (and hangup cause) to a normalized status."""
EVENT_STATUS = {
"call.initiated": "initiated",
"call.answered": "in-progress",
"call.hangup": "completed",
"call.machine.detection.ended": "machine-detected",
"streaming.started": "streaming-started",
"streaming.stopped": "streaming-stopped",
}
HANGUP_STATUS = {
"busy": "busy",
"no_answer": "no-answer",
"timeout": "no-answer",
"call_rejected": "failed",
"unallocated_number": "failed",
}
status = EVENT_STATUS.get(event_type, event_type)
if event_type == "call.hangup":
hangup_cause = payload.get("hangup_cause", "")
status = HANGUP_STATUS.get(hangup_cause, status)
return status
async def handle_websocket(
self,
websocket: "WebSocket",
workflow_id: int,
user_id: int,
workflow_run_id: int,
) -> None:
"""Handle Telnyx WebSocket connection for real-time audio.
Telnyx sends:
1. "connected" event on WebSocket open
2. "start" event with stream_id, call_control_id, media_format
3. "media" events with base64-encoded audio
"""
from api.services.pipecat.run_pipeline import run_pipeline_telnyx
try:
# Wait for "connected" event
first_msg = await websocket.receive_text()
msg = json.loads(first_msg)
if msg.get("event") != "connected":
logger.error(f"Expected 'connected' event, got: {msg.get('event')}")
await websocket.close(code=4400, reason="Expected connected event")
return
logger.debug(
f"Telnyx WebSocket connected for workflow_run {workflow_run_id}"
)
# Wait for "start" event with stream details
start_msg = await websocket.receive_text()
logger.debug(f"Received start message: {start_msg}")
start_data = json.loads(start_msg)
if start_data.get("event") != "start":
logger.error("Expected 'start' event second")
await websocket.close(code=4400, reason="Expected start event")
return
# Extract Telnyx-specific identifiers
try:
stream_id = start_data.get("stream_id", "")
start_info = start_data.get("start", {})
call_control_id = start_info.get("call_control_id", "")
except (KeyError, AttributeError):
logger.error("Missing stream_id or call_control_id in start message")
await websocket.close(code=4400, reason="Missing stream identifiers")
return
if not stream_id or not call_control_id:
logger.error(
f"Empty stream identifiers: stream_id={stream_id}, "
f"call_control_id={call_control_id}"
)
await websocket.close(code=4400, reason="Missing stream identifiers")
return
logger.info(
f"Telnyx stream started: stream_id={stream_id}, "
f"call_control_id={call_control_id}"
)
# Run the Telnyx pipeline
await run_pipeline_telnyx(
websocket,
stream_id,
call_control_id,
workflow_id,
workflow_run_id,
user_id,
)
except Exception as e:
logger.error(f"Error in Telnyx WebSocket handler: {e}")
raise
async def answer_and_stream(
self, call_control_id: str, stream_url: str, webhook_url: str
) -> None:
"""Answer an inbound Telnyx call and start WebSocket streaming inline.
This is Telnyx-specific: unlike Twilio/Vobiz where you return XML in the
webhook response, Telnyx requires an explicit REST API call to answer
the call and set up streaming.
Args:
call_control_id: The call_control_id from the inbound webhook
stream_url: WebSocket URL for bidirectional audio streaming
webhook_url: URL for status callback events
"""
endpoint = f"{self.TELNYX_API_BASE}/calls/{call_control_id}/actions/answer"
payload = {
"stream_url": stream_url,
"stream_track": "inbound_track",
"stream_bidirectional_mode": "rtp",
"stream_bidirectional_codec": "PCMU",
"webhook_url": webhook_url,
"webhook_url_method": "POST",
}
logger.info(
f"Answering Telnyx inbound call {call_control_id} with stream_url={stream_url}"
)
async with aiohttp.ClientSession() as session:
async with session.post(
endpoint, json=payload, headers=self._headers()
) as response:
if response.status != 200:
error_data = await response.text()
logger.error(
f"Failed to answer Telnyx call {call_control_id}: "
f"status={response.status}, response={error_data}"
)
raise Exception(
f"Failed to answer Telnyx call: {response.status} {error_data}"
)
logger.info(f"Successfully answered Telnyx call {call_control_id}")
# ======== INBOUND CALL METHODS ========
@classmethod
def can_handle_webhook(
cls, webhook_data: Dict[str, Any], headers: Dict[str, str]
) -> bool:
"""Detect if a webhook is from Telnyx.
Telnyx webhooks have a nested data.event_type structure
and may include a telnyx-signature-ed25519 header.
"""
if "telnyx-signature-ed25519" in headers:
return True
# Check for Telnyx event structure
data = webhook_data.get("data", {})
if data.get("record_type") == "event" and "event_type" in data:
event_type = data.get("event_type", "")
if event_type.startswith("call."):
return True
return False
@staticmethod
def parse_inbound_webhook(webhook_data: Dict[str, Any]) -> NormalizedInboundData:
"""Parse Telnyx inbound webhook into normalized format."""
data = webhook_data.get("data", webhook_data)
payload = data.get("payload", {})
# Telnyx uses "incoming" for inbound — normalize to "inbound"
direction = payload.get("direction", "")
if direction == "incoming":
direction = "inbound"
return NormalizedInboundData(
provider=TelnyxProvider.PROVIDER_NAME,
call_id=payload.get("call_control_id", ""),
from_number=TelnyxProvider.normalize_phone_number(payload.get("from", "")),
to_number=TelnyxProvider.normalize_phone_number(payload.get("to", "")),
direction=direction,
call_status=data.get("event_type", ""),
account_id=payload.get("connection_id"),
raw_data=webhook_data,
)
@staticmethod
def validate_account_id(config_data: dict, webhook_account_id: str) -> bool:
"""Validate that the connection_id from webhook matches configuration."""
if not webhook_account_id:
return False
return config_data.get("connection_id") == webhook_account_id
@staticmethod
def normalize_phone_number(phone_number: str) -> str:
"""Normalize phone number to E.164 format.
Telnyx already provides numbers in E.164 format
"""
return phone_number or ""
async def verify_inbound_signature(
self, url: str, webhook_data: Dict[str, Any], signature: str
) -> bool:
"""Required by the abstract interface. Telnyx signature verification
(Ed25519) is not yet implemented accepts all inbound webhooks for now.
"""
return True
@staticmethod
async def generate_inbound_response(
websocket_url: str, workflow_run_id: int = None
) -> tuple:
"""Telnyx inbound calls don't use a webhook response for streaming.
The streaming is set up via Call Control commands.
"""
from fastapi import Response
return Response(content="{}", media_type="application/json")
@staticmethod
def generate_error_response(error_type: str, message: str) -> tuple:
from fastapi import Response
return Response(
content=json.dumps({"error": error_type, "message": message}),
media_type="application/json",
)
@staticmethod
def generate_validation_error_response(error_type) -> tuple:
from fastapi import Response
from api.errors.telephony_errors import TELEPHONY_ERROR_MESSAGES, TelephonyError
message = TELEPHONY_ERROR_MESSAGES.get(
error_type, TELEPHONY_ERROR_MESSAGES[TelephonyError.GENERAL_AUTH_FAILED]
)
return Response(
content=json.dumps({"error": str(error_type), "message": message}),
media_type="application/json",
)
# ======== CALL TRANSFER METHODS ========
async def transfer_call(
self,
destination: str,
transfer_id: str,
conference_name: str,
timeout: int = 30,
**kwargs: Any,
) -> Dict[str, Any]:
"""Telnyx call transfer is not yet implemented."""
raise NotImplementedError("Call transfer not yet supported for Telnyx")
def supports_transfers(self) -> bool:
return False

View file

@ -13,6 +13,8 @@ import type {
CloudonixConfigurationRequest,
CloudonixConfigurationResponse,
TelephonyConfigurationResponse,
TelnyxConfigurationRequest,
TelnyxConfigurationResponse,
TwilioConfigurationRequest,
VobizConfigurationRequest,
VonageConfigurationRequest
@ -50,6 +52,9 @@ interface TelephonyConfigForm {
// Vobiz fields
auth_id?: string;
vobiz_auth_token?: string;
// Telnyx fields
telnyx_api_key?: string;
connection_id?: string;
// Cloudonix fields
bearer_token?: string;
domain_id?: string;
@ -161,6 +166,13 @@ export default function ConfigureTelephonyPage() {
typeof ariConfig.inbound_workflow_id === "number" ? ariConfig.inbound_workflow_id : undefined
);
setValue("from_numbers", ariConfig.from_numbers?.length > 0 ? ariConfig.from_numbers : [""]);
} else if ((response.data as TelephonyConfigurationResponse)?.telnyx) {
const telnyxConfig = (response.data as TelephonyConfigurationResponse).telnyx as TelnyxConfigurationResponse;
setHasExistingConfig(true);
setValue("provider", "telnyx");
setValue("telnyx_api_key", telnyxConfig.api_key);
setValue("connection_id", telnyxConfig.connection_id);
setValue("from_numbers", telnyxConfig.from_numbers?.length > 0 ? telnyxConfig.from_numbers : [""]);
}
}
} catch (error) {
@ -183,7 +195,8 @@ export default function ConfigureTelephonyPage() {
| VonageConfigurationRequest
| VobizConfigurationRequest
| CloudonixConfigurationRequest
| AriConfigurationRequest;
| AriConfigurationRequest
| TelnyxConfigurationRequest;
const filteredNumbers = data.from_numbers.filter(n => n.trim() !== "");
@ -201,7 +214,7 @@ export default function ConfigureTelephonyPage() {
let pattern: RegExp;
let formatMessage: string;
if (data.provider === "twilio") {
if (data.provider === "twilio" || data.provider === "telnyx") {
pattern = twilioPattern;
formatMessage = "with + prefix (e.g., +1234567890)";
} else if (data.provider === "cloudonix") {
@ -246,6 +259,13 @@ export default function ConfigureTelephonyPage() {
auth_id: data.auth_id,
auth_token: data.vobiz_auth_token,
} as VobizConfigurationRequest;
} else if (data.provider === "telnyx") {
requestBody = {
provider: data.provider,
from_numbers: filteredNumbers,
api_key: data.telnyx_api_key!,
connection_id: data.connection_id!,
} as TelnyxConfigurationRequest;
} else if (data.provider === "cloudonix") {
requestBody = {
provider: data.provider,
@ -312,13 +332,21 @@ export default function ConfigureTelephonyPage() {
? "Vonage"
: selectedProvider === "vobiz"
? "Vobiz"
: selectedProvider === "telnyx"
? "Telnyx"
: selectedProvider === "ari"
? "Asterisk ARI"
: "Cloudonix"}{" "}
Setup Guide
</CardTitle>
<CardDescription>
{selectedProvider === "ari" ? (
{selectedProvider === "telnyx" ? (
<>
Telnyx is a cloud communications platform providing programmable voice, messaging,
and networking services. Use the Call Control API to build voice applications
with real-time WebSocket audio streaming.
</>
) : selectedProvider === "ari" ? (
<>
Connect Dograh to your Asterisk PBX using the Asterisk REST Interface (ARI).
ARI provides a WebSocket-based event model for controlling calls via Stasis applications.
@ -368,7 +396,26 @@ export default function ConfigureTelephonyPage() {
</CardDescription>
</CardHeader>
<CardContent>
{selectedProvider === "ari" ? (
{selectedProvider === "telnyx" ? (
<div className="space-y-4 text-sm">
<div>
<h4 className="font-semibold mb-2">Getting Started with Telnyx:</h4>
<ol className="list-decimal list-inside space-y-1 text-muted-foreground">
<li>Sign up at <a href="https://telnyx.com" target="_blank" rel="noopener noreferrer" className="text-blue-600 hover:underline">telnyx.com</a> and create an API Key in the Mission Control Portal</li>
<li>Create a Call Control Application under Voice &gt; Programmable Voice</li>
<li>Note the Connection ID (Application ID) from your Call Control App</li>
<li>Purchase a phone number and assign it to your Call Control Application</li>
<li>Enter your API Key, Connection ID, and phone numbers below</li>
</ol>
</div>
<div className="bg-muted border border-border rounded p-3">
<p className="text-sm">
<strong>Note:</strong> Telnyx uses the Call Control API with WebSocket-based
bidirectional audio streaming. Phone numbers must be in E.164 format (e.g., +1234567890).
</p>
</div>
</div>
) : selectedProvider === "ari" ? (
<div className="space-y-4 text-sm">
<div>
<h4 className="font-semibold mb-2">Getting Started with Asterisk ARI:</h4>
@ -469,6 +516,7 @@ export default function ConfigureTelephonyPage() {
<SelectItem value="twilio">Twilio</SelectItem>
<SelectItem value="vonage">Vonage</SelectItem>
<SelectItem value="vobiz">Vobiz</SelectItem>
<SelectItem value="telnyx">Telnyx</SelectItem>
<SelectItem value="cloudonix">Cloudonix</SelectItem>
<SelectItem value="ari">Asterisk (ARI)</SelectItem>
</SelectContent>
@ -744,6 +792,97 @@ export default function ConfigureTelephonyPage() {
</>
)}
{/* Telnyx-specific fields */}
{selectedProvider === "telnyx" && (
<>
<div className="space-y-2">
<Label htmlFor="telnyx_api_key">API Key</Label>
<Input
id="telnyx_api_key"
type="password"
autoComplete="current-password"
placeholder={
hasExistingConfig
? "Leave masked to keep existing"
: "Enter your Telnyx API key"
}
{...register("telnyx_api_key", {
required: selectedProvider === "telnyx" && !hasExistingConfig
? "API Key is required"
: false,
})}
/>
{errors.telnyx_api_key && (
<p className="text-sm text-red-500">
{errors.telnyx_api_key.message}
</p>
)}
<p className="text-xs text-muted-foreground">
Found in the Telnyx Mission Control Portal under Auth &gt; API Keys
</p>
</div>
<div className="space-y-2">
<Label htmlFor="connection_id">Connection ID (Application ID)</Label>
<Input
id="connection_id"
placeholder="1234567890"
{...register("connection_id", {
required: selectedProvider === "telnyx"
? "Connection ID is required"
: false,
})}
/>
{errors.connection_id && (
<p className="text-sm text-red-500">
{errors.connection_id.message}
</p>
)}
<p className="text-xs text-muted-foreground">
The ID of your Call Control Application in Telnyx Mission Control
</p>
</div>
<div className="space-y-2">
<Label>CLI Phone Numbers</Label>
{fromNumbers.map((number, index) => (
<div key={index} className="flex gap-2">
<Input
autoComplete="tel"
placeholder="+1234567890"
value={number}
onChange={(e) => updatePhoneNumber(index, e.target.value)}
/>
{fromNumbers.length > 1 && (
<Button
type="button"
variant="outline"
size="icon"
onClick={() => removePhoneNumber(index)}
>
<Trash2 className="h-4 w-4" />
</Button>
)}
</div>
))}
<Button
type="button"
variant="outline"
size="sm"
onClick={addPhoneNumber}
>
<Plus className="h-4 w-4 mr-2" />
Add Phone Number
</Button>
{fromNumbers.some(n => n.trim() !== "" && !/^\+[1-9]\d{1,14}$/.test(n)) && (
<p className="text-sm text-red-500">
Enter valid phone numbers with country code (e.g., +1234567890)
</p>
)}
</div>
</>
)}
{/* Cloudonix-specific fields */}
{selectedProvider === "cloudonix" && (
<>

View file

@ -57,7 +57,7 @@ export const PhoneCallDialog = ({
try {
const configResponse = await getTelephonyConfigurationApiV1OrganizationsTelephonyConfigGet({});
if (configResponse.error || (!configResponse.data?.twilio && !configResponse.data?.vonage && !configResponse.data?.vobiz && !configResponse.data?.cloudonix && !configResponse.data?.ari)) {
if (configResponse.error || (!configResponse.data?.twilio && !configResponse.data?.vonage && !configResponse.data?.vobiz && !configResponse.data?.cloudonix && !configResponse.data?.ari && !configResponse.data?.telnyx)) {
setNeedsConfiguration(true);
} else {
setNeedsConfiguration(false);

File diff suppressed because one or more lines are too long

View file

@ -1030,6 +1030,36 @@ export type TelephonyConfigurationResponse = {
vobiz?: VobizConfigurationResponse | null;
cloudonix?: CloudonixConfigurationResponse | null;
ari?: AriConfigurationResponse | null;
telnyx?: TelnyxConfigurationResponse | null;
};
/**
* Request schema for Telnyx configuration.
*/
export type TelnyxConfigurationRequest = {
provider?: string;
/**
* Telnyx API Key
*/
api_key: string;
/**
* Telnyx Call Control Application ID (connection_id)
*/
connection_id: string;
/**
* List of Telnyx phone numbers (E.164 format)
*/
from_numbers: Array<string>;
};
/**
* Response schema for Telnyx configuration with masked sensitive fields.
*/
export type TelnyxConfigurationResponse = {
provider: string;
api_key: string;
connection_id: string;
from_numbers: Array<string>;
};
export type TestSessionResponse = {
@ -1611,6 +1641,35 @@ export type HandleTwilioStatusCallbackApiV1TelephonyTwilioStatusCallbackWorkflow
200: unknown;
};
export type HandleTelnyxEventsApiV1TelephonyTelnyxEventsWorkflowRunIdPostData = {
body?: never;
path: {
workflow_run_id: number;
};
query?: never;
url: '/api/v1/telephony/telnyx/events/{workflow_run_id}';
};
export type HandleTelnyxEventsApiV1TelephonyTelnyxEventsWorkflowRunIdPostErrors = {
/**
* Not found
*/
404: unknown;
/**
* Validation Error
*/
422: HttpValidationError;
};
export type HandleTelnyxEventsApiV1TelephonyTelnyxEventsWorkflowRunIdPostError = HandleTelnyxEventsApiV1TelephonyTelnyxEventsWorkflowRunIdPostErrors[keyof HandleTelnyxEventsApiV1TelephonyTelnyxEventsWorkflowRunIdPostErrors];
export type HandleTelnyxEventsApiV1TelephonyTelnyxEventsWorkflowRunIdPostResponses = {
/**
* Successful Response
*/
200: unknown;
};
export type HandleVonageEventsApiV1TelephonyVonageEventsWorkflowRunIdPostData = {
body?: never;
path: {
@ -1775,6 +1834,7 @@ export type HandleInboundTelephonyApiV1TelephonyInboundWorkflowIdPostData = {
'x-vobiz-signature'?: string | null;
'x-vobiz-timestamp'?: string | null;
'x-cx-apikey'?: string | null;
'telnyx-signature-ed25519'?: string | null;
};
path: {
workflow_id: number;
@ -3792,7 +3852,7 @@ export type GetTelephonyConfigurationApiV1OrganizationsTelephonyConfigGetRespons
export type GetTelephonyConfigurationApiV1OrganizationsTelephonyConfigGetResponse = GetTelephonyConfigurationApiV1OrganizationsTelephonyConfigGetResponses[keyof GetTelephonyConfigurationApiV1OrganizationsTelephonyConfigGetResponses];
export type SaveTelephonyConfigurationApiV1OrganizationsTelephonyConfigPostData = {
body: TwilioConfigurationRequest | VonageConfigurationRequest | VobizConfigurationRequest | CloudonixConfigurationRequest | AriConfigurationRequest;
body: TwilioConfigurationRequest | VonageConfigurationRequest | VobizConfigurationRequest | CloudonixConfigurationRequest | AriConfigurationRequest | TelnyxConfigurationRequest;
headers?: {
authorization?: string | null;
'X-API-Key'?: string | null;

View file

@ -9,7 +9,8 @@ export const WORKFLOW_RUN_MODES = {
CLOUDONIX: 'cloudonix',
WEBRTC: 'webrtc',
SMALL_WEBRTC: 'smallwebrtc',
ARI: 'ari'
ARI: 'ari',
TELNYX: 'telnyx'
} as const;
export type WorkflowRunMode = typeof WORKFLOW_RUN_MODES[keyof typeof WORKFLOW_RUN_MODES];