diff --git a/api/alembic/versions/b3a1c7e94f12_add_telnyx_mode.py b/api/alembic/versions/b3a1c7e94f12_add_telnyx_mode.py
new file mode 100644
index 0000000..a95e415
--- /dev/null
+++ b/api/alembic/versions/b3a1c7e94f12_add_telnyx_mode.py
@@ -0,0 +1,69 @@
+"""add telnyx mode
+
+Revision ID: b3a1c7e94f12
+Revises: e54ddb048535
+Create Date: 2026-03-24 12:00:00.000000
+
+"""
+
+from typing import Sequence, Union
+
+from alembic import op
+from alembic_postgresql_enum import TableReference
+
+# revision identifiers, used by Alembic.
+revision: str = "b3a1c7e94f12"
+down_revision: Union[str, None] = "e54ddb048535"
+branch_labels: Union[str, Sequence[str], None] = None
+depends_on: Union[str, Sequence[str], None] = None
+
+
+def upgrade() -> None:
+ op.sync_enum_values(
+ enum_schema="public",
+ enum_name="workflow_run_mode",
+ new_values=[
+ "ari",
+ "twilio",
+ "vonage",
+ "vobiz",
+ "cloudonix",
+ "telnyx",
+ "webrtc",
+ "smallwebrtc",
+ "stasis",
+ "VOICE",
+ "CHAT",
+ ],
+ affected_columns=[
+ TableReference(
+ table_schema="public", table_name="workflow_runs", column_name="mode"
+ )
+ ],
+ enum_values_to_rename=[],
+ )
+
+
+def downgrade() -> None:
+ op.sync_enum_values(
+ enum_schema="public",
+ enum_name="workflow_run_mode",
+ new_values=[
+ "ari",
+ "twilio",
+ "vonage",
+ "vobiz",
+ "cloudonix",
+ "webrtc",
+ "smallwebrtc",
+ "stasis",
+ "VOICE",
+ "CHAT",
+ ],
+ affected_columns=[
+ TableReference(
+ table_schema="public", table_name="workflow_runs", column_name="mode"
+ )
+ ],
+ enum_values_to_rename=[],
+ )
diff --git a/api/enums.py b/api/enums.py
index e6ea04b..bd3425b 100644
--- a/api/enums.py
+++ b/api/enums.py
@@ -23,6 +23,7 @@ class WorkflowRunMode(Enum):
VONAGE = "vonage"
VOBIZ = "vobiz"
CLOUDONIX = "cloudonix"
+ TELNYX = "telnyx"
WEBRTC = "webrtc"
SMALLWEBRTC = "smallwebrtc"
diff --git a/api/routes/organization.py b/api/routes/organization.py
index b212821..088cabf 100644
--- a/api/routes/organization.py
+++ b/api/routes/organization.py
@@ -13,6 +13,8 @@ from api.schemas.telephony_config import (
CloudonixConfigurationRequest,
CloudonixConfigurationResponse,
TelephonyConfigurationResponse,
+ TelnyxConfigurationRequest,
+ TelnyxConfigurationResponse,
TwilioConfigurationRequest,
TwilioConfigurationResponse,
VobizConfigurationRequest,
@@ -33,6 +35,7 @@ PROVIDER_MASKED_FIELDS = {
"vobiz": ["auth_id", "auth_token"],
"cloudonix": ["bearer_token"],
"ari": ["app_password"],
+ "telnyx": ["api_key"],
}
@@ -149,6 +152,19 @@ async def get_telephony_configuration(user: UserModel = Depends(get_user)):
from_numbers=from_numbers,
),
)
+ elif stored_provider == "telnyx":
+ api_key = config.value.get("api_key", "")
+ connection_id = config.value.get("connection_id", "")
+ from_numbers = config.value.get("from_numbers", [])
+
+ return TelephonyConfigurationResponse(
+ telnyx=TelnyxConfigurationResponse(
+ provider="telnyx",
+ api_key=mask_key(api_key) if api_key else "",
+ connection_id=connection_id,
+ from_numbers=from_numbers,
+ ),
+ )
else:
return TelephonyConfigurationResponse()
@@ -161,6 +177,7 @@ async def save_telephony_configuration(
VobizConfigurationRequest,
CloudonixConfigurationRequest,
ARIConfigurationRequest,
+ TelnyxConfigurationRequest,
],
user: UserModel = Depends(get_user),
):
@@ -205,6 +222,13 @@ async def save_telephony_configuration(
"domain_id": request.domain_id,
"from_numbers": request.from_numbers,
}
+ elif request.provider == "telnyx":
+ config_value = {
+ "provider": "telnyx",
+ "api_key": request.api_key,
+ "connection_id": request.connection_id,
+ "from_numbers": request.from_numbers,
+ }
elif request.provider == "ari":
config_value = {
"provider": "ari",
diff --git a/api/routes/telephony.py b/api/routes/telephony.py
index 2ab12bf..723a6cd 100644
--- a/api/routes/telephony.py
+++ b/api/routes/telephony.py
@@ -318,6 +318,7 @@ async def _validate_inbound_request(
x_vobiz_signature: str = None,
x_vobiz_timestamp: str = None,
x_cx_apikey: str = None,
+ telnyx_signature: str = None,
) -> tuple[bool, TelephonyError, dict, object]:
"""
Validate all aspects of inbound request.
@@ -351,7 +352,7 @@ async def _validate_inbound_request(
# Verify webhook signature/API key if provided
provider_instance = None
- if x_twilio_signature or x_vobiz_signature or x_cx_apikey:
+ if x_twilio_signature or x_vobiz_signature or x_cx_apikey or telnyx_signature:
backend_endpoint, _ = await get_backend_endpoints()
webhook_url = f"{backend_endpoint}/api/v1/telephony/inbound/{workflow_id}"
@@ -377,6 +378,11 @@ async def _validate_inbound_request(
signature_valid = await provider_instance.verify_inbound_signature(
webhook_url, webhook_data, x_cx_apikey
)
+ elif provider_class.PROVIDER_NAME == "telnyx" and telnyx_signature:
+ logger.info(f"Verifying Telnyx signature for URL: {webhook_url}")
+ signature_valid = await provider_instance.verify_inbound_signature(
+ webhook_url, webhook_data, telnyx_signature
+ )
else:
logger.warning(
f"No signature/API key validation for provider {provider_class.PROVIDER_NAME}"
@@ -818,6 +824,63 @@ async def _process_status_update(workflow_run_id: int, status: StatusCallbackReq
)
+@router.post("/telnyx/events/{workflow_run_id}")
+async def handle_telnyx_events(
+ request: Request,
+ workflow_run_id: int,
+):
+ """Handle Telnyx Call Control webhook events.
+
+ Telnyx sends all call lifecycle events (call.initiated, call.answered,
+ call.hangup, streaming.started, streaming.stopped) as JSON POST requests.
+ """
+ set_current_run_id(workflow_run_id)
+
+ event_data = await request.json()
+ logger.info(
+ f"[run {workflow_run_id}] Received Telnyx event: {json.dumps(event_data)}"
+ )
+
+ # Extract event type from Telnyx envelope
+ data = event_data.get("data", {})
+ event_type = data.get("event_type", "")
+
+ # Skip streaming events — they're informational only
+ if event_type in ("streaming.started", "streaming.stopped"):
+ logger.debug(f"[run {workflow_run_id}] Telnyx streaming event: {event_type}")
+ return {"status": "success"}
+
+ # Get workflow run and provider
+ workflow_run = await db_client.get_workflow_run_by_id(workflow_run_id)
+ if not workflow_run:
+ logger.warning(f"Workflow run {workflow_run_id} not found for Telnyx event")
+ return {"status": "ignored", "reason": "workflow_run_not_found"}
+
+ workflow = await db_client.get_workflow_by_id(workflow_run.workflow_id)
+ if not workflow:
+ logger.warning(f"Workflow {workflow_run.workflow_id} not found")
+ return {"status": "ignored", "reason": "workflow_not_found"}
+
+ provider = await get_telephony_provider(workflow.organization_id)
+
+ # Parse the callback data into generic format
+ parsed_data = provider.parse_status_callback(event_data)
+
+ status_update = StatusCallbackRequest(
+ call_id=parsed_data["call_id"],
+ status=parsed_data["status"],
+ from_number=parsed_data.get("from_number"),
+ to_number=parsed_data.get("to_number"),
+ direction=parsed_data.get("direction"),
+ duration=parsed_data.get("duration"),
+ extra=parsed_data.get("extra", {}),
+ )
+
+ await _process_status_update(workflow_run_id, status_update)
+
+ return {"status": "success"}
+
+
@router.post("/vonage/events/{workflow_run_id}")
async def handle_vonage_events(
request: Request,
@@ -1355,6 +1418,7 @@ async def handle_inbound_telephony(
x_vobiz_signature: Optional[str] = Header(None),
x_vobiz_timestamp: Optional[str] = Header(None),
x_cx_apikey: Optional[str] = Header(None),
+ telnyx_signature: Optional[str] = Header(None, alias="telnyx-signature-ed25519"),
):
"""Handle inbound telephony calls from any supported provider with common processing"""
logger.info(f"Inbound call received for workflow_id: {workflow_id}")
@@ -1409,6 +1473,7 @@ async def handle_inbound_telephony(
x_vobiz_signature,
x_vobiz_timestamp,
x_cx_apikey,
+ telnyx_signature,
)
if not is_valid:
@@ -1436,8 +1501,38 @@ async def handle_inbound_telephony(
)
# Generate response URLs
- _, wss_backend_endpoint = await get_backend_endpoints()
+ backend_endpoint, wss_backend_endpoint = await get_backend_endpoints()
websocket_url = f"{wss_backend_endpoint}/api/v1/telephony/ws/{workflow_id}/{workflow_context['user_id']}/{workflow_run_id}"
+
+ # Telnyx requires answering the call via REST API (not via webhook response)
+ if provider_class.PROVIDER_NAME == "telnyx":
+ # Get provider instance with credentials if not already loaded
+ if not provider_instance:
+ provider_instance = await get_telephony_provider(
+ workflow_context["organization_id"]
+ )
+
+ events_url = (
+ f"{backend_endpoint}/api/v1/telephony/telnyx/events/{workflow_run_id}"
+ )
+
+ try:
+ await provider_instance.answer_and_stream(
+ call_control_id=normalized_data.call_id,
+ stream_url=websocket_url,
+ webhook_url=events_url,
+ )
+ except Exception as e:
+ logger.error(f"Failed to answer Telnyx inbound call: {e}")
+ return provider_class.generate_error_response(
+ "ANSWER_FAILED", "Failed to answer call"
+ )
+
+ logger.info(
+ f"Answered Telnyx inbound call {normalized_data.call_id} for workflow_run {workflow_run_id}"
+ )
+ return {"status": "ok"}
+
response = await provider_class.generate_inbound_response(
websocket_url, workflow_run_id
)
diff --git a/api/schemas/telephony_config.py b/api/schemas/telephony_config.py
index da6d605..e1c3525 100644
--- a/api/schemas/telephony_config.py
+++ b/api/schemas/telephony_config.py
@@ -125,6 +125,28 @@ class ARIConfigurationResponse(BaseModel):
from_numbers: List[str]
+class TelnyxConfigurationRequest(BaseModel):
+ """Request schema for Telnyx configuration."""
+
+ provider: str = Field(default="telnyx")
+ api_key: str = Field(..., description="Telnyx API Key")
+ connection_id: str = Field(
+ ..., description="Telnyx Call Control Application ID (connection_id)"
+ )
+ from_numbers: List[str] = Field(
+ ..., min_length=1, description="List of Telnyx phone numbers (E.164 format)"
+ )
+
+
+class TelnyxConfigurationResponse(BaseModel):
+ """Response schema for Telnyx configuration with masked sensitive fields."""
+
+ provider: str
+ api_key: str # Masked
+ connection_id: str
+ from_numbers: List[str]
+
+
class TelephonyConfigurationResponse(BaseModel):
"""Top-level telephony configuration response."""
@@ -133,3 +155,4 @@ class TelephonyConfigurationResponse(BaseModel):
vobiz: Optional[VobizConfigurationResponse] = None
cloudonix: Optional[CloudonixConfigurationResponse] = None
ari: Optional[ARIConfigurationResponse] = None
+ telnyx: Optional[TelnyxConfigurationResponse] = None
diff --git a/api/services/pipecat/audio_config.py b/api/services/pipecat/audio_config.py
index 0a5bd54..6f59de2 100644
--- a/api/services/pipecat/audio_config.py
+++ b/api/services/pipecat/audio_config.py
@@ -97,8 +97,9 @@ def create_audio_config(transport_type: str) -> AudioConfig:
WorkflowRunMode.VOBIZ.value,
WorkflowRunMode.CLOUDONIX.value,
WorkflowRunMode.ARI.value,
+ WorkflowRunMode.TELNYX.value,
):
- # Twilio, Cloudonix, Vobiz, and ARI use MULAW at 8kHz
+ # Twilio, Cloudonix, Vobiz, Telnyx, and ARI use MULAW at 8kHz
return AudioConfig(
transport_in_sample_rate=8000,
transport_out_sample_rate=8000,
diff --git a/api/services/pipecat/run_pipeline.py b/api/services/pipecat/run_pipeline.py
index d5bcb2c..abbe20b 100644
--- a/api/services/pipecat/run_pipeline.py
+++ b/api/services/pipecat/run_pipeline.py
@@ -44,6 +44,7 @@ from api.services.pipecat.tracing_config import (
from api.services.pipecat.transport_setup import (
create_ari_transport,
create_cloudonix_transport,
+ create_telnyx_transport,
create_twilio_transport,
create_vobiz_transport,
create_vonage_transport,
@@ -344,6 +345,74 @@ async def run_pipeline_vobiz(
raise
+async def run_pipeline_telnyx(
+ websocket_client: WebSocket,
+ stream_id: str,
+ call_control_id: str,
+ workflow_id: int,
+ workflow_run_id: int,
+ user_id: int,
+) -> None:
+ """Run pipeline for Telnyx Call Control WebSocket connections.
+
+ Telnyx uses PCMU at 8kHz over WebSocket with base64-encoded media events,
+ similar to Twilio's protocol.
+ """
+ logger.info(
+ f"[run {workflow_run_id}] Starting Telnyx pipeline - "
+ f"stream_id={stream_id}, call_control_id={call_control_id}, "
+ f"workflow_id={workflow_id}"
+ )
+ set_current_run_id(workflow_run_id)
+
+ cost_info = {"call_id": call_control_id}
+ await db_client.update_workflow_run(workflow_run_id, cost_info=cost_info)
+
+ workflow = await db_client.get_workflow(workflow_id, user_id)
+
+ if workflow:
+ set_current_org_id(workflow.organization_id)
+
+ vad_config = None
+ ambient_noise_config = None
+ if workflow and workflow.workflow_configurations:
+ if "vad_configuration" in workflow.workflow_configurations:
+ vad_config = workflow.workflow_configurations["vad_configuration"]
+ if "ambient_noise_configuration" in workflow.workflow_configurations:
+ ambient_noise_config = workflow.workflow_configurations[
+ "ambient_noise_configuration"
+ ]
+
+ try:
+ audio_config = create_audio_config(WorkflowRunMode.TELNYX.value)
+
+ transport = await create_telnyx_transport(
+ websocket_client,
+ stream_id,
+ call_control_id,
+ workflow_run_id,
+ audio_config,
+ workflow.organization_id,
+ vad_config,
+ ambient_noise_config,
+ )
+
+ await _run_pipeline(
+ transport,
+ workflow_id,
+ workflow_run_id,
+ user_id,
+ audio_config=audio_config,
+ )
+ logger.info(f"[run {workflow_run_id}] Telnyx pipeline completed successfully")
+
+ except Exception as e:
+ logger.error(
+ f"[run {workflow_run_id}] Error in Telnyx pipeline: {e}", exc_info=True
+ )
+ raise
+
+
async def run_pipeline_cloudonix(
websocket_client: WebSocket,
stream_sid: str,
diff --git a/api/services/pipecat/transport_setup.py b/api/services/pipecat/transport_setup.py
index 7caf5b2..a4556cb 100644
--- a/api/services/pipecat/transport_setup.py
+++ b/api/services/pipecat/transport_setup.py
@@ -20,6 +20,7 @@ from api.services.telephony.providers.twilio_call_strategies import (
from pipecat.audio.mixers.silence_mixer import SilenceAudioMixer
from pipecat.audio.mixers.soundfile_mixer import SoundfileMixer
from pipecat.serializers.asterisk import AsteriskFrameSerializer
+from pipecat.serializers.telnyx import TelnyxFrameSerializer
from pipecat.serializers.twilio import TwilioFrameSerializer
from pipecat.serializers.vobiz import VobizFrameSerializer
from pipecat.serializers.vonage import VonageFrameSerializer
@@ -169,6 +170,70 @@ async def create_cloudonix_transport(
)
+async def create_telnyx_transport(
+ websocket_client: WebSocket,
+ stream_id: str,
+ call_control_id: str,
+ workflow_run_id: int,
+ audio_config: AudioConfig,
+ organization_id: int,
+ vad_config: dict | None = None,
+ ambient_noise_config: dict | None = None,
+):
+ """Create a transport for Telnyx connections."""
+ config = await db_client.get_configuration(
+ organization_id, OrganizationConfigurationKey.TELEPHONY_CONFIGURATION.value
+ )
+
+ if not config or not config.value:
+ raise ValueError(
+ f"Telnyx credentials not configured for organization {organization_id}"
+ )
+
+ if config.value.get("provider") != "telnyx":
+ raise ValueError(
+ f"Expected Telnyx provider, got {config.value.get('provider')}"
+ )
+
+ api_key = config.value.get("api_key")
+ if not api_key:
+ raise ValueError(
+ f"Incomplete Telnyx configuration for organization {organization_id}"
+ )
+
+ serializer = TelnyxFrameSerializer(
+ stream_id=stream_id,
+ call_control_id=call_control_id,
+ api_key=api_key,
+ outbound_encoding="PCMU",
+ inbound_encoding="PCMU",
+ )
+
+ return FastAPIWebsocketTransport(
+ websocket=websocket_client,
+ params=FastAPIWebsocketParams(
+ audio_in_enabled=True,
+ audio_out_enabled=True,
+ audio_in_sample_rate=audio_config.transport_in_sample_rate,
+ audio_out_sample_rate=audio_config.transport_out_sample_rate,
+ audio_out_mixer=(
+ SoundfileMixer(
+ sound_files={
+ "office": APP_ROOT_DIR
+ / "assets"
+ / f"office-ambience-{audio_config.transport_out_sample_rate}-mono.wav"
+ },
+ default_sound="office",
+ volume=ambient_noise_config.get("volume", 0.3),
+ )
+ if ambient_noise_config and ambient_noise_config.get("enabled", False)
+ else SilenceAudioMixer()
+ ),
+ serializer=serializer,
+ ),
+ )
+
+
async def create_ari_transport(
websocket_client: WebSocket,
channel_id: str,
diff --git a/api/services/telephony/factory.py b/api/services/telephony/factory.py
index 0e2bb6c..9044dc9 100644
--- a/api/services/telephony/factory.py
+++ b/api/services/telephony/factory.py
@@ -13,6 +13,7 @@ from api.enums import OrganizationConfigurationKey
from api.services.telephony.base import TelephonyProvider
from api.services.telephony.providers.ari_provider import ARIProvider
from api.services.telephony.providers.cloudonix_provider import CloudonixProvider
+from api.services.telephony.providers.telnyx_provider import TelnyxProvider
from api.services.telephony.providers.twilio_provider import TwilioProvider
from api.services.telephony.providers.vobiz_provider import VobizProvider
from api.services.telephony.providers.vonage_provider import VonageProvider
@@ -76,6 +77,13 @@ async def load_telephony_config(organization_id: int) -> Dict[str, Any]:
"domain_id": config.value.get("domain_id"),
"from_numbers": config.value.get("from_numbers", []),
}
+ elif provider == "telnyx":
+ return {
+ "provider": "telnyx",
+ "api_key": config.value.get("api_key"),
+ "connection_id": config.value.get("connection_id"),
+ "from_numbers": config.value.get("from_numbers", []),
+ }
elif provider == "ari":
return {
"provider": "ari",
@@ -125,6 +133,9 @@ async def get_telephony_provider(organization_id: int) -> TelephonyProvider:
elif provider_type == "cloudonix":
return CloudonixProvider(config)
+ elif provider_type == "telnyx":
+ return TelnyxProvider(config)
+
elif provider_type == "ari":
return ARIProvider(config)
@@ -143,6 +154,7 @@ async def get_all_telephony_providers() -> List[Type[TelephonyProvider]]:
return [
ARIProvider,
CloudonixProvider,
+ TelnyxProvider,
TwilioProvider,
VobizProvider,
VonageProvider,
diff --git a/api/services/telephony/providers/telnyx_provider.py b/api/services/telephony/providers/telnyx_provider.py
new file mode 100644
index 0000000..9b33cac
--- /dev/null
+++ b/api/services/telephony/providers/telnyx_provider.py
@@ -0,0 +1,466 @@
+"""
+Telnyx implementation of the TelephonyProvider interface.
+Uses the Telnyx Call Control API v2 for outbound calling with
+inline WebSocket media streaming.
+"""
+
+import json
+import random
+from typing import TYPE_CHECKING, Any, Dict, List, Optional
+
+import aiohttp
+from fastapi import HTTPException
+from loguru import logger
+
+from api.enums import WorkflowRunMode
+from api.services.telephony.base import (
+ CallInitiationResult,
+ NormalizedInboundData,
+ TelephonyProvider,
+)
+from api.utils.common import get_backend_endpoints
+
+if TYPE_CHECKING:
+ from fastapi import WebSocket
+
+
+class TelnyxProvider(TelephonyProvider):
+ """
+ Telnyx implementation of TelephonyProvider.
+ Uses the Call Control API v2 with inline WebSocket streaming for audio.
+ """
+
+ PROVIDER_NAME = WorkflowRunMode.TELNYX.value
+ WEBHOOK_ENDPOINT = "telnyx/webhook"
+
+ TELNYX_API_BASE = "https://api.telnyx.com/v2"
+
+ def __init__(self, config: Dict[str, Any]):
+ self.api_key = config.get("api_key")
+ self.connection_id = config.get("connection_id")
+ self.from_numbers = config.get("from_numbers", [])
+
+ if isinstance(self.from_numbers, str):
+ self.from_numbers = [self.from_numbers]
+
+ def _headers(self) -> Dict[str, str]:
+ return {
+ "Content-Type": "application/json",
+ "Accept": "application/json",
+ "Authorization": f"Bearer {self.api_key}",
+ }
+
+ async def initiate_call(
+ self,
+ to_number: str,
+ webhook_url: str,
+ workflow_run_id: Optional[int] = None,
+ from_number: Optional[str] = None,
+ **kwargs: Any,
+ ) -> CallInitiationResult:
+ """Initiate an outbound call via Telnyx Call Control API."""
+ if not self.validate_config():
+ raise ValueError("Telnyx provider not properly configured")
+
+ if from_number is None:
+ from_number = random.choice(self.from_numbers)
+ logger.info(f"Selected phone number {from_number} for outbound call")
+
+ backend_endpoint, wss_backend_endpoint = await get_backend_endpoints()
+
+ # Build the WebSocket stream URL for inline audio streaming
+ workflow_id = kwargs.get("workflow_id")
+ user_id = kwargs.get("user_id")
+ stream_url = (
+ f"{wss_backend_endpoint}/api/v1/telephony/ws"
+ f"/{workflow_id}/{user_id}/{workflow_run_id}"
+ )
+
+ # Build the webhook URL for status callbacks
+ events_url = (
+ f"{backend_endpoint}/api/v1/telephony/telnyx/events/{workflow_run_id}"
+ )
+
+ payload = {
+ "connection_id": self.connection_id,
+ "to": to_number,
+ "from": from_number,
+ "stream_url": stream_url,
+ "stream_track": "inbound_track",
+ "stream_bidirectional_mode": "rtp",
+ "stream_bidirectional_codec": "PCMU",
+ "webhook_url": events_url,
+ "webhook_url_method": "POST",
+ }
+
+ logger.info(
+ f"Telnyx dial payload: {json.dumps({k: v for k, v in payload.items() if k != 'connection_id'})}"
+ )
+
+ endpoint = f"{self.TELNYX_API_BASE}/calls"
+
+ async with aiohttp.ClientSession() as session:
+ async with session.post(
+ endpoint, json=payload, headers=self._headers()
+ ) as response:
+ if response.status != 200:
+ error_data = await response.json()
+ logger.error(f"Telnyx API error: {error_data}")
+ raise HTTPException(
+ status_code=response.status, detail=json.dumps(error_data)
+ )
+
+ response_data = await response.json()
+ data = response_data.get("data", {})
+ call_control_id = data.get("call_control_id", "")
+ call_leg_id = data.get("call_leg_id", "")
+ call_session_id = data.get("call_session_id", "")
+
+ logger.info(
+ f"Telnyx call initiated: call_control_id={call_control_id}, "
+ f"call_leg_id={call_leg_id}, call_session_id={call_session_id}"
+ )
+
+ return CallInitiationResult(
+ call_id=call_control_id,
+ status="initiated",
+ provider_metadata={
+ "call_control_id": call_control_id,
+ "call_leg_id": call_leg_id,
+ "call_session_id": call_session_id,
+ },
+ raw_response=response_data,
+ )
+
+ async def get_call_status(self, call_id: str) -> Dict[str, Any]:
+ """Get the current status of a Telnyx call."""
+ endpoint = f"{self.TELNYX_API_BASE}/calls/{call_id}"
+
+ async with aiohttp.ClientSession() as session:
+ async with session.get(endpoint, headers=self._headers()) as response:
+ if response.status != 200:
+ error_data = await response.json()
+ raise Exception(f"Failed to get call status: {error_data}")
+ return await response.json()
+
+ async def get_available_phone_numbers(self) -> List[str]:
+ return self.from_numbers
+
+ def validate_config(self) -> bool:
+ return bool(self.api_key and self.connection_id and self.from_numbers)
+
+ async def verify_webhook_signature(
+ self, url: str, params: Dict[str, Any], signature: str
+ ) -> bool:
+ """Required by the abstract interface but not actively called for Telnyx.
+
+ Telnyx webhook signature verification uses Ed25519 (via the
+ telnyx-signature-ed25519 header). This can be implemented in the
+ future using the Telnyx SDK if needed.
+ """
+ return True
+
+ async def get_webhook_response(
+ self, workflow_id: int, user_id: int, workflow_run_id: int
+ ) -> str:
+ """Not used for Telnyx — streaming is inline with the dial request."""
+ return ""
+
+ async def get_call_cost(self, call_id: str) -> Dict[str, Any]:
+ """Get cost information for a Telnyx call.
+
+ Telnyx doesn't provide per-call cost via the Call Control API.
+ Cost data is available through the billing/CDR APIs.
+ """
+ return {
+ "cost_usd": 0.0,
+ "duration": 0,
+ "status": "unknown",
+ "raw_response": {},
+ }
+
+ def parse_status_callback(self, data: Dict[str, Any]) -> Dict[str, Any]:
+ """Parse Telnyx webhook event data into generic format."""
+ event_data = data.get("data", data)
+ event_type = event_data.get("event_type", "")
+ payload = event_data.get("payload", {})
+
+ status = self._resolve_status(event_type, payload)
+
+ duration_secs = payload.get("duration_secs")
+ return {
+ "call_id": payload.get("call_control_id", ""),
+ "status": status,
+ "from_number": payload.get("from"),
+ "to_number": payload.get("to"),
+ "direction": payload.get("direction"),
+ "duration": str(duration_secs) if duration_secs else None,
+ "extra": data,
+ }
+
+ @staticmethod
+ def _resolve_status(event_type: str, payload: Dict[str, Any]) -> str:
+ """Map a Telnyx event type (and hangup cause) to a normalized status."""
+ EVENT_STATUS = {
+ "call.initiated": "initiated",
+ "call.answered": "in-progress",
+ "call.hangup": "completed",
+ "call.machine.detection.ended": "machine-detected",
+ "streaming.started": "streaming-started",
+ "streaming.stopped": "streaming-stopped",
+ }
+
+ HANGUP_STATUS = {
+ "busy": "busy",
+ "no_answer": "no-answer",
+ "timeout": "no-answer",
+ "call_rejected": "failed",
+ "unallocated_number": "failed",
+ }
+
+ status = EVENT_STATUS.get(event_type, event_type)
+
+ if event_type == "call.hangup":
+ hangup_cause = payload.get("hangup_cause", "")
+ status = HANGUP_STATUS.get(hangup_cause, status)
+
+ return status
+
+ async def handle_websocket(
+ self,
+ websocket: "WebSocket",
+ workflow_id: int,
+ user_id: int,
+ workflow_run_id: int,
+ ) -> None:
+ """Handle Telnyx WebSocket connection for real-time audio.
+
+ Telnyx sends:
+ 1. "connected" event on WebSocket open
+ 2. "start" event with stream_id, call_control_id, media_format
+ 3. "media" events with base64-encoded audio
+ """
+ from api.services.pipecat.run_pipeline import run_pipeline_telnyx
+
+ try:
+ # Wait for "connected" event
+ first_msg = await websocket.receive_text()
+ msg = json.loads(first_msg)
+
+ if msg.get("event") != "connected":
+ logger.error(f"Expected 'connected' event, got: {msg.get('event')}")
+ await websocket.close(code=4400, reason="Expected connected event")
+ return
+
+ logger.debug(
+ f"Telnyx WebSocket connected for workflow_run {workflow_run_id}"
+ )
+
+ # Wait for "start" event with stream details
+ start_msg = await websocket.receive_text()
+ logger.debug(f"Received start message: {start_msg}")
+
+ start_data = json.loads(start_msg)
+ if start_data.get("event") != "start":
+ logger.error("Expected 'start' event second")
+ await websocket.close(code=4400, reason="Expected start event")
+ return
+
+ # Extract Telnyx-specific identifiers
+ try:
+ stream_id = start_data.get("stream_id", "")
+ start_info = start_data.get("start", {})
+ call_control_id = start_info.get("call_control_id", "")
+ except (KeyError, AttributeError):
+ logger.error("Missing stream_id or call_control_id in start message")
+ await websocket.close(code=4400, reason="Missing stream identifiers")
+ return
+
+ if not stream_id or not call_control_id:
+ logger.error(
+ f"Empty stream identifiers: stream_id={stream_id}, "
+ f"call_control_id={call_control_id}"
+ )
+ await websocket.close(code=4400, reason="Missing stream identifiers")
+ return
+
+ logger.info(
+ f"Telnyx stream started: stream_id={stream_id}, "
+ f"call_control_id={call_control_id}"
+ )
+
+ # Run the Telnyx pipeline
+ await run_pipeline_telnyx(
+ websocket,
+ stream_id,
+ call_control_id,
+ workflow_id,
+ workflow_run_id,
+ user_id,
+ )
+
+ except Exception as e:
+ logger.error(f"Error in Telnyx WebSocket handler: {e}")
+ raise
+
+ async def answer_and_stream(
+ self, call_control_id: str, stream_url: str, webhook_url: str
+ ) -> None:
+ """Answer an inbound Telnyx call and start WebSocket streaming inline.
+
+ This is Telnyx-specific: unlike Twilio/Vobiz where you return XML in the
+ webhook response, Telnyx requires an explicit REST API call to answer
+ the call and set up streaming.
+
+ Args:
+ call_control_id: The call_control_id from the inbound webhook
+ stream_url: WebSocket URL for bidirectional audio streaming
+ webhook_url: URL for status callback events
+ """
+ endpoint = f"{self.TELNYX_API_BASE}/calls/{call_control_id}/actions/answer"
+
+ payload = {
+ "stream_url": stream_url,
+ "stream_track": "inbound_track",
+ "stream_bidirectional_mode": "rtp",
+ "stream_bidirectional_codec": "PCMU",
+ "webhook_url": webhook_url,
+ "webhook_url_method": "POST",
+ }
+
+ logger.info(
+ f"Answering Telnyx inbound call {call_control_id} with stream_url={stream_url}"
+ )
+
+ async with aiohttp.ClientSession() as session:
+ async with session.post(
+ endpoint, json=payload, headers=self._headers()
+ ) as response:
+ if response.status != 200:
+ error_data = await response.text()
+ logger.error(
+ f"Failed to answer Telnyx call {call_control_id}: "
+ f"status={response.status}, response={error_data}"
+ )
+ raise Exception(
+ f"Failed to answer Telnyx call: {response.status} {error_data}"
+ )
+
+ logger.info(f"Successfully answered Telnyx call {call_control_id}")
+
+ # ======== INBOUND CALL METHODS ========
+
+ @classmethod
+ def can_handle_webhook(
+ cls, webhook_data: Dict[str, Any], headers: Dict[str, str]
+ ) -> bool:
+ """Detect if a webhook is from Telnyx.
+
+ Telnyx webhooks have a nested data.event_type structure
+ and may include a telnyx-signature-ed25519 header.
+ """
+ if "telnyx-signature-ed25519" in headers:
+ return True
+
+ # Check for Telnyx event structure
+ data = webhook_data.get("data", {})
+ if data.get("record_type") == "event" and "event_type" in data:
+ event_type = data.get("event_type", "")
+ if event_type.startswith("call."):
+ return True
+
+ return False
+
+ @staticmethod
+ def parse_inbound_webhook(webhook_data: Dict[str, Any]) -> NormalizedInboundData:
+ """Parse Telnyx inbound webhook into normalized format."""
+ data = webhook_data.get("data", webhook_data)
+ payload = data.get("payload", {})
+
+ # Telnyx uses "incoming" for inbound — normalize to "inbound"
+ direction = payload.get("direction", "")
+ if direction == "incoming":
+ direction = "inbound"
+
+ return NormalizedInboundData(
+ provider=TelnyxProvider.PROVIDER_NAME,
+ call_id=payload.get("call_control_id", ""),
+ from_number=TelnyxProvider.normalize_phone_number(payload.get("from", "")),
+ to_number=TelnyxProvider.normalize_phone_number(payload.get("to", "")),
+ direction=direction,
+ call_status=data.get("event_type", ""),
+ account_id=payload.get("connection_id"),
+ raw_data=webhook_data,
+ )
+
+ @staticmethod
+ def validate_account_id(config_data: dict, webhook_account_id: str) -> bool:
+ """Validate that the connection_id from webhook matches configuration."""
+ if not webhook_account_id:
+ return False
+ return config_data.get("connection_id") == webhook_account_id
+
+ @staticmethod
+ def normalize_phone_number(phone_number: str) -> str:
+ """Normalize phone number to E.164 format.
+ Telnyx already provides numbers in E.164 format
+ """
+ return phone_number or ""
+
+ async def verify_inbound_signature(
+ self, url: str, webhook_data: Dict[str, Any], signature: str
+ ) -> bool:
+ """Required by the abstract interface. Telnyx signature verification
+ (Ed25519) is not yet implemented — accepts all inbound webhooks for now.
+ """
+ return True
+
+ @staticmethod
+ async def generate_inbound_response(
+ websocket_url: str, workflow_run_id: int = None
+ ) -> tuple:
+ """Telnyx inbound calls don't use a webhook response for streaming.
+ The streaming is set up via Call Control commands.
+ """
+ from fastapi import Response
+
+ return Response(content="{}", media_type="application/json")
+
+ @staticmethod
+ def generate_error_response(error_type: str, message: str) -> tuple:
+ from fastapi import Response
+
+ return Response(
+ content=json.dumps({"error": error_type, "message": message}),
+ media_type="application/json",
+ )
+
+ @staticmethod
+ def generate_validation_error_response(error_type) -> tuple:
+ from fastapi import Response
+
+ from api.errors.telephony_errors import TELEPHONY_ERROR_MESSAGES, TelephonyError
+
+ message = TELEPHONY_ERROR_MESSAGES.get(
+ error_type, TELEPHONY_ERROR_MESSAGES[TelephonyError.GENERAL_AUTH_FAILED]
+ )
+ return Response(
+ content=json.dumps({"error": str(error_type), "message": message}),
+ media_type="application/json",
+ )
+
+ # ======== CALL TRANSFER METHODS ========
+
+ async def transfer_call(
+ self,
+ destination: str,
+ transfer_id: str,
+ conference_name: str,
+ timeout: int = 30,
+ **kwargs: Any,
+ ) -> Dict[str, Any]:
+ """Telnyx call transfer is not yet implemented."""
+ raise NotImplementedError("Call transfer not yet supported for Telnyx")
+
+ def supports_transfers(self) -> bool:
+ return False
diff --git a/ui/src/app/telephony-configurations/page.tsx b/ui/src/app/telephony-configurations/page.tsx
index acc126d..c8aa0f4 100644
--- a/ui/src/app/telephony-configurations/page.tsx
+++ b/ui/src/app/telephony-configurations/page.tsx
@@ -13,6 +13,8 @@ import type {
CloudonixConfigurationRequest,
CloudonixConfigurationResponse,
TelephonyConfigurationResponse,
+ TelnyxConfigurationRequest,
+ TelnyxConfigurationResponse,
TwilioConfigurationRequest,
VobizConfigurationRequest,
VonageConfigurationRequest
@@ -50,6 +52,9 @@ interface TelephonyConfigForm {
// Vobiz fields
auth_id?: string;
vobiz_auth_token?: string;
+ // Telnyx fields
+ telnyx_api_key?: string;
+ connection_id?: string;
// Cloudonix fields
bearer_token?: string;
domain_id?: string;
@@ -161,6 +166,13 @@ export default function ConfigureTelephonyPage() {
typeof ariConfig.inbound_workflow_id === "number" ? ariConfig.inbound_workflow_id : undefined
);
setValue("from_numbers", ariConfig.from_numbers?.length > 0 ? ariConfig.from_numbers : [""]);
+ } else if ((response.data as TelephonyConfigurationResponse)?.telnyx) {
+ const telnyxConfig = (response.data as TelephonyConfigurationResponse).telnyx as TelnyxConfigurationResponse;
+ setHasExistingConfig(true);
+ setValue("provider", "telnyx");
+ setValue("telnyx_api_key", telnyxConfig.api_key);
+ setValue("connection_id", telnyxConfig.connection_id);
+ setValue("from_numbers", telnyxConfig.from_numbers?.length > 0 ? telnyxConfig.from_numbers : [""]);
}
}
} catch (error) {
@@ -183,7 +195,8 @@ export default function ConfigureTelephonyPage() {
| VonageConfigurationRequest
| VobizConfigurationRequest
| CloudonixConfigurationRequest
- | AriConfigurationRequest;
+ | AriConfigurationRequest
+ | TelnyxConfigurationRequest;
const filteredNumbers = data.from_numbers.filter(n => n.trim() !== "");
@@ -201,7 +214,7 @@ export default function ConfigureTelephonyPage() {
let pattern: RegExp;
let formatMessage: string;
- if (data.provider === "twilio") {
+ if (data.provider === "twilio" || data.provider === "telnyx") {
pattern = twilioPattern;
formatMessage = "with + prefix (e.g., +1234567890)";
} else if (data.provider === "cloudonix") {
@@ -246,6 +259,13 @@ export default function ConfigureTelephonyPage() {
auth_id: data.auth_id,
auth_token: data.vobiz_auth_token,
} as VobizConfigurationRequest;
+ } else if (data.provider === "telnyx") {
+ requestBody = {
+ provider: data.provider,
+ from_numbers: filteredNumbers,
+ api_key: data.telnyx_api_key!,
+ connection_id: data.connection_id!,
+ } as TelnyxConfigurationRequest;
} else if (data.provider === "cloudonix") {
requestBody = {
provider: data.provider,
@@ -312,13 +332,21 @@ export default function ConfigureTelephonyPage() {
? "Vonage"
: selectedProvider === "vobiz"
? "Vobiz"
+ : selectedProvider === "telnyx"
+ ? "Telnyx"
: selectedProvider === "ari"
? "Asterisk ARI"
: "Cloudonix"}{" "}
Setup Guide
- {selectedProvider === "ari" ? (
+ {selectedProvider === "telnyx" ? (
+ <>
+ Telnyx is a cloud communications platform providing programmable voice, messaging,
+ and networking services. Use the Call Control API to build voice applications
+ with real-time WebSocket audio streaming.
+ >
+ ) : selectedProvider === "ari" ? (
<>
Connect Dograh to your Asterisk PBX using the Asterisk REST Interface (ARI).
ARI provides a WebSocket-based event model for controlling calls via Stasis applications.
@@ -368,7 +396,26 @@ export default function ConfigureTelephonyPage() {
- {selectedProvider === "ari" ? (
+ {selectedProvider === "telnyx" ? (
+
+
+
Getting Started with Telnyx:
+
+
Sign up at telnyx.com and create an API Key in the Mission Control Portal
+
Create a Call Control Application under Voice > Programmable Voice
+
Note the Connection ID (Application ID) from your Call Control App
+
Purchase a phone number and assign it to your Call Control Application
+
Enter your API Key, Connection ID, and phone numbers below
+
+
+
+
+ Note: Telnyx uses the Call Control API with WebSocket-based
+ bidirectional audio streaming. Phone numbers must be in E.164 format (e.g., +1234567890).
+