feat: telephony call transfer (#155)

* transfer call

* fix: ignore completed call status

* chore: refactor telephony

* chore: refactor pipecat engine custom tools and other telephony services

* chore: code refactor

* chore: put back office ambient sound files

* chore: remove transport from engine

* fix: fix alembic revision

* chore: remove set_transferring_call from engine

* fix: send OutputAudio frame and let transport chunk it

* fix: reinstate docker compose

* chore: remove unused transfer-twmil route for caller

* chore: update pipecat submodule

---------

Co-authored-by: Abhishek Kumar <abhishek@a6k.me>
This commit is contained in:
Sabiha Khan 2026-02-16 14:33:33 +05:30 committed by GitHub
parent 5d14d17ceb
commit c711920165
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
30 changed files with 1965 additions and 128 deletions

View file

@ -309,3 +309,46 @@ class TelephonyProvider(ABC):
Tuple of (Response, media_type) - Response object and content type
"""
pass
# ======== CALL TRANSFER METHODS ========
@abstractmethod
async def transfer_call(
self,
destination: str,
transfer_id: str,
conference_name: str,
timeout: int = 30,
**kwargs: Any,
) -> Dict[str, Any]:
"""
Initiate a call transfer to a destination number.
Args:
destination: The destination phone number (E.164 format)
transfer_id: Unique identifier for tracking this transfer
conference_name: Name of the conference to join the destination into
timeout: Transfer timeout in seconds
**kwargs: Provider-specific additional parameters
Returns:
Dict containing:
- call_sid: Provider's call identifier
- status: Transfer initiation status
- provider: Provider name
Raises:
NotImplementedError: If provider doesn't support transfers
ValueError: If provider configuration is invalid
"""
pass
@abstractmethod
def supports_transfers(self) -> bool:
"""
Check if this provider supports call transfers.
Returns:
True if provider supports call transfers, False otherwise
"""
pass

View file

@ -0,0 +1,200 @@
"""Redis-based transfer event coordination service
Handles transfer event publishing, subscription, and context storage
"""
import asyncio
import time
from typing import Dict, Optional
import redis.asyncio as aioredis
from loguru import logger
from api.constants import REDIS_URL
from api.services.telephony.transfer_event_protocol import (
TransferContext,
TransferEvent,
TransferEventType,
TransferRedisChannels,
)
class CallTransferManager:
"""Manages call transfer events and context storage using Redis."""
def __init__(self, redis_client: Optional[aioredis.Redis] = None):
self._redis_client = redis_client
self._pubsub_connections: Dict[str, aioredis.client.PubSub] = {}
async def _get_redis(self) -> aioredis.Redis:
"""Get Redis client instance."""
if not self._redis_client:
self._redis_client = await aioredis.from_url(
REDIS_URL, decode_responses=True
)
return self._redis_client
async def store_transfer_context(
self, context: TransferContext, ttl: int = 300
) -> None:
"""Store transfer context in Redis with TTL.
Args:
context: Transfer context data
ttl: Time to live in seconds (default 5 minutes)
"""
try:
redis = await self._get_redis()
key = TransferRedisChannels.transfer_context_key(context.transfer_id)
await redis.setex(key, ttl, context.to_json())
logger.debug(f"Stored transfer context for {context.transfer_id}")
except Exception as e:
logger.error(f"Failed to store transfer context: {e}")
async def get_transfer_context(self, transfer_id: str) -> Optional[TransferContext]:
"""Retrieve transfer context from Redis.
Args:
transfer_id: Transfer identifier
Returns:
Transfer context if found, None otherwise
"""
try:
redis = await self._get_redis()
key = TransferRedisChannels.transfer_context_key(transfer_id)
data = await redis.get(key)
if data:
return TransferContext.from_json(data)
return None
except Exception as e:
logger.error(f"Failed to get transfer context: {e}")
return None
async def remove_transfer_context(self, transfer_id: str) -> None:
"""Remove transfer context from Redis.
Args:
transfer_id: Transfer identifier
"""
try:
redis = await self._get_redis()
key = TransferRedisChannels.transfer_context_key(transfer_id)
await redis.delete(key)
logger.debug(f"Removed transfer context for {transfer_id}")
except Exception as e:
logger.error(f"Failed to remove transfer context: {e}")
async def publish_transfer_event(self, event: TransferEvent) -> None:
"""Publish transfer event to Redis channel.
Args:
event: Transfer event to publish
"""
try:
# Add timestamp if not present
if event.timestamp is None:
event.timestamp = time.time()
redis = await self._get_redis()
channel = TransferRedisChannels.transfer_events(event.transfer_id)
await redis.publish(channel, event.to_json())
logger.info(f"Published {event.type} event for {event.transfer_id}")
except Exception as e:
logger.error(f"Failed to publish transfer event: {e}")
async def wait_for_transfer_completion(
self, transfer_id: str, timeout_seconds: float = 30.0
) -> Optional[TransferEvent]:
"""Wait for transfer completion event using Redis pub/sub.
Args:
transfer_id: Transfer identifier to wait for
timeout_seconds: Maximum time to wait
Returns:
Transfer completion event if received, None on timeout
"""
channel = TransferRedisChannels.transfer_events(transfer_id)
redis = await self._get_redis()
pubsub = redis.pubsub()
try:
await pubsub.subscribe(channel)
logger.info(
f"Waiting for transfer completion on {channel} (timeout: {timeout_seconds}s)"
)
# Wait for completion event with timeout
async def wait_for_message():
async for message in pubsub.listen():
if message["type"] == "message":
try:
event = TransferEvent.from_json(message["data"])
logger.info(
f"Received {event.type} event for {transfer_id}"
)
# Check if this is a completion event
if (
event.type
in [
TransferEventType.TRANSFER_ANSWERED, # Call answered = transfer successful
TransferEventType.TRANSFER_COMPLETED,
TransferEventType.TRANSFER_FAILED,
TransferEventType.TRANSFER_CANCELLED,
TransferEventType.TRANSFER_TIMEOUT,
]
):
return event
except Exception as e:
logger.error(f"Failed to parse transfer event: {e}")
continue
return None
# Wait with timeout
result = await asyncio.wait_for(wait_for_message(), timeout=timeout_seconds)
return result
except asyncio.TimeoutError:
logger.debug(f"Transfer completion wait timed out for {transfer_id}")
return None
except Exception as e:
logger.error(f"Error waiting for transfer completion: {e}")
return None
finally:
try:
await pubsub.unsubscribe(channel)
await pubsub.close()
except Exception as e:
logger.error(f"Error closing pubsub connection: {e}")
async def cleanup(self):
"""Clean up Redis connections."""
try:
# Close pubsub connections
for pubsub in self._pubsub_connections.values():
try:
await pubsub.close()
except:
pass
self._pubsub_connections.clear()
# Close main Redis connection
if self._redis_client:
await self._redis_client.close()
self._redis_client = None
except Exception as e:
logger.error(f"Error during transfer coordinator cleanup: {e}")
# Global call transfer manager instance
_call_transfer_manager: Optional[CallTransferManager] = None
async def get_call_transfer_manager() -> CallTransferManager:
"""Get or create the global call transfer manager instance."""
global _call_transfer_manager
if not _call_transfer_manager:
_call_transfer_manager = CallTransferManager()
return _call_transfer_manager

View file

@ -680,3 +680,30 @@ class CloudonixProvider(TelephonyProvider):
</Response>"""
return Response(content=twiml, media_type="application/xml"), "application/xml"
# ======== CALL TRANSFER METHODS ========
async def transfer_call(
self,
destination: str,
transfer_id: str,
conference_name: str,
timeout: int = 30,
**kwargs: Any,
) -> Dict[str, Any]:
"""
Cloudonix provider does not support call transfers.
Raises:
NotImplementedError: Cloudonix call transfers are yet to be implemented
"""
raise NotImplementedError("Cloudonix provider does not support call transfers")
def supports_transfers(self) -> bool:
"""
Cloudonix does not support call transfers.
Returns:
False - Cloudonix provider does not support call transfers
"""
return False

View file

@ -72,6 +72,7 @@ class TwilioProvider(TelephonyProvider):
if from_number is None:
from_number = random.choice(self.from_numbers)
logger.info(f"Selected phone number {from_number} for outbound call")
logger.info(f"Webhook url received - {webhook_url}")
# Prepare call data
data = {"To": to_number, "From": from_number, "Url": webhook_url}
@ -172,6 +173,7 @@ class TwilioProvider(TelephonyProvider):
</Connect>
<Pause length="40"/>
</Response>"""
logger.info(f"Twiml content generated - {twiml_content}")
return twiml_content
async def get_call_cost(self, call_id: str) -> Dict[str, Any]:
@ -459,3 +461,129 @@ class TwilioProvider(TelephonyProvider):
</Response>"""
return Response(content=twiml_content, media_type="application/xml")
# ======== CALL TRANSFER METHODS ========
async def transfer_call(
self,
destination: str,
transfer_id: str,
conference_name: str,
timeout: int = 30,
**kwargs: Any,
) -> Dict[str, Any]:
"""
Initiate a call transfer via Twilio.
Uses inline TwiML to put the destination into a conference when they answer,
and a status callback to track the transfer outcome.
Args:
destination: The destination phone number (E.164 format)
transfer_id: Unique identifier for tracking this transfer
conference_name: Name of the conference to join the destination into
timeout: Transfer timeout in seconds
**kwargs: Additional Twilio-specific parameters
Returns:
Dict containing transfer result information
Raises:
ValueError: If provider configuration is invalid
Exception: If Twilio API call fails
"""
if not self.validate_config():
raise ValueError("Twilio provider not properly configured")
# Select a random phone number for the transfer
from_number = random.choice(self.from_numbers)
logger.info(f"Selected phone number {from_number} for transfer call")
backend_endpoint, _ = await get_backend_endpoints()
status_callback_url = (
f"{backend_endpoint}/api/v1/telephony/transfer-result/{transfer_id}"
)
# Inline TwiML: when the destination answers, put them into the conference
twiml = f"""<?xml version="1.0" encoding="UTF-8"?>
<Response>
<Say>You have answered a transfer call. Connecting you now.</Say>
<Dial>
<Conference endConferenceOnExit="true">{conference_name}</Conference>
</Dial>
</Response>"""
# Prepare Twilio API call data
endpoint = f"{self.base_url}/Calls.json"
data = {
"To": destination,
"From": from_number,
"Timeout": timeout,
"Twiml": twiml,
"StatusCallback": status_callback_url,
"StatusCallbackEvent": [
"answered",
"no-answer",
"busy",
"failed",
"completed",
],
"StatusCallbackMethod": "POST",
}
# Add any additional kwargs
data.update(kwargs)
try:
logger.debug(f"Transfer call data: {data}")
async with aiohttp.ClientSession() as session:
auth = aiohttp.BasicAuth(self.account_sid, self.auth_token)
async with session.post(endpoint, data=data, auth=auth) as response:
response_status = response.status
response_text = await response.text()
logger.info(
f"Twilio transfer API response status: {response_status}"
)
logger.debug(f"Twilio transfer API response body: {response_text}")
if response_status in [200, 201]:
try:
response_data = await response.json()
call_sid = response_data.get("sid")
logger.info(
f"Transfer call initiated successfully: {call_sid}"
)
return {
"call_sid": call_sid,
"status": response_data.get("status", "queued"),
"provider": self.PROVIDER_NAME,
"from_number": from_number,
"to_number": destination,
"raw_response": response_data,
}
except Exception as e:
logger.error(
f"Failed to parse Twilio transfer response JSON: {e}"
)
raise Exception(f"Failed to parse transfer response: {e}")
else:
error_msg = f"Twilio API call failed with status {response_status}: {response_text}"
logger.error(error_msg)
raise Exception(error_msg)
except Exception as e:
logger.error(f"Exception during Twilio transfer call: {e}")
raise
def supports_transfers(self) -> bool:
"""
Twilio supports call transfers.
Returns:
True - Twilio provider supports call transfers
"""
return True

View file

@ -533,3 +533,30 @@ class VobizProvider(TelephonyProvider):
</Response>"""
return Response(content=vobiz_xml_content, media_type="application/xml")
# ======== CALL TRANSFER METHODS ========
async def transfer_call(
self,
destination: str,
transfer_id: str,
conference_name: str,
timeout: int = 30,
**kwargs: Any,
) -> Dict[str, Any]:
"""
Vobiz provider does not support call transfers.
Raises:
NotImplementedError: Vobiz call transfers are yet to be implemented
"""
raise NotImplementedError("Vobiz provider does not support call transfers")
def supports_transfers(self) -> bool:
"""
Vobiz does not support call transfers.
Returns:
False - Vobiz provider does not support call transfers
"""
return False

View file

@ -484,3 +484,30 @@ class VonageProvider(TelephonyProvider):
]
return Response(content=json.dumps(error_ncco), media_type="application/json")
# ======== CALL TRANSFER METHODS ========
async def transfer_call(
self,
destination: str,
transfer_id: str,
conference_name: str,
timeout: int = 30,
**kwargs: Any,
) -> Dict[str, Any]:
"""
Vonage provider does not support call transfers.
Raises:
NotImplementedError: call transfers are yet to be implemented
"""
raise NotImplementedError("Vonage provider does not support call transfers")
def supports_transfers(self) -> bool:
"""
Vonage does not support call transfers.
Returns:
False - Vonage provider does not support call transfers
"""
return False

View file

@ -0,0 +1,102 @@
"""Redis communication protocol for call transfer coordination.
Defines event formats and Redis channels for coordinating call transfers
across multiple API server instances.
"""
import json
from dataclasses import asdict, dataclass
from enum import Enum
from typing import Any, Dict, Optional
class TransferEventType(str, Enum):
"""Types of transfer events sent between instances."""
TRANSFER_INITIATED = "transfer_initiated"
TRANSFER_ANSWERED = "transfer_answered"
TRANSFER_COMPLETED = "transfer_completed"
TRANSFER_FAILED = "transfer_failed"
TRANSFER_CANCELLED = "transfer_cancelled"
TRANSFER_TIMEOUT = "transfer_timeout"
@dataclass
class TransferEvent:
"""Event data structure for transfer coordination."""
type: TransferEventType
transfer_id: str
original_call_sid: str
transfer_call_sid: Optional[str] = None
target_number: Optional[str] = None
conference_name: Optional[str] = None
message: Optional[str] = None
status: Optional[str] = None
action: Optional[str] = None
reason: Optional[str] = None
end_call: bool = False
timestamp: Optional[float] = None
def to_json(self) -> str:
"""Convert event to JSON string."""
return json.dumps(asdict(self))
@classmethod
def from_json(cls, data: str) -> "TransferEvent":
"""Create event from JSON string."""
return cls(**json.loads(data))
def to_result_dict(self) -> Dict[str, Any]:
"""Convert to function call result format."""
result = {
"status": self.status or "success",
"message": self.message or "",
"action": self.action or self.type,
"conference_id": self.conference_name,
"transfer_call_sid": self.transfer_call_sid,
"original_call_sid": self.original_call_sid,
"end_call": self.end_call,
"reason": self.reason,
}
return result
@dataclass
class TransferContext:
"""Transfer context data stored in Redis."""
transfer_id: str
call_sid: Optional[str]
target_number: str
tool_uuid: str
original_call_sid: str
conference_name: str
initiated_at: float
def to_json(self) -> str:
"""Convert context to JSON string."""
return json.dumps(asdict(self))
@classmethod
def from_json(cls, data: str) -> "TransferContext":
"""Create context from JSON string."""
return cls(**json.loads(data))
def to_dict(self) -> Dict[str, Any]:
"""Convert to dictionary."""
return asdict(self)
class TransferRedisChannels:
"""Redis channel naming conventions for transfer events."""
@staticmethod
def transfer_events(transfer_id: str) -> str:
"""Channel for transfer events for a specific transfer."""
return f"transfer:events:{transfer_id}"
@staticmethod
def transfer_context_key(transfer_id: str) -> str:
"""Redis key for transfer context storage."""
return f"transfer:context:{transfer_id}"