From 4a6752e62bf896c5815ccfc70897b3ebcd5733f1 Mon Sep 17 00:00:00 2001 From: Sabiha Khan <87858386+chewwbaka@users.noreply.github.com> Date: Tue, 12 May 2026 13:44:39 +0530 Subject: [PATCH] feat(telephony/telnyx): add call transfer via conference bridge (#274) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Conference-based transfer for Telnyx with a two-step flow: - transfer_call dials the destination with a per-transfer webhook URL. - On call.answered, the webhook seeds a conference with the destination's live call_control_id and publishes DESTINATION_ANSWERED. - TelnyxConferenceStrategy joins the caller into the conference on pipeline teardown (EndTaskReason.TRANSFER_CALL). - On post-answer destination hangup, the webhook hangs up the caller — Telnyx's create_conference doesn't accept end_conference_on_exit on the seed leg, so we tear down the bridge ourselves. TransferContext gains optional workflow_run_id (for webhook→provider resolution in multi-config orgs) and conference_id (set on answer, rd by the strategy). Also fixes the transfer tool's provider lookup to go through get_telephony_provider_for_run instead of the deprecated org-default shim, which was returning the wrong provider in multi-config orgs. --- .../telephony/providers/telnyx/provider.py | 125 ++++++++- .../telephony/providers/telnyx/routes.py | 247 +++++++++++++++++- .../telephony/providers/telnyx/strategies.py | 215 +++++++++++++++ .../telephony/providers/telnyx/transport.py | 3 + .../telephony/transfer_event_protocol.py | 4 + .../workflow/pipecat_engine_custom_tools.py | 7 +- pipecat | 2 +- 7 files changed, 596 insertions(+), 7 deletions(-) create mode 100644 api/services/telephony/providers/telnyx/strategies.py diff --git a/api/services/telephony/providers/telnyx/provider.py b/api/services/telephony/providers/telnyx/provider.py index 7ad684b..96ee77d 100644 --- a/api/services/telephony/providers/telnyx/provider.py +++ b/api/services/telephony/providers/telnyx/provider.py @@ -621,8 +621,127 @@ class TelnyxProvider(TelephonyProvider): timeout: int = 30, **kwargs: Any, ) -> Dict[str, Any]: - """Telnyx call transfer is not yet implemented.""" - raise NotImplementedError("Call transfer not yet supported for Telnyx") + """Dial the destination as a plain call; conference is seeded later. + + Webhook (``call.answered``) seeds the conference with this leg; + ``TelnyxConferenceStrategy`` joins the caller on pipeline teardown. + https://developers.telnyx.com/api-reference/call-commands/dial + """ + if not self.validate_config(): + raise ValueError("Telnyx provider not properly configured") + + from_number = random.choice(self.from_numbers) + logger.info(f"Selected phone number {from_number} for Telnyx transfer call") + + backend_endpoint, _ = await get_backend_endpoints() + webhook_url = ( + f"{backend_endpoint}/api/v1/telephony/telnyx/transfer-result/{transfer_id}" + ) + + payload = { + "connection_id": self.connection_id, + "to": destination, + "from": from_number, + "timeout_secs": timeout, + "webhook_url": webhook_url, + "webhook_url_method": "POST", + } + payload.update(kwargs) + + endpoint = f"{self.TELNYX_API_BASE}/calls" + + logger.debug( + f"Telnyx transfer dial payload: " + f"{json.dumps({k: v for k, v in payload.items() if k != 'connection_id'})}" + ) + + try: + async with aiohttp.ClientSession() as session: + async with session.post( + endpoint, json=payload, headers=self._headers() + ) as response: + response_text = await response.text() + if response.status != 200: + logger.error( + f"Telnyx transfer dial failed: " + f"status={response.status} body={response_text}" + ) + raise Exception( + f"Telnyx transfer dial failed: " + f"status={response.status} body={response_text}" + ) + + response_data = json.loads(response_text) + data = response_data.get("data", {}) + call_control_id = data.get("call_control_id", "") + + logger.info( + f"Telnyx transfer dial initiated: " + f"call_control_id={call_control_id}, " + f"to={destination}, conference_name={conference_name}" + ) + + return { + "call_sid": call_control_id, + "status": "initiated", + "provider": self.PROVIDER_NAME, + "from_number": from_number, + "to_number": destination, + "raw_response": response_data, + } + except Exception as e: + logger.error(f"Exception during Telnyx transfer dial: {e}") + raise def supports_transfers(self) -> bool: - return False + return True + + async def create_conference( + self, seed_call_control_id: str, name: str + ) -> Optional[str]: + """Seed a Telnyx conference with an existing call leg. + + Used by the transfer flow on ``call.answered`` to put the destination + leg into a conference immediately. The returned ``conference_id`` is stored + on the ``TransferContext`` so the strategy can later join the caller. + + https://developers.telnyx.com/api-reference/conference-commands/create-conference + """ + if not self.api_key: + logger.error("Cannot create Telnyx conference: api_key missing") + return None + + endpoint = f"{self.TELNYX_API_BASE}/conferences" + payload = { + "call_control_id": seed_call_control_id, + "name": name, + "start_conference_on_create": True, + } + + try: + async with aiohttp.ClientSession() as session: + async with session.post( + endpoint, json=payload, headers=self._headers() + ) as response: + body = await response.text() + if response.status != 200: + logger.error( + f"Telnyx create_conference failed: " + f"status={response.status} body={body}" + ) + return None + data = json.loads(body).get("data", {}) + conference_id = data.get("id") + if not conference_id: + logger.error( + f"Telnyx create_conference response missing id: {body}" + ) + return None + logger.info( + f"Telnyx conference {conference_id} created (name={name}, " + f"seeded with {seed_call_control_id})" + ) + return conference_id + except Exception as e: + logger.error(f"Exception during Telnyx create_conference: {e}") + return None diff --git a/api/services/telephony/providers/telnyx/routes.py b/api/services/telephony/providers/telnyx/routes.py index 0947b14..f84300c 100644 --- a/api/services/telephony/providers/telnyx/routes.py +++ b/api/services/telephony/providers/telnyx/routes.py @@ -11,16 +11,39 @@ from loguru import logger from pipecat.utils.run_context import set_current_run_id from api.db import db_client +from api.services.telephony.call_transfer_manager import get_call_transfer_manager from api.services.telephony.factory import get_telephony_provider_for_run -from api.services.telephony.providers.telnyx.provider import normalize_event_type +from api.services.telephony.providers.telnyx.provider import ( + TelnyxProvider, + normalize_event_type, +) +from api.services.telephony.providers.telnyx.strategies import TelnyxHangupStrategy from api.services.telephony.status_processor import ( StatusCallbackRequest, _process_status_update, ) +from api.services.telephony.transfer_event_protocol import ( + TransferContext, + TransferEvent, + TransferEventType, +) router = APIRouter() +# Hangup causes that signal a failed transfer attempt (vs. a successful call +# that later ended normally). Mapped to user-facing reasons published in the +# TransferEvent. Source for cause values: Telnyx call.hangup payload spec — +# https://developers.telnyx.com/api-reference/callbacks/call-hangup +_HANGUP_CAUSE_TO_REASON = { + "busy": "busy", + "no_answer": "no_answer", + "timeout": "no_answer", + "call_rejected": "call_failed", + "unallocated_number": "call_failed", +} + + @router.post("/telnyx/events/{workflow_run_id}") async def handle_telnyx_events( request: Request, @@ -80,3 +103,225 @@ async def handle_telnyx_events( await _process_status_update(workflow_run_id, status_update) return {"status": "success"} + + +@router.post("/telnyx/transfer-result/{transfer_id}") +async def handle_telnyx_transfer_result(transfer_id: str, request: Request): + """Handle Telnyx Call Control events for the transfer destination leg. + + The destination leg is dialed by :meth:`TelnyxProvider.transfer_call` with + this URL as ``webhook_url``. Telnyx sends every event for that leg here. + Outcomes: + + - ``call.answered``: seed a conference with the destination's live + ``call_control_id``, stamp ``conference_id`` onto the TransferContext, + and publish ``DESTINATION_ANSWERED`` so ``transfer_call_handler`` can + end the pipeline. ``TelnyxConferenceStrategy`` then joins the caller + into this conference at pipeline teardown. + - ``call.hangup`` pre-answer (no ``conference_id`` on the context): + publish ``TRANSFER_FAILED`` so the LLM can recover. + - ``call.hangup`` post-answer (``conference_id`` set): the destination + left a bridged conference; hang up the caller's leg to tear down the + empty bridge (Telnyx's create_conference doesn't accept + ``end_conference_on_exit`` on the seed leg). + + Event references: + - call.answered: https://developers.telnyx.com/api-reference/callbacks/call-answered + - call.hangup: https://developers.telnyx.com/api-reference/callbacks/call-hangup + """ + event_data = await request.json() + logger.info( + f"Telnyx transfer-result webhook (transfer_id={transfer_id}): " + f"{json.dumps(event_data)}" + ) + + data = event_data.get("data", {}) + event_type = normalize_event_type(data.get("event_type", "")) + payload = data.get("payload", {}) + call_control_id = payload.get("call_control_id", "") + + # Pre-answer events carry no outcome — wait for answered/hangup. + if event_type in ("call.initiated", "call.bridging", "streaming.started"): + return {"status": "pending"} + + call_transfer_manager = await get_call_transfer_manager() + transfer_context = await call_transfer_manager.get_transfer_context(transfer_id) + original_call_sid = transfer_context.original_call_sid if transfer_context else "" + conference_name = transfer_context.conference_name if transfer_context else None + + if event_type == "call.answered": + # Seed the conference now with the destination's live call_control_id + # from the webhook payload. The strategy at pipeline-end then only has + # to join the caller into this conference. Idempotent on duplicate + # webhooks: if conference_id is already stamped on the context, skip. + conference_id = ( + transfer_context.conference_id if transfer_context else None + ) + if transfer_context and not conference_id: + conference_id = await _seed_destination_conference( + transfer_context=transfer_context, + destination_call_control_id=call_control_id, + ) + if conference_id: + transfer_context.conference_id = conference_id + # Refresh call_sid with the live id from the webhook — it can + # diverge from the dial-response value once the leg is routed + # through its post-answer POP. + transfer_context.call_sid = call_control_id + await call_transfer_manager.store_transfer_context(transfer_context) + + if not conference_id: + transfer_event = TransferEvent( + type=TransferEventType.TRANSFER_FAILED, + transfer_id=transfer_id, + original_call_sid=original_call_sid, + transfer_call_sid=call_control_id, + conference_name=conference_name, + status="transfer_failed", + action="transfer_failed", + reason="conference_create_failed", + message="Failed to bridge the transfer destination into a conference.", + end_call=True, + ) + else: + transfer_event = TransferEvent( + type=TransferEventType.DESTINATION_ANSWERED, + transfer_id=transfer_id, + original_call_sid=original_call_sid, + transfer_call_sid=call_control_id, + conference_name=conference_name, + status="success", + action="destination_answered", + message="Destination answered — bridging into conference.", + ) + elif event_type == "call.hangup": + hangup_cause = payload.get("hangup_cause", "") + + # Post-answer hangup: the destination was already bridged into a + # conference. Telnyx's create_conference doesn't accept + # end_conference_on_exit, so the destination's seed leg has no + # auto-teardown on exit. Hang up the caller explicitly so they + # aren't left in an empty conference. No event to publish — the + # pipeline already tore down on DESTINATION_ANSWERED. + if transfer_context and transfer_context.conference_id: + logger.info( + f"Destination left conference {transfer_context.conference_id} " + f"(transfer={transfer_id}, hangup_cause={hangup_cause}); " + f"hanging up caller to tear down the bridge." + ) + await _hangup_caller_leg(transfer_context) + await call_transfer_manager.remove_transfer_context(transfer_id) + return {"status": "success"} + + # Pre-answer hangup: destination didn't connect at all. + reason = _HANGUP_CAUSE_TO_REASON.get(hangup_cause, "call_failed") + transfer_event = TransferEvent( + type=TransferEventType.TRANSFER_FAILED, + transfer_id=transfer_id, + original_call_sid=original_call_sid, + transfer_call_sid=call_control_id, + conference_name=conference_name, + status="transfer_failed", + action="transfer_failed", + reason=reason, + message=( + f"Transfer destination did not connect (hangup_cause={hangup_cause})." + ), + end_call=True, + ) + else: + logger.debug( + f"Telnyx transfer-result ignoring event_type={event_type} for {transfer_id}" + ) + return {"status": "pending"} + + await call_transfer_manager.publish_transfer_event(transfer_event) + logger.info( + f"Published {transfer_event.type} event for transfer_id={transfer_id} " + f"(status={transfer_event.status})" + ) + + return {"status": "success"} + + +async def _resolve_telnyx_provider( + transfer_context: TransferContext, +) -> TelnyxProvider | None: + """Resolve the TelnyxProvider for this transfer via its workflow_run. + + Routes the lookup through ``get_telephony_provider_for_run`` so the + right credentials are used in multi-config orgs. + """ + workflow_run_id = transfer_context.workflow_run_id + if not workflow_run_id: + logger.error( + f"TransferContext {transfer_context.transfer_id} missing " + f"workflow_run_id; cannot resolve Telnyx provider" + ) + return None + + workflow_run = await db_client.get_workflow_run_by_id(workflow_run_id) + if not workflow_run: + logger.error( + f"Workflow run {workflow_run_id} not found for transfer " + f"{transfer_context.transfer_id}" + ) + return None + + workflow = await db_client.get_workflow_by_id(workflow_run.workflow_id) + if not workflow: + logger.error( + f"Workflow {workflow_run.workflow_id} not found for transfer " + f"{transfer_context.transfer_id}" + ) + return None + + provider = await get_telephony_provider_for_run( + workflow_run, workflow.organization_id + ) + if not isinstance(provider, TelnyxProvider): + logger.error( + f"Transfer {transfer_context.transfer_id} resolved to non-Telnyx " + f"provider ({type(provider).__name__})" + ) + return None + + return provider + + +async def _seed_destination_conference( + *, + transfer_context: TransferContext, + destination_call_control_id: str, +) -> str | None: + """Create a Telnyx conference seeded with the destination leg.""" + provider = await _resolve_telnyx_provider(transfer_context) + if not provider: + return None + + return await provider.create_conference( + seed_call_control_id=destination_call_control_id, + name=transfer_context.conference_name, + ) + + +async def _hangup_caller_leg(transfer_context: TransferContext) -> None: + """Hang up the caller's leg after the destination left the conference. + + Used when ``call.hangup`` arrives on the transfer-result webhook after + the conference was already created — Telnyx's create_conference doesn't + accept end_conference_on_exit on the seed leg, so the caller has no + auto-teardown when the destination leaves. + + https://developers.telnyx.com/api-reference/call-commands/hangup + """ + provider = await _resolve_telnyx_provider(transfer_context) + if not provider: + return + + await TelnyxHangupStrategy().execute_hangup( + { + "call_control_id": transfer_context.original_call_sid, + "api_key": provider.api_key, + } + ) diff --git a/api/services/telephony/providers/telnyx/strategies.py b/api/services/telephony/providers/telnyx/strategies.py new file mode 100644 index 0000000..15b9458 --- /dev/null +++ b/api/services/telephony/providers/telnyx/strategies.py @@ -0,0 +1,215 @@ +"""Telnyx-specific call operation strategies. + +Caller-side leg of the conference-based transfer. The destination is already +seeded into the conference by the ``call.answered`` webhook handler (see +``providers/telnyx/routes.py``); this strategy just joins the caller into the +existing conference when the pipeline tears down with +``EndTaskReason.TRANSFER_CALL``. + +API reference: +- Join a conference: + https://developers.telnyx.com/api-reference/conference-commands/join-a-conference +- Hangup call: + https://developers.telnyx.com/api-reference/call-commands/hangup +""" + +from typing import Any, Dict + +import aiohttp +from loguru import logger +from pipecat.serializers.call_strategies import HangupStrategy, TransferStrategy + +TELNYX_API_BASE = "https://api.telnyx.com/v2" + + +class TelnyxConferenceStrategy(TransferStrategy): + """Joins the caller leg into the conference that the webhook handler + already created (seeded with the destination on ``call.answered``). + """ + + async def execute_transfer(self, context: Dict[str, Any]) -> bool: + caller_call_control_id = context["call_control_id"] + api_key = context["api_key"] + + transfer_context = await self._find_transfer_context_for_call( + caller_call_control_id + ) + if not transfer_context: + logger.error( + f"[Telnyx Transfer] No active transfer context found for " + f"call {caller_call_control_id}" + ) + return False + + conference_id = transfer_context.conference_id + if not conference_id: + logger.error( + f"[Telnyx Transfer] Transfer context {transfer_context.transfer_id} " + f"has no conference_id — webhook handler likely failed to seed " + f"the destination conference." + ) + await self._cleanup_transfer_context(transfer_context.transfer_id) + return False + + logger.info( + f"[Telnyx Transfer] Joining caller {caller_call_control_id} into " + f"conference {conference_id} (transfer={transfer_context.transfer_id})" + ) + + headers = { + "Content-Type": "application/json", + "Accept": "application/json", + "Authorization": f"Bearer {api_key}", + } + + try: + async with aiohttp.ClientSession() as session: + joined = await self._join_caller( + session, + headers, + conference_id=conference_id, + caller_call_control_id=caller_call_control_id, + ) + await self._cleanup_transfer_context(transfer_context.transfer_id) + return joined + + except Exception as e: + logger.error(f"[Telnyx Transfer] Failed to join caller into conference: {e}") + await self._cleanup_transfer_context(transfer_context.transfer_id) + return False + + async def _join_caller( + self, + session: aiohttp.ClientSession, + headers: Dict[str, str], + *, + conference_id: str, + caller_call_control_id: str, + ) -> bool: + """Join the caller leg into the conference. + + end_conference_on_exit=true so the conference tears down when the + caller hangs up. https://developers.telnyx.com/api-reference/conference-commands/join-a-conference + """ + endpoint = f"{TELNYX_API_BASE}/conferences/{conference_id}/actions/join" + payload = { + "call_control_id": caller_call_control_id, + "end_conference_on_exit": True, + } + async with session.post(endpoint, json=payload, headers=headers) as response: + body = await response.text() + if response.status != 200: + logger.error( + f"[Telnyx Transfer] Join caller {caller_call_control_id} into " + f"conference {conference_id} failed: " + f"status={response.status} body={body}" + ) + return False + logger.info( + f"[Telnyx Transfer] Caller {caller_call_control_id} joined " + f"conference {conference_id}" + ) + return True + + async def _find_transfer_context_for_call(self, caller_call_control_id: str): + """Find the active transfer context whose original_call_sid matches.""" + try: + import redis.asyncio as aioredis + + from api.constants import REDIS_URL + from api.services.telephony.transfer_event_protocol import TransferContext + + redis = aioredis.from_url(REDIS_URL, decode_responses=True) + transfer_keys = await redis.keys("transfer:context:*") + + for key in transfer_keys: + try: + context_data = await redis.get(key) + if context_data: + context = TransferContext.from_json(context_data) + if context.original_call_sid == caller_call_control_id: + return context + except Exception: + continue + + return None + + except Exception as e: + logger.error(f"[Telnyx Transfer] Error finding transfer context: {e}") + return None + + async def _cleanup_transfer_context(self, transfer_id: str): + try: + from api.services.telephony.call_transfer_manager import ( + get_call_transfer_manager, + ) + + manager = await get_call_transfer_manager() + await manager.remove_transfer_context(transfer_id) + except Exception as e: + logger.error(f"[Telnyx Transfer] Error cleaning up transfer context: {e}") + + +class TelnyxHangupStrategy(HangupStrategy): + """REST-API hangup for Telnyx calls. + + https://developers.telnyx.com/api-reference/call-commands/hangup + """ + + async def execute_hangup(self, context: Dict[str, Any]) -> bool: + call_control_id = context["call_control_id"] + api_key = context["api_key"] + + if not call_control_id or not api_key: + logger.warning( + "Cannot hang up Telnyx call: missing call_control_id or api_key" + ) + return False + + endpoint = ( + f"{TELNYX_API_BASE}/calls/{call_control_id}/actions/hangup" + ) + headers = { + "Content-Type": "application/json", + "Authorization": f"Bearer {api_key}", + } + + try: + async with aiohttp.ClientSession() as session: + async with session.post(endpoint, headers=headers) as response: + if response.status == 200: + logger.info( + f"Successfully terminated Telnyx call {call_control_id}" + ) + return True + if response.status == 422: + # 90018: "Call has already ended" + # https://developers.telnyx.com/api/errors/90018 + try: + error_data = await response.json() + if any( + err.get("code") == "90018" + for err in error_data.get("errors", []) + ): + logger.debug( + f"Telnyx call {call_control_id} was already terminated" + ) + return True + except Exception: + pass + text = await response.text() + logger.error( + f"Failed to terminate Telnyx call {call_control_id}: " + f"status={response.status} body={text}" + ) + return False + text = await response.text() + logger.error( + f"Failed to terminate Telnyx call {call_control_id}: " + f"status={response.status} body={text}" + ) + return False + + except Exception as e: + logger.exception(f"Failed to hang up Telnyx call: {e}") + return False diff --git a/api/services/telephony/providers/telnyx/transport.py b/api/services/telephony/providers/telnyx/transport.py index c2b96f4..393e688 100644 --- a/api/services/telephony/providers/telnyx/transport.py +++ b/api/services/telephony/providers/telnyx/transport.py @@ -11,6 +11,7 @@ from api.services.pipecat.audio_mixer import build_audio_out_mixer from api.services.telephony.factory import load_credentials_for_transport from .serializers import TelnyxFrameSerializer +from .strategies import TelnyxConferenceStrategy, TelnyxHangupStrategy async def create_transport( @@ -46,6 +47,8 @@ async def create_transport( api_key=api_key, inbound_encoding="PCMU", # Dograh → Telnyx; matches stream_bidirectional_codec outbound_encoding=encoding, # Telnyx → Dograh; from media_format.encoding + transfer_strategy=TelnyxConferenceStrategy(), + hangup_strategy=TelnyxHangupStrategy(), ) mixer = await build_audio_out_mixer( diff --git a/api/services/telephony/transfer_event_protocol.py b/api/services/telephony/transfer_event_protocol.py index 7e7815e..e47df08 100644 --- a/api/services/telephony/transfer_event_protocol.py +++ b/api/services/telephony/transfer_event_protocol.py @@ -68,6 +68,10 @@ class TransferContext: original_call_sid: str conference_name: str initiated_at: float + # workflow_run_id: lets transfer_id-keyed webhooks resolve org/credentials. + # conference_id: set by providers that seed the conference on answer (Telnyx). + workflow_run_id: Optional[int] = None + conference_id: Optional[str] = None def to_json(self) -> str: """Convert context to JSON string.""" diff --git a/api/services/workflow/pipecat_engine_custom_tools.py b/api/services/workflow/pipecat_engine_custom_tools.py index f4b5a2b..844cc10 100644 --- a/api/services/workflow/pipecat_engine_custom_tools.py +++ b/api/services/workflow/pipecat_engine_custom_tools.py @@ -25,7 +25,7 @@ from api.db import db_client from api.enums import ToolCategory, WorkflowRunMode from api.services.pipecat.audio_playback import play_audio, play_audio_loop from api.services.telephony.call_transfer_manager import get_call_transfer_manager -from api.services.telephony.factory import get_telephony_provider +from api.services.telephony.factory import get_telephony_provider_for_run from api.services.telephony.transfer_event_protocol import TransferContext from api.services.workflow.tools.calculator import get_calculator_tools, safe_calculator from api.services.workflow.tools.custom_tool import ( @@ -511,7 +511,9 @@ class CustomToolManager: ) return - provider = await get_telephony_provider(organization_id) + provider = await get_telephony_provider_for_run( + workflow_run, organization_id + ) if not provider.supports_transfers() or not provider.validate_config(): validation_error_result = { "status": "failed", @@ -542,6 +544,7 @@ class CustomToolManager: original_call_sid=original_call_sid, conference_name=conference_name, initiated_at=time.time(), + workflow_run_id=self._engine._workflow_run_id, ) await call_transfer_manager.store_transfer_context(transfer_context) diff --git a/pipecat b/pipecat index 0d0e3b3..31199bd 160000 --- a/pipecat +++ b/pipecat @@ -1 +1 @@ -Subproject commit 0d0e3b3bc0bc03f3d3c167dc609ea24eb22e72a0 +Subproject commit 31199bd1f155a872690ff1de24b0fa5ccff49396