dograh/api/services/telephony/call_transfer_manager.py
shiminshen 37e7f4d2e6
fix(telephony): resolve transfer context via call-sid index instead of KEYS scan (#387)
Transfer-context lookup by original_call_sid ran
`redis.keys("transfer:context:*")` and iterated every match — an O(N)
blocking scan on call-control hot paths, duplicated across the ARI
manager and the Twilio/Telnyx conference strategies.

Maintain a `transfer:by_call_sid:{original_call_sid}` -> transfer_id
secondary index, written and cleared alongside the context in
store/remove, and resolve lookups with a direct GET. Route the
Twilio/Telnyx strategies through the manager so the lookup lives in one
place (also dropping per-call ad-hoc Redis connections).

Closes #328

Co-authored-by: shiminshen <16914659+shiminshen@users.noreply.github.com>
2026-06-02 12:56:05 +05:30

253 lines
9.1 KiB
Python

"""Redis-based transfer event coordination service
Handles transfer event publishing, subscription, and context storage
"""
import asyncio
import time
from typing import Dict, Optional
import redis.asyncio as aioredis
from loguru import logger
from api.constants import REDIS_URL
from api.services.telephony.transfer_event_protocol import (
TransferContext,
TransferEvent,
TransferEventType,
TransferRedisChannels,
)
class CallTransferManager:
"""Manages call transfer events and context storage using Redis."""
def __init__(self, redis_client: Optional[aioredis.Redis] = None):
self._redis_client = redis_client
self._pubsub_connections: Dict[str, aioredis.client.PubSub] = {}
async def _get_redis(self) -> aioredis.Redis:
"""Get Redis client instance."""
if not self._redis_client:
self._redis_client = await aioredis.from_url(
REDIS_URL, decode_responses=True
)
return self._redis_client
async def store_transfer_context(
self, context: TransferContext, ttl: int = 300
) -> None:
"""Store transfer context in Redis with TTL.
Args:
context: Transfer context data
ttl: Time to live in seconds (default 5 minutes)
"""
try:
redis = await self._get_redis()
key = TransferRedisChannels.transfer_context_key(context.transfer_id)
await redis.setex(key, ttl, context.to_json())
if context.original_call_sid:
index_key = TransferRedisChannels.transfer_context_by_call_sid_key(
context.original_call_sid
)
await redis.setex(index_key, ttl, context.transfer_id)
logger.debug(f"Stored transfer context for {context.transfer_id}")
except Exception as e:
logger.error(f"Failed to store transfer context: {e}")
async def get_transfer_context(self, transfer_id: str) -> Optional[TransferContext]:
"""Retrieve transfer context from Redis.
Args:
transfer_id: Transfer identifier
Returns:
Transfer context if found, None otherwise
"""
try:
redis = await self._get_redis()
key = TransferRedisChannels.transfer_context_key(transfer_id)
data = await redis.get(key)
if data:
return TransferContext.from_json(data)
return None
except Exception as e:
logger.error(f"Failed to get transfer context: {e}")
return None
async def remove_transfer_context(self, transfer_id: str) -> None:
"""Remove transfer context from Redis.
Args:
transfer_id: Transfer identifier
"""
try:
redis = await self._get_redis()
context = await self.get_transfer_context(transfer_id)
key = TransferRedisChannels.transfer_context_key(transfer_id)
if context and context.original_call_sid:
index_key = TransferRedisChannels.transfer_context_by_call_sid_key(
context.original_call_sid
)
await redis.delete(key, index_key)
else:
await redis.delete(key)
logger.debug(f"Removed transfer context for {transfer_id}")
except Exception as e:
logger.error(f"Failed to remove transfer context: {e}")
async def store_transfer_channel_mapping(
self, channel_id: str, transfer_id: str
) -> None:
"""Store channel->transfer mapping in Redis for event correlation.
Args:
channel_id: ARI channel ID
transfer_id: Transfer identifier
"""
try:
redis = await self._get_redis()
await redis.setex(
f"ari:transfer_channel:{channel_id}", 300, transfer_id
) # 5 minute TTL
logger.debug(
f"[Transfer Manager] Stored channel mapping: channel={channel_id}, transfer_id={transfer_id}"
)
except Exception as e:
logger.error(
f"[Transfer Manager] Error storing transfer channel mapping: {e}"
)
async def publish_transfer_event(self, event: TransferEvent) -> None:
"""Publish transfer event to Redis channel.
Args:
event: Transfer event to publish
"""
try:
# Add timestamp if not present
if event.timestamp is None:
event.timestamp = time.time()
redis = await self._get_redis()
channel = TransferRedisChannels.transfer_events(event.transfer_id)
await redis.publish(channel, event.to_json())
logger.info(f"Published {event.type} event for {event.transfer_id}")
except Exception as e:
logger.error(f"Failed to publish transfer event: {e}")
async def wait_for_transfer_completion(
self, transfer_id: str, timeout_seconds: float = 30.0
) -> Optional[TransferEvent]:
"""Wait for transfer completion event using Redis pub/sub.
Args:
transfer_id: Transfer identifier to wait for
timeout_seconds: Maximum time to wait
Returns:
Transfer completion event if received, None on timeout
"""
channel = TransferRedisChannels.transfer_events(transfer_id)
redis = await self._get_redis()
pubsub = redis.pubsub()
try:
await pubsub.subscribe(channel)
logger.info(
f"Waiting for transfer completion on {channel} (timeout: {timeout_seconds}s)"
)
# Wait for completion event with timeout
async def wait_for_message():
async for message in pubsub.listen():
if message["type"] == "message":
try:
event = TransferEvent.from_json(message["data"])
logger.info(
f"Received {event.type} event for {transfer_id}"
)
# Check if this is a completion event
if event.type in [
TransferEventType.DESTINATION_ANSWERED,
TransferEventType.TRANSFER_FAILED,
]:
return event
except Exception as e:
logger.error(f"Failed to parse transfer event: {e}")
continue
return None
# Wait with timeout
result = await asyncio.wait_for(wait_for_message(), timeout=timeout_seconds)
return result
except asyncio.TimeoutError:
logger.debug(f"Transfer completion wait timed out for {transfer_id}")
return None
except Exception as e:
logger.error(f"Error waiting for transfer completion: {e}")
return None
finally:
try:
await pubsub.unsubscribe(channel)
await pubsub.close()
except Exception as e:
logger.error(f"Error closing pubsub connection: {e}")
async def find_transfer_context_for_call(self, caller_channel_id: str):
"""Find the active transfer context for this caller channel.
Resolves via the original_call_sid -> transfer_id secondary index
(see store_transfer_context) instead of scanning the keyspace with
``KEYS transfer:context:*``.
"""
try:
redis = await self._get_redis()
index_key = TransferRedisChannels.transfer_context_by_call_sid_key(
caller_channel_id
)
transfer_id = await redis.get(index_key)
if not transfer_id:
return None
context = await self.get_transfer_context(transfer_id)
if context and context.original_call_sid == caller_channel_id:
return context
return None
except Exception as e:
logger.error(f"[ARI Transfer] Error finding transfer context: {e}")
return None
async def cleanup(self):
"""Clean up Redis connections."""
try:
# Close pubsub connections
for pubsub in self._pubsub_connections.values():
try:
await pubsub.close()
except:
pass
self._pubsub_connections.clear()
# Close main Redis connection
if self._redis_client:
await self._redis_client.close()
self._redis_client = None
except Exception as e:
logger.error(f"Error during transfer coordinator cleanup: {e}")
# Global call transfer manager instance
_call_transfer_manager: Optional[CallTransferManager] = None
async def get_call_transfer_manager() -> CallTransferManager:
"""Get or create the global call transfer manager instance."""
global _call_transfer_manager
if not _call_transfer_manager:
_call_transfer_manager = CallTransferManager()
return _call_transfer_manager