feat(telephony/telnyx): add call transfer via conference bridge (#274)

Conference-based transfer for Telnyx with a two-step flow:
- transfer_call dials the destination with a per-transfer webhook URL.
- On call.answered, the webhook seeds a conference with the destination's
  live call_control_id and publishes DESTINATION_ANSWERED.
- TelnyxConferenceStrategy joins the caller into the conference on
  pipeline teardown (EndTaskReason.TRANSFER_CALL).
- On post-answer destination hangup, the webhook hangs up the caller —
  Telnyx's create_conference doesn't accept end_conference_on_exit on
  the seed leg, so we tear down the bridge ourselves.

TransferContext gains optional workflow_run_id (for webhook→provider
resolution in multi-config orgs) and conference_id (set on answer,
rd by the strategy).

Also fixes the transfer tool's provider lookup to go through
get_telephony_provider_for_run instead of the deprecated org-default
shim, which was returning the wrong provider in multi-config orgs.
This commit is contained in:
Sabiha Khan 2026-05-12 13:44:39 +05:30 committed by GitHub
parent 4afe426f12
commit 4a6752e62b
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
7 changed files with 596 additions and 7 deletions

View file

@ -621,8 +621,127 @@ class TelnyxProvider(TelephonyProvider):
timeout: int = 30,
**kwargs: Any,
) -> Dict[str, Any]:
"""Telnyx call transfer is not yet implemented."""
raise NotImplementedError("Call transfer not yet supported for Telnyx")
"""Dial the destination as a plain call; conference is seeded later.
Webhook (``call.answered``) seeds the conference with this leg;
``TelnyxConferenceStrategy`` joins the caller on pipeline teardown.
https://developers.telnyx.com/api-reference/call-commands/dial
"""
if not self.validate_config():
raise ValueError("Telnyx provider not properly configured")
from_number = random.choice(self.from_numbers)
logger.info(f"Selected phone number {from_number} for Telnyx transfer call")
backend_endpoint, _ = await get_backend_endpoints()
webhook_url = (
f"{backend_endpoint}/api/v1/telephony/telnyx/transfer-result/{transfer_id}"
)
payload = {
"connection_id": self.connection_id,
"to": destination,
"from": from_number,
"timeout_secs": timeout,
"webhook_url": webhook_url,
"webhook_url_method": "POST",
}
payload.update(kwargs)
endpoint = f"{self.TELNYX_API_BASE}/calls"
logger.debug(
f"Telnyx transfer dial payload: "
f"{json.dumps({k: v for k, v in payload.items() if k != 'connection_id'})}"
)
try:
async with aiohttp.ClientSession() as session:
async with session.post(
endpoint, json=payload, headers=self._headers()
) as response:
response_text = await response.text()
if response.status != 200:
logger.error(
f"Telnyx transfer dial failed: "
f"status={response.status} body={response_text}"
)
raise Exception(
f"Telnyx transfer dial failed: "
f"status={response.status} body={response_text}"
)
response_data = json.loads(response_text)
data = response_data.get("data", {})
call_control_id = data.get("call_control_id", "")
logger.info(
f"Telnyx transfer dial initiated: "
f"call_control_id={call_control_id}, "
f"to={destination}, conference_name={conference_name}"
)
return {
"call_sid": call_control_id,
"status": "initiated",
"provider": self.PROVIDER_NAME,
"from_number": from_number,
"to_number": destination,
"raw_response": response_data,
}
except Exception as e:
logger.error(f"Exception during Telnyx transfer dial: {e}")
raise
def supports_transfers(self) -> bool:
return False
return True
async def create_conference(
self, seed_call_control_id: str, name: str
) -> Optional[str]:
"""Seed a Telnyx conference with an existing call leg.
Used by the transfer flow on ``call.answered`` to put the destination
leg into a conference immediately. The returned ``conference_id`` is stored
on the ``TransferContext`` so the strategy can later join the caller.
https://developers.telnyx.com/api-reference/conference-commands/create-conference
"""
if not self.api_key:
logger.error("Cannot create Telnyx conference: api_key missing")
return None
endpoint = f"{self.TELNYX_API_BASE}/conferences"
payload = {
"call_control_id": seed_call_control_id,
"name": name,
"start_conference_on_create": True,
}
try:
async with aiohttp.ClientSession() as session:
async with session.post(
endpoint, json=payload, headers=self._headers()
) as response:
body = await response.text()
if response.status != 200:
logger.error(
f"Telnyx create_conference failed: "
f"status={response.status} body={body}"
)
return None
data = json.loads(body).get("data", {})
conference_id = data.get("id")
if not conference_id:
logger.error(
f"Telnyx create_conference response missing id: {body}"
)
return None
logger.info(
f"Telnyx conference {conference_id} created (name={name}, "
f"seeded with {seed_call_control_id})"
)
return conference_id
except Exception as e:
logger.error(f"Exception during Telnyx create_conference: {e}")
return None

View file

@ -11,16 +11,39 @@ from loguru import logger
from pipecat.utils.run_context import set_current_run_id
from api.db import db_client
from api.services.telephony.call_transfer_manager import get_call_transfer_manager
from api.services.telephony.factory import get_telephony_provider_for_run
from api.services.telephony.providers.telnyx.provider import normalize_event_type
from api.services.telephony.providers.telnyx.provider import (
TelnyxProvider,
normalize_event_type,
)
from api.services.telephony.providers.telnyx.strategies import TelnyxHangupStrategy
from api.services.telephony.status_processor import (
StatusCallbackRequest,
_process_status_update,
)
from api.services.telephony.transfer_event_protocol import (
TransferContext,
TransferEvent,
TransferEventType,
)
router = APIRouter()
# Hangup causes that signal a failed transfer attempt (vs. a successful call
# that later ended normally). Mapped to user-facing reasons published in the
# TransferEvent. Source for cause values: Telnyx call.hangup payload spec —
# https://developers.telnyx.com/api-reference/callbacks/call-hangup
_HANGUP_CAUSE_TO_REASON = {
"busy": "busy",
"no_answer": "no_answer",
"timeout": "no_answer",
"call_rejected": "call_failed",
"unallocated_number": "call_failed",
}
@router.post("/telnyx/events/{workflow_run_id}")
async def handle_telnyx_events(
request: Request,
@ -80,3 +103,225 @@ async def handle_telnyx_events(
await _process_status_update(workflow_run_id, status_update)
return {"status": "success"}
@router.post("/telnyx/transfer-result/{transfer_id}")
async def handle_telnyx_transfer_result(transfer_id: str, request: Request):
"""Handle Telnyx Call Control events for the transfer destination leg.
The destination leg is dialed by :meth:`TelnyxProvider.transfer_call` with
this URL as ``webhook_url``. Telnyx sends every event for that leg here.
Outcomes:
- ``call.answered``: seed a conference with the destination's live
``call_control_id``, stamp ``conference_id`` onto the TransferContext,
and publish ``DESTINATION_ANSWERED`` so ``transfer_call_handler`` can
end the pipeline. ``TelnyxConferenceStrategy`` then joins the caller
into this conference at pipeline teardown.
- ``call.hangup`` pre-answer (no ``conference_id`` on the context):
publish ``TRANSFER_FAILED`` so the LLM can recover.
- ``call.hangup`` post-answer (``conference_id`` set): the destination
left a bridged conference; hang up the caller's leg to tear down the
empty bridge (Telnyx's create_conference doesn't accept
``end_conference_on_exit`` on the seed leg).
Event references:
- call.answered: https://developers.telnyx.com/api-reference/callbacks/call-answered
- call.hangup: https://developers.telnyx.com/api-reference/callbacks/call-hangup
"""
event_data = await request.json()
logger.info(
f"Telnyx transfer-result webhook (transfer_id={transfer_id}): "
f"{json.dumps(event_data)}"
)
data = event_data.get("data", {})
event_type = normalize_event_type(data.get("event_type", ""))
payload = data.get("payload", {})
call_control_id = payload.get("call_control_id", "")
# Pre-answer events carry no outcome — wait for answered/hangup.
if event_type in ("call.initiated", "call.bridging", "streaming.started"):
return {"status": "pending"}
call_transfer_manager = await get_call_transfer_manager()
transfer_context = await call_transfer_manager.get_transfer_context(transfer_id)
original_call_sid = transfer_context.original_call_sid if transfer_context else ""
conference_name = transfer_context.conference_name if transfer_context else None
if event_type == "call.answered":
# Seed the conference now with the destination's live call_control_id
# from the webhook payload. The strategy at pipeline-end then only has
# to join the caller into this conference. Idempotent on duplicate
# webhooks: if conference_id is already stamped on the context, skip.
conference_id = (
transfer_context.conference_id if transfer_context else None
)
if transfer_context and not conference_id:
conference_id = await _seed_destination_conference(
transfer_context=transfer_context,
destination_call_control_id=call_control_id,
)
if conference_id:
transfer_context.conference_id = conference_id
# Refresh call_sid with the live id from the webhook — it can
# diverge from the dial-response value once the leg is routed
# through its post-answer POP.
transfer_context.call_sid = call_control_id
await call_transfer_manager.store_transfer_context(transfer_context)
if not conference_id:
transfer_event = TransferEvent(
type=TransferEventType.TRANSFER_FAILED,
transfer_id=transfer_id,
original_call_sid=original_call_sid,
transfer_call_sid=call_control_id,
conference_name=conference_name,
status="transfer_failed",
action="transfer_failed",
reason="conference_create_failed",
message="Failed to bridge the transfer destination into a conference.",
end_call=True,
)
else:
transfer_event = TransferEvent(
type=TransferEventType.DESTINATION_ANSWERED,
transfer_id=transfer_id,
original_call_sid=original_call_sid,
transfer_call_sid=call_control_id,
conference_name=conference_name,
status="success",
action="destination_answered",
message="Destination answered — bridging into conference.",
)
elif event_type == "call.hangup":
hangup_cause = payload.get("hangup_cause", "")
# Post-answer hangup: the destination was already bridged into a
# conference. Telnyx's create_conference doesn't accept
# end_conference_on_exit, so the destination's seed leg has no
# auto-teardown on exit. Hang up the caller explicitly so they
# aren't left in an empty conference. No event to publish — the
# pipeline already tore down on DESTINATION_ANSWERED.
if transfer_context and transfer_context.conference_id:
logger.info(
f"Destination left conference {transfer_context.conference_id} "
f"(transfer={transfer_id}, hangup_cause={hangup_cause}); "
f"hanging up caller to tear down the bridge."
)
await _hangup_caller_leg(transfer_context)
await call_transfer_manager.remove_transfer_context(transfer_id)
return {"status": "success"}
# Pre-answer hangup: destination didn't connect at all.
reason = _HANGUP_CAUSE_TO_REASON.get(hangup_cause, "call_failed")
transfer_event = TransferEvent(
type=TransferEventType.TRANSFER_FAILED,
transfer_id=transfer_id,
original_call_sid=original_call_sid,
transfer_call_sid=call_control_id,
conference_name=conference_name,
status="transfer_failed",
action="transfer_failed",
reason=reason,
message=(
f"Transfer destination did not connect (hangup_cause={hangup_cause})."
),
end_call=True,
)
else:
logger.debug(
f"Telnyx transfer-result ignoring event_type={event_type} for {transfer_id}"
)
return {"status": "pending"}
await call_transfer_manager.publish_transfer_event(transfer_event)
logger.info(
f"Published {transfer_event.type} event for transfer_id={transfer_id} "
f"(status={transfer_event.status})"
)
return {"status": "success"}
async def _resolve_telnyx_provider(
transfer_context: TransferContext,
) -> TelnyxProvider | None:
"""Resolve the TelnyxProvider for this transfer via its workflow_run.
Routes the lookup through ``get_telephony_provider_for_run`` so the
right credentials are used in multi-config orgs.
"""
workflow_run_id = transfer_context.workflow_run_id
if not workflow_run_id:
logger.error(
f"TransferContext {transfer_context.transfer_id} missing "
f"workflow_run_id; cannot resolve Telnyx provider"
)
return None
workflow_run = await db_client.get_workflow_run_by_id(workflow_run_id)
if not workflow_run:
logger.error(
f"Workflow run {workflow_run_id} not found for transfer "
f"{transfer_context.transfer_id}"
)
return None
workflow = await db_client.get_workflow_by_id(workflow_run.workflow_id)
if not workflow:
logger.error(
f"Workflow {workflow_run.workflow_id} not found for transfer "
f"{transfer_context.transfer_id}"
)
return None
provider = await get_telephony_provider_for_run(
workflow_run, workflow.organization_id
)
if not isinstance(provider, TelnyxProvider):
logger.error(
f"Transfer {transfer_context.transfer_id} resolved to non-Telnyx "
f"provider ({type(provider).__name__})"
)
return None
return provider
async def _seed_destination_conference(
*,
transfer_context: TransferContext,
destination_call_control_id: str,
) -> str | None:
"""Create a Telnyx conference seeded with the destination leg."""
provider = await _resolve_telnyx_provider(transfer_context)
if not provider:
return None
return await provider.create_conference(
seed_call_control_id=destination_call_control_id,
name=transfer_context.conference_name,
)
async def _hangup_caller_leg(transfer_context: TransferContext) -> None:
"""Hang up the caller's leg after the destination left the conference.
Used when ``call.hangup`` arrives on the transfer-result webhook after
the conference was already created Telnyx's create_conference doesn't
accept end_conference_on_exit on the seed leg, so the caller has no
auto-teardown when the destination leaves.
https://developers.telnyx.com/api-reference/call-commands/hangup
"""
provider = await _resolve_telnyx_provider(transfer_context)
if not provider:
return
await TelnyxHangupStrategy().execute_hangup(
{
"call_control_id": transfer_context.original_call_sid,
"api_key": provider.api_key,
}
)

View file

@ -0,0 +1,215 @@
"""Telnyx-specific call operation strategies.
Caller-side leg of the conference-based transfer. The destination is already
seeded into the conference by the ``call.answered`` webhook handler (see
``providers/telnyx/routes.py``); this strategy just joins the caller into the
existing conference when the pipeline tears down with
``EndTaskReason.TRANSFER_CALL``.
API reference:
- Join a conference:
https://developers.telnyx.com/api-reference/conference-commands/join-a-conference
- Hangup call:
https://developers.telnyx.com/api-reference/call-commands/hangup
"""
from typing import Any, Dict
import aiohttp
from loguru import logger
from pipecat.serializers.call_strategies import HangupStrategy, TransferStrategy
TELNYX_API_BASE = "https://api.telnyx.com/v2"
class TelnyxConferenceStrategy(TransferStrategy):
"""Joins the caller leg into the conference that the webhook handler
already created (seeded with the destination on ``call.answered``).
"""
async def execute_transfer(self, context: Dict[str, Any]) -> bool:
caller_call_control_id = context["call_control_id"]
api_key = context["api_key"]
transfer_context = await self._find_transfer_context_for_call(
caller_call_control_id
)
if not transfer_context:
logger.error(
f"[Telnyx Transfer] No active transfer context found for "
f"call {caller_call_control_id}"
)
return False
conference_id = transfer_context.conference_id
if not conference_id:
logger.error(
f"[Telnyx Transfer] Transfer context {transfer_context.transfer_id} "
f"has no conference_id — webhook handler likely failed to seed "
f"the destination conference."
)
await self._cleanup_transfer_context(transfer_context.transfer_id)
return False
logger.info(
f"[Telnyx Transfer] Joining caller {caller_call_control_id} into "
f"conference {conference_id} (transfer={transfer_context.transfer_id})"
)
headers = {
"Content-Type": "application/json",
"Accept": "application/json",
"Authorization": f"Bearer {api_key}",
}
try:
async with aiohttp.ClientSession() as session:
joined = await self._join_caller(
session,
headers,
conference_id=conference_id,
caller_call_control_id=caller_call_control_id,
)
await self._cleanup_transfer_context(transfer_context.transfer_id)
return joined
except Exception as e:
logger.error(f"[Telnyx Transfer] Failed to join caller into conference: {e}")
await self._cleanup_transfer_context(transfer_context.transfer_id)
return False
async def _join_caller(
self,
session: aiohttp.ClientSession,
headers: Dict[str, str],
*,
conference_id: str,
caller_call_control_id: str,
) -> bool:
"""Join the caller leg into the conference.
end_conference_on_exit=true so the conference tears down when the
caller hangs up. https://developers.telnyx.com/api-reference/conference-commands/join-a-conference
"""
endpoint = f"{TELNYX_API_BASE}/conferences/{conference_id}/actions/join"
payload = {
"call_control_id": caller_call_control_id,
"end_conference_on_exit": True,
}
async with session.post(endpoint, json=payload, headers=headers) as response:
body = await response.text()
if response.status != 200:
logger.error(
f"[Telnyx Transfer] Join caller {caller_call_control_id} into "
f"conference {conference_id} failed: "
f"status={response.status} body={body}"
)
return False
logger.info(
f"[Telnyx Transfer] Caller {caller_call_control_id} joined "
f"conference {conference_id}"
)
return True
async def _find_transfer_context_for_call(self, caller_call_control_id: str):
"""Find the active transfer context whose original_call_sid matches."""
try:
import redis.asyncio as aioredis
from api.constants import REDIS_URL
from api.services.telephony.transfer_event_protocol import TransferContext
redis = aioredis.from_url(REDIS_URL, decode_responses=True)
transfer_keys = await redis.keys("transfer:context:*")
for key in transfer_keys:
try:
context_data = await redis.get(key)
if context_data:
context = TransferContext.from_json(context_data)
if context.original_call_sid == caller_call_control_id:
return context
except Exception:
continue
return None
except Exception as e:
logger.error(f"[Telnyx Transfer] Error finding transfer context: {e}")
return None
async def _cleanup_transfer_context(self, transfer_id: str):
try:
from api.services.telephony.call_transfer_manager import (
get_call_transfer_manager,
)
manager = await get_call_transfer_manager()
await manager.remove_transfer_context(transfer_id)
except Exception as e:
logger.error(f"[Telnyx Transfer] Error cleaning up transfer context: {e}")
class TelnyxHangupStrategy(HangupStrategy):
"""REST-API hangup for Telnyx calls.
https://developers.telnyx.com/api-reference/call-commands/hangup
"""
async def execute_hangup(self, context: Dict[str, Any]) -> bool:
call_control_id = context["call_control_id"]
api_key = context["api_key"]
if not call_control_id or not api_key:
logger.warning(
"Cannot hang up Telnyx call: missing call_control_id or api_key"
)
return False
endpoint = (
f"{TELNYX_API_BASE}/calls/{call_control_id}/actions/hangup"
)
headers = {
"Content-Type": "application/json",
"Authorization": f"Bearer {api_key}",
}
try:
async with aiohttp.ClientSession() as session:
async with session.post(endpoint, headers=headers) as response:
if response.status == 200:
logger.info(
f"Successfully terminated Telnyx call {call_control_id}"
)
return True
if response.status == 422:
# 90018: "Call has already ended"
# https://developers.telnyx.com/api/errors/90018
try:
error_data = await response.json()
if any(
err.get("code") == "90018"
for err in error_data.get("errors", [])
):
logger.debug(
f"Telnyx call {call_control_id} was already terminated"
)
return True
except Exception:
pass
text = await response.text()
logger.error(
f"Failed to terminate Telnyx call {call_control_id}: "
f"status={response.status} body={text}"
)
return False
text = await response.text()
logger.error(
f"Failed to terminate Telnyx call {call_control_id}: "
f"status={response.status} body={text}"
)
return False
except Exception as e:
logger.exception(f"Failed to hang up Telnyx call: {e}")
return False

View file

@ -11,6 +11,7 @@ from api.services.pipecat.audio_mixer import build_audio_out_mixer
from api.services.telephony.factory import load_credentials_for_transport
from .serializers import TelnyxFrameSerializer
from .strategies import TelnyxConferenceStrategy, TelnyxHangupStrategy
async def create_transport(
@ -46,6 +47,8 @@ async def create_transport(
api_key=api_key,
inbound_encoding="PCMU", # Dograh → Telnyx; matches stream_bidirectional_codec
outbound_encoding=encoding, # Telnyx → Dograh; from media_format.encoding
transfer_strategy=TelnyxConferenceStrategy(),
hangup_strategy=TelnyxHangupStrategy(),
)
mixer = await build_audio_out_mixer(

View file

@ -68,6 +68,10 @@ class TransferContext:
original_call_sid: str
conference_name: str
initiated_at: float
# workflow_run_id: lets transfer_id-keyed webhooks resolve org/credentials.
# conference_id: set by providers that seed the conference on answer (Telnyx).
workflow_run_id: Optional[int] = None
conference_id: Optional[str] = None
def to_json(self) -> str:
"""Convert context to JSON string."""

View file

@ -25,7 +25,7 @@ from api.db import db_client
from api.enums import ToolCategory, WorkflowRunMode
from api.services.pipecat.audio_playback import play_audio, play_audio_loop
from api.services.telephony.call_transfer_manager import get_call_transfer_manager
from api.services.telephony.factory import get_telephony_provider
from api.services.telephony.factory import get_telephony_provider_for_run
from api.services.telephony.transfer_event_protocol import TransferContext
from api.services.workflow.tools.calculator import get_calculator_tools, safe_calculator
from api.services.workflow.tools.custom_tool import (
@ -511,7 +511,9 @@ class CustomToolManager:
)
return
provider = await get_telephony_provider(organization_id)
provider = await get_telephony_provider_for_run(
workflow_run, organization_id
)
if not provider.supports_transfers() or not provider.validate_config():
validation_error_result = {
"status": "failed",
@ -542,6 +544,7 @@ class CustomToolManager:
original_call_sid=original_call_sid,
conference_name=conference_name,
initiated_at=time.time(),
workflow_run_id=self._engine._workflow_run_id,
)
await call_transfer_manager.store_transfer_context(transfer_context)