dograh/api/services/telephony/transfer_event_protocol.py
Sabiha Khan 700b7f1145 feat(telephony/telnyx): add call transfer via conference bridge
Conference-based transfer for Telnyx with a two-step flow:
- transfer_call dials the destination with a per-transfer webhook URL.
- On call.answered, the webhook seeds a conference with the destination's
  live call_control_id and publishes DESTINATION_ANSWERED.
- TelnyxConferenceStrategy joins the caller into the conference on
  pipeline teardown (EndTaskReason.TRANSFER_CALL).
- On post-answer destination hangup, the webhook hangs up the caller —
  Telnyx's create_conference doesn't accept end_conference_on_exit on
  the seed leg, so we tear down the bridge ourselves.

TransferContext gains optional workflow_run_id (for webhook→provider
resolution in multi-config orgs) and conference_id (set on answer,
rd by the strategy).

Also fixes the transfer tool's provider lookup to go through
get_telephony_provider_for_run instead of the deprecated org-default
shim, which was returning the wrong provider in multi-config orgs.
2026-05-12 12:04:37 +05:30

101 lines
3 KiB
Python

"""Redis communication protocol for call transfer coordination.
Defines event formats and Redis channels for coordinating call transfers
across multiple API server instances.
"""
import json
from dataclasses import asdict, dataclass
from enum import Enum
from typing import Any, Dict, Optional
class TransferEventType(str, Enum):
"""Types of transfer events sent between instances."""
DESTINATION_ANSWERED = "destination_answered"
TRANSFER_FAILED = "transfer_failed"
@dataclass
class TransferEvent:
"""Event data structure for transfer coordination."""
type: TransferEventType
transfer_id: str
original_call_sid: str
transfer_call_sid: Optional[str] = None
target_number: Optional[str] = None
conference_name: Optional[str] = None
message: Optional[str] = None
status: Optional[str] = None
action: Optional[str] = None
reason: Optional[str] = None
end_call: bool = False
timestamp: Optional[float] = None
def to_json(self) -> str:
"""Convert event to JSON string."""
return json.dumps(asdict(self))
@classmethod
def from_json(cls, data: str) -> "TransferEvent":
"""Create event from JSON string."""
return cls(**json.loads(data))
def to_result_dict(self) -> Dict[str, Any]:
"""Convert to function call result format."""
result = {
"status": self.status or "success",
"message": self.message or "",
"action": self.action or self.type,
"conference_id": self.conference_name,
"transfer_call_sid": self.transfer_call_sid,
"original_call_sid": self.original_call_sid,
"reason": self.reason,
}
return result
@dataclass
class TransferContext:
"""Transfer context data stored in Redis."""
transfer_id: str
call_sid: Optional[str]
target_number: str
tool_uuid: str
original_call_sid: str
conference_name: str
initiated_at: float
# workflow_run_id: lets transfer_id-keyed webhooks resolve org/credentials.
# conference_id: set by providers that seed the conference on answer (Telnyx).
workflow_run_id: Optional[int] = None
conference_id: Optional[str] = None
def to_json(self) -> str:
"""Convert context to JSON string."""
return json.dumps(asdict(self))
@classmethod
def from_json(cls, data: str) -> "TransferContext":
"""Create context from JSON string."""
return cls(**json.loads(data))
def to_dict(self) -> Dict[str, Any]:
"""Convert to dictionary."""
return asdict(self)
class TransferRedisChannels:
"""Redis channel naming conventions for transfer events."""
@staticmethod
def transfer_events(transfer_id: str) -> str:
"""Channel for transfer events for a specific transfer."""
return f"transfer:events:{transfer_id}"
@staticmethod
def transfer_context_key(transfer_id: str) -> str:
"""Redis key for transfer context storage."""
return f"transfer:context:{transfer_id}"