dograh/api/services/telephony/call_transfer_manager.py

201 lines
7.2 KiB
Python
Raw Normal View History

"""Redis-based transfer event coordination service
2026-01-22 12:19:34 +05:30
Handles transfer event publishing, subscription, and context storage
"""
import asyncio
import time
2026-02-14 13:43:20 +05:30
from typing import Dict, Optional
2026-01-22 12:19:34 +05:30
import redis.asyncio as aioredis
2026-02-14 13:43:20 +05:30
from loguru import logger
2026-01-22 12:19:34 +05:30
from api.constants import REDIS_URL
from api.services.telephony.transfer_event_protocol import (
TransferContext,
2026-02-14 13:43:20 +05:30
TransferEvent,
2026-01-22 12:19:34 +05:30
TransferEventType,
2026-02-14 13:43:20 +05:30
TransferRedisChannels,
2026-01-22 12:19:34 +05:30
)
2026-02-14 07:28:36 +05:30
class CallTransferManager:
"""Manages call transfer events and context storage using Redis."""
2026-02-14 13:43:20 +05:30
2026-01-22 12:19:34 +05:30
def __init__(self, redis_client: Optional[aioredis.Redis] = None):
self._redis_client = redis_client
self._pubsub_connections: Dict[str, aioredis.client.PubSub] = {}
2026-02-14 13:43:20 +05:30
2026-01-22 12:19:34 +05:30
async def _get_redis(self) -> aioredis.Redis:
"""Get Redis client instance."""
if not self._redis_client:
self._redis_client = await aioredis.from_url(
REDIS_URL, decode_responses=True
)
return self._redis_client
2026-02-14 13:43:20 +05:30
async def store_transfer_context(
self, context: TransferContext, ttl: int = 300
) -> None:
2026-01-22 12:19:34 +05:30
"""Store transfer context in Redis with TTL.
2026-02-14 13:43:20 +05:30
2026-01-22 12:19:34 +05:30
Args:
context: Transfer context data
ttl: Time to live in seconds (default 5 minutes)
"""
try:
redis = await self._get_redis()
2026-02-14 13:43:20 +05:30
key = TransferRedisChannels.transfer_context_key(context.transfer_id)
2026-01-22 12:19:34 +05:30
await redis.setex(key, ttl, context.to_json())
2026-02-14 13:43:20 +05:30
logger.debug(f"Stored transfer context for {context.transfer_id}")
2026-01-22 12:19:34 +05:30
except Exception as e:
logger.error(f"Failed to store transfer context: {e}")
2026-02-14 13:43:20 +05:30
async def get_transfer_context(self, transfer_id: str) -> Optional[TransferContext]:
2026-01-22 12:19:34 +05:30
"""Retrieve transfer context from Redis.
2026-02-14 13:43:20 +05:30
2026-01-22 12:19:34 +05:30
Args:
2026-02-14 13:43:20 +05:30
transfer_id: Transfer identifier
2026-01-22 12:19:34 +05:30
Returns:
Transfer context if found, None otherwise
"""
try:
redis = await self._get_redis()
2026-02-14 13:43:20 +05:30
key = TransferRedisChannels.transfer_context_key(transfer_id)
2026-01-22 12:19:34 +05:30
data = await redis.get(key)
if data:
return TransferContext.from_json(data)
return None
except Exception as e:
logger.error(f"Failed to get transfer context: {e}")
return None
2026-02-14 13:43:20 +05:30
async def remove_transfer_context(self, transfer_id: str) -> None:
2026-01-22 12:19:34 +05:30
"""Remove transfer context from Redis.
2026-02-14 13:43:20 +05:30
2026-01-22 12:19:34 +05:30
Args:
2026-02-14 13:43:20 +05:30
transfer_id: Transfer identifier
2026-01-22 12:19:34 +05:30
"""
try:
redis = await self._get_redis()
2026-02-14 13:43:20 +05:30
key = TransferRedisChannels.transfer_context_key(transfer_id)
2026-01-22 12:19:34 +05:30
await redis.delete(key)
2026-02-14 13:43:20 +05:30
logger.debug(f"Removed transfer context for {transfer_id}")
2026-01-22 12:19:34 +05:30
except Exception as e:
logger.error(f"Failed to remove transfer context: {e}")
2026-02-14 13:43:20 +05:30
2026-01-22 12:19:34 +05:30
async def publish_transfer_event(self, event: TransferEvent) -> None:
"""Publish transfer event to Redis channel.
2026-02-14 13:43:20 +05:30
2026-01-22 12:19:34 +05:30
Args:
event: Transfer event to publish
"""
try:
# Add timestamp if not present
if event.timestamp is None:
event.timestamp = time.time()
2026-02-14 13:43:20 +05:30
2026-01-22 12:19:34 +05:30
redis = await self._get_redis()
2026-02-14 13:43:20 +05:30
channel = TransferRedisChannels.transfer_events(event.transfer_id)
2026-01-22 12:19:34 +05:30
await redis.publish(channel, event.to_json())
2026-02-14 13:43:20 +05:30
logger.info(f"Published {event.type} event for {event.transfer_id}")
2026-01-22 12:19:34 +05:30
except Exception as e:
logger.error(f"Failed to publish transfer event: {e}")
2026-02-14 13:43:20 +05:30
2026-01-22 12:19:34 +05:30
async def wait_for_transfer_completion(
2026-02-14 13:43:20 +05:30
self, transfer_id: str, timeout_seconds: float = 30.0
2026-01-22 12:19:34 +05:30
) -> Optional[TransferEvent]:
"""Wait for transfer completion event using Redis pub/sub.
2026-02-14 13:43:20 +05:30
2026-01-22 12:19:34 +05:30
Args:
2026-02-14 13:43:20 +05:30
transfer_id: Transfer identifier to wait for
2026-01-22 12:19:34 +05:30
timeout_seconds: Maximum time to wait
2026-02-14 13:43:20 +05:30
2026-01-22 12:19:34 +05:30
Returns:
Transfer completion event if received, None on timeout
"""
2026-02-14 13:43:20 +05:30
channel = TransferRedisChannels.transfer_events(transfer_id)
2026-01-22 12:19:34 +05:30
redis = await self._get_redis()
pubsub = redis.pubsub()
2026-02-14 13:43:20 +05:30
2026-01-22 12:19:34 +05:30
try:
await pubsub.subscribe(channel)
2026-02-14 13:43:20 +05:30
logger.info(
f"Waiting for transfer completion on {channel} (timeout: {timeout_seconds}s)"
)
2026-01-22 12:19:34 +05:30
# Wait for completion event with timeout
async def wait_for_message():
async for message in pubsub.listen():
if message["type"] == "message":
try:
event = TransferEvent.from_json(message["data"])
2026-02-14 13:43:20 +05:30
logger.info(
f"Received {event.type} event for {transfer_id}"
)
2026-01-22 12:19:34 +05:30
# Check if this is a completion event
2026-02-14 13:43:20 +05:30
if (
event.type
in [
TransferEventType.TRANSFER_ANSWERED, # Call answered = transfer successful
TransferEventType.TRANSFER_COMPLETED,
TransferEventType.TRANSFER_FAILED,
TransferEventType.TRANSFER_CANCELLED,
TransferEventType.TRANSFER_TIMEOUT,
]
):
2026-01-22 12:19:34 +05:30
return event
except Exception as e:
logger.error(f"Failed to parse transfer event: {e}")
continue
return None
2026-02-14 13:43:20 +05:30
2026-01-22 12:19:34 +05:30
# Wait with timeout
result = await asyncio.wait_for(wait_for_message(), timeout=timeout_seconds)
return result
2026-02-14 13:43:20 +05:30
2026-01-22 12:19:34 +05:30
except asyncio.TimeoutError:
2026-02-14 13:43:20 +05:30
logger.debug(f"Transfer completion wait timed out for {transfer_id}")
2026-01-22 12:19:34 +05:30
return None
except Exception as e:
logger.error(f"Error waiting for transfer completion: {e}")
return None
finally:
try:
await pubsub.unsubscribe(channel)
await pubsub.close()
except Exception as e:
logger.error(f"Error closing pubsub connection: {e}")
2026-02-14 13:43:20 +05:30
2026-01-22 12:19:34 +05:30
async def cleanup(self):
"""Clean up Redis connections."""
try:
# Close pubsub connections
for pubsub in self._pubsub_connections.values():
try:
await pubsub.close()
except:
pass
self._pubsub_connections.clear()
2026-02-14 13:43:20 +05:30
2026-01-22 12:19:34 +05:30
# Close main Redis connection
if self._redis_client:
await self._redis_client.close()
self._redis_client = None
except Exception as e:
logger.error(f"Error during transfer coordinator cleanup: {e}")
2026-02-14 07:28:36 +05:30
# Global call transfer manager instance
_call_transfer_manager: Optional[CallTransferManager] = None
2026-01-22 12:19:34 +05:30
2026-02-14 07:28:36 +05:30
async def get_call_transfer_manager() -> CallTransferManager:
"""Get or create the global call transfer manager instance."""
global _call_transfer_manager
if not _call_transfer_manager:
_call_transfer_manager = CallTransferManager()
2026-02-14 13:43:20 +05:30
return _call_transfer_manager