dograh/api/services/telephony/transfer_event_protocol.py
shiminshen 37e7f4d2e6
fix(telephony): resolve transfer context via call-sid index instead of KEYS scan (#387)
Transfer-context lookup by original_call_sid ran
`redis.keys("transfer:context:*")` and iterated every match — an O(N)
blocking scan on call-control hot paths, duplicated across the ARI
manager and the Twilio/Telnyx conference strategies.

Maintain a `transfer:by_call_sid:{original_call_sid}` -> transfer_id
secondary index, written and cleared alongside the context in
store/remove, and resolve lookups with a direct GET. Route the
Twilio/Telnyx strategies through the manager so the lookup lives in one
place (also dropping per-call ad-hoc Redis connections).

Closes #328

Co-authored-by: shiminshen <16914659+shiminshen@users.noreply.github.com>
2026-06-02 12:56:05 +05:30

110 lines
3.4 KiB
Python

"""Redis communication protocol for call transfer coordination.
Defines event formats and Redis channels for coordinating call transfers
across multiple API server instances.
"""
import json
from dataclasses import asdict, dataclass
from enum import Enum
from typing import Any, Dict, Optional
class TransferEventType(str, Enum):
"""Types of transfer events sent between instances."""
DESTINATION_ANSWERED = "destination_answered"
TRANSFER_FAILED = "transfer_failed"
@dataclass
class TransferEvent:
"""Event data structure for transfer coordination."""
type: TransferEventType
transfer_id: str
original_call_sid: str
transfer_call_sid: Optional[str] = None
target_number: Optional[str] = None
conference_name: Optional[str] = None
message: Optional[str] = None
status: Optional[str] = None
action: Optional[str] = None
reason: Optional[str] = None
end_call: bool = False
timestamp: Optional[float] = None
def to_json(self) -> str:
"""Convert event to JSON string."""
return json.dumps(asdict(self))
@classmethod
def from_json(cls, data: str) -> "TransferEvent":
"""Create event from JSON string."""
return cls(**json.loads(data))
def to_result_dict(self) -> Dict[str, Any]:
"""Convert to function call result format."""
result = {
"status": self.status or "success",
"message": self.message or "",
"action": self.action or self.type,
"conference_id": self.conference_name,
"transfer_call_sid": self.transfer_call_sid,
"original_call_sid": self.original_call_sid,
"reason": self.reason,
}
return result
@dataclass
class TransferContext:
"""Transfer context data stored in Redis."""
transfer_id: str
call_sid: Optional[str]
target_number: str
tool_uuid: str
original_call_sid: str
conference_name: str
initiated_at: float
# workflow_run_id: lets transfer_id-keyed webhooks resolve org/credentials.
# conference_id: set by providers that seed the conference on answer (Telnyx).
workflow_run_id: Optional[int] = None
conference_id: Optional[str] = None
def to_json(self) -> str:
"""Convert context to JSON string."""
return json.dumps(asdict(self))
@classmethod
def from_json(cls, data: str) -> "TransferContext":
"""Create context from JSON string."""
return cls(**json.loads(data))
def to_dict(self) -> Dict[str, Any]:
"""Convert to dictionary."""
return asdict(self)
class TransferRedisChannels:
"""Redis channel naming conventions for transfer events."""
@staticmethod
def transfer_events(transfer_id: str) -> str:
"""Channel for transfer events for a specific transfer."""
return f"transfer:events:{transfer_id}"
@staticmethod
def transfer_context_key(transfer_id: str) -> str:
"""Redis key for transfer context storage."""
return f"transfer:context:{transfer_id}"
@staticmethod
def transfer_context_by_call_sid_key(original_call_sid: str) -> str:
"""Redis key for the original_call_sid -> transfer_id secondary index.
Lets a caller's transfer context be resolved with a direct lookup
instead of an O(N) ``KEYS transfer:context:*`` keyspace scan.
"""
return f"transfer:by_call_sid:{original_call_sid}"