refactor: add call strategies, cleanup transfer events

This commit is contained in:
Sabiha Khan 2026-02-26 16:30:11 +05:30
parent e1d8b52b42
commit 99ebede3e3
9 changed files with 154 additions and 177 deletions

View file

@ -1670,7 +1670,7 @@ async def complete_transfer_function_call(transfer_id: str, request: Request):
result = {
"status": "success",
"message": "Great! The destination number answered. Let me transfer you now.",
"action": "transfer_success",
"action": "destination_answered",
"conference_id": conference_name,
"transfer_call_sid": call_sid, # The outbound transfer call SID
"original_call_sid": original_call_sid, # The original caller's SID
@ -1714,9 +1714,7 @@ async def complete_transfer_function_call(transfer_id: str, request: Request):
try:
# Determine event type based on result status
if result["status"] == "success":
event_type = TransferEventType.TRANSFER_COMPLETED
elif result.get("reason") == "timeout":
event_type = TransferEventType.TRANSFER_TIMEOUT
event_type = TransferEventType.DESTINATION_ANSWERED
else:
event_type = TransferEventType.TRANSFER_FAILED

View file

@ -6,6 +6,14 @@ from api.constants import APP_ROOT_DIR
from api.db import db_client
from api.enums import OrganizationConfigurationKey
from api.services.pipecat.audio_config import AudioConfig
from api.services.telephony.providers.ari_call_strategies import (
ARIBridgeSwapStrategy,
ARIHangupStrategy,
)
from api.services.telephony.providers.twilio_call_strategies import (
TwilioConferenceStrategy,
TwilioHangupStrategy,
)
from pipecat.audio.mixers.silence_mixer import SilenceAudioMixer
from pipecat.audio.mixers.soundfile_mixer import SoundfileMixer
from pipecat.serializers.asterisk import AsteriskFrameSerializer
@ -54,12 +62,17 @@ async def create_twilio_transport(
raise ValueError(
f"Incomplete Twilio configuration for organization {organization_id}"
)
# Create strategy instances
transfer_strategy = TwilioConferenceStrategy()
hangup_strategy = TwilioHangupStrategy()
serializer = TwilioFrameSerializer(
stream_sid=stream_sid,
call_sid=call_sid,
account_sid=account_sid,
auth_token=auth_token,
transfer_strategy=transfer_strategy,
hangup_strategy=hangup_strategy,
)
return FastAPIWebsocketTransport(
@ -178,12 +191,17 @@ async def create_ari_transport(
f"Incomplete ARI configuration for organization {organization_id}. "
f"Required: ari_endpoint, app_name, app_password"
)
# Create strategy instances
transfer_strategy = ARIBridgeSwapStrategy()
hangup_strategy = ARIHangupStrategy()
serializer = AsteriskFrameSerializer(
channel_id=channel_id,
ari_endpoint=ari_endpoint,
app_name=app_name,
app_password=app_password,
transfer_strategy=transfer_strategy,
hangup_strategy=hangup_strategy,
params=AsteriskFrameSerializer.InputParams(
asterisk_sample_rate=audio_config.transport_in_sample_rate,
sample_rate=audio_config.pipeline_sample_rate,

View file

@ -284,7 +284,7 @@ class ARIConnection:
f"channel={channel_id}, transfer_id={transfer_id}"
)
asyncio.create_task(
self._handle_transfer_answered(transfer_id, channel_id)
self._handle_destination_answered(transfer_id, channel_id)
)
return
@ -753,7 +753,7 @@ class ARIConnection:
)
return None
async def _handle_transfer_answered(
async def _handle_destination_answered(
self, transfer_id: str, destination_channel_id: str
):
"""Handle transfer destination channel answered - publish success event."""
@ -780,16 +780,16 @@ class ARIConnection:
f"caller={context.original_call_sid} -> destination={destination_channel_id}"
)
# Publish transfer success event - this will trigger the bridge swap in serializer
# Publish destination answered event - this will trigger the bridge swap in serializer
success_event = TransferEvent(
type=TransferEventType.TRANSFER_ANSWERED,
type=TransferEventType.DESTINATION_ANSWERED,
transfer_id=transfer_id,
original_call_sid=context.original_call_sid,
transfer_call_sid=destination_channel_id,
conference_name=context.conference_name,
message="Transfer destination answered",
status="success",
action="transfer_success",
action="destination_answered",
end_call=True,
timestamp=time.time(),
)

View file

@ -158,16 +158,10 @@ class CallTransferManager:
)
# Check if this is a completion event
if (
event.type
in [
TransferEventType.TRANSFER_ANSWERED, # Call answered = transfer successful
TransferEventType.TRANSFER_COMPLETED,
TransferEventType.TRANSFER_FAILED,
TransferEventType.TRANSFER_CANCELLED,
TransferEventType.TRANSFER_TIMEOUT,
]
):
if event.type in [
TransferEventType.DESTINATION_ANSWERED,
TransferEventType.TRANSFER_FAILED,
]:
return event
except Exception as e:
logger.error(f"Failed to parse transfer event: {e}")

View file

@ -6,7 +6,6 @@ The ARI WebSocket event listener runs as a separate process (ari_manager.py).
"""
import json
import time
from typing import TYPE_CHECKING, Any, Dict, List, Optional
from urllib.parse import urlparse
@ -393,20 +392,9 @@ class ARIProvider(TelephonyProvider):
from api.services.telephony.call_transfer_manager import (
get_call_transfer_manager,
)
from api.services.telephony.transfer_event_protocol import TransferContext
# Store transfer context for event correlation
# Get call transfer manager for event correlation mapping
call_transfer_manager = await get_call_transfer_manager()
context = TransferContext(
transfer_id=transfer_id,
call_sid=None, # Will be updated after channel creation
target_number=destination,
tool_uuid=kwargs.get("tool_uuid", ""),
original_call_sid=kwargs.get("original_call_sid", ""),
conference_name=conference_name,
initiated_at=time.time(),
)
await call_transfer_manager.store_transfer_context(context, ttl=timeout + 10)
# Build SIP endpoint
if destination.startswith("SIP/") or destination.startswith("PJSIP/"):
@ -450,12 +438,6 @@ class ARIProvider(TelephonyProvider):
await call_transfer_manager.remove_transfer_context(transfer_id)
raise Exception("Failed to create destination channel")
# Update transfer context with destination channel ID
context.call_sid = destination_channel_id
await call_transfer_manager.store_transfer_context(
context, ttl=timeout + 10
)
# Store transfer channel mapping for event correlation
await call_transfer_manager.store_transfer_channel_mapping(
destination_channel_id, transfer_id

View file

@ -13,12 +13,8 @@ from typing import Any, Dict, Optional
class TransferEventType(str, Enum):
"""Types of transfer events sent between instances."""
TRANSFER_INITIATED = "transfer_initiated"
TRANSFER_ANSWERED = "transfer_answered"
TRANSFER_COMPLETED = "transfer_completed"
DESTINATION_ANSWERED = "destination_answered"
TRANSFER_FAILED = "transfer_failed"
TRANSFER_CANCELLED = "transfer_cancelled"
TRANSFER_TIMEOUT = "transfer_timeout"
@dataclass

View file

@ -395,7 +395,6 @@ class CustomToolManager:
)
return
# Get telephony provider directly (no HTTP round-trip)
provider = await get_telephony_provider(organization_id)
if not provider.supports_transfers() or not provider.validate_config():
validation_error_result = {
@ -418,6 +417,19 @@ class CustomToolManager:
# Compute conference name from original call SID
conference_name = f"transfer-{original_call_sid}"
# Store initial transfer context in Redis before provider call to avoid race condition
call_transfer_manager = await get_call_transfer_manager()
transfer_context = TransferContext(
transfer_id=transfer_id,
call_sid=None, # Will be updated after provider response
target_number=destination,
tool_uuid=tool.tool_uuid,
original_call_sid=original_call_sid,
conference_name=conference_name,
initiated_at=time.time(),
)
await call_transfer_manager.store_transfer_context(transfer_context)
# Mute the pipeline
self._engine.set_mute_pipeline(True)
@ -432,21 +444,8 @@ class CustomToolManager:
call_sid = transfer_result.get("call_sid")
logger.info(f"Transfer call initiated successfully: {call_sid}")
# TODO: Possible race here between saving the transfer context
# and getting a callback response from Twilio? Should we store_transfer_context
# before sending request to Twilio and update the transfer context afterwards?
# Store transfer context in Redis
call_transfer_manager = await get_call_transfer_manager()
transfer_context = TransferContext(
transfer_id=transfer_id,
call_sid=call_sid,
target_number=destination,
tool_uuid=tool.tool_uuid,
original_call_sid=original_call_sid,
conference_name=conference_name,
initiated_at=time.time(),
)
# Update transfer context with actual call_sid from provider response
transfer_context.call_sid = call_sid
await call_transfer_manager.store_transfer_context(transfer_context)
# Wait for status callback completion using Redis pub/sub
@ -538,67 +537,57 @@ class CustomToolManager:
exception_result, function_call_params, properties
)
finally:
# Schedule background cleanup of transfer context after pipeline processing delay
if "transfer_id" in locals():
asyncio.create_task(
self._cleanup_transfer_context_delayed(transfer_id)
)
return transfer_call_handler
async def _cleanup_transfer_context_delayed(self, transfer_id: str):
"""Background task to clean up transfer context after pipeline processing delay."""
try:
# Wait for pipeline to process EndFrame(reason="transfer_call") in serializers
await asyncio.sleep(1.0) # 1 second delay for async pipeline processing
call_transfer_manager = await get_call_transfer_manager()
await call_transfer_manager.remove_transfer_context(transfer_id)
logger.info(f"Background cleanup: removed transfer context {transfer_id}")
except Exception as e:
logger.error(
f"Background cleanup error for transfer context {transfer_id}: {e}"
)
async def _handle_transfer_result(
self, result: dict, function_call_params, properties
):
"""Handle different transfer call outcomes and take appropriate action."""
"""Handle transfer call outcomes from any telephony provider (Twilio, ARI, etc).
This method is provider-agnostic and processes standardized result dictionaries
from transfer completion events, validation failures, timeouts, and errors.
Args:
result: Standardized result dict with keys: action, status, reason, message
function_call_params: LLM function call parameters for response callback
properties: Function call result properties (e.g., run_llm setting)
"""
action = result.get("action", "")
status = result.get("status", "")
logger.info(f"Handling transfer result: action={action}, status={status}")
if action == "transfer_success":
# Successful transfer - add original caller to conference and end pipeline
if action == "destination_answered":
# Transfer destination answered - proceeding with bridge swap/conference join
conference_id = result.get("conference_id")
original_call_sid = result.get("original_call_sid")
transfer_call_sid = result.get("transfer_call_sid")
logger.info(
f"Transfer successful! Conference: {conference_id}, Original: {original_call_sid}, Transfer: {transfer_call_sid}"
f"Transfer destination answered! Conference/Bridge: {conference_id}, "
f"Original: {original_call_sid}, Transfer: {transfer_call_sid}"
)
# Inform LLM of success and end the call with Transfer call reason
# Inform LLM of success and end the call (no further LLM processing needed)
response_properties = FunctionCallResultProperties(run_llm=False)
await function_call_params.result_callback(
{
"status": "transfer_success",
"message": "Transfer successful - connecting to conference",
"message": "Transfer destination answered - connecting calls",
"conference_id": conference_id,
},
properties=response_properties,
)
# End pipeline - providers complete bridge swap/conference join as final transfer leg
await self._engine.end_call_with_reason(
EndTaskReason.TRANSFER_CALL.value, abort_immediately=False
)
elif action == "transfer_failed":
# Transfer failed - inform user via LLM and then end the call
# Transfer failed - let LLM inform user with error details
reason = result.get("reason", "unknown")
logger.info(f"Transfer failed ({reason}), informing user")
logger.info(f"Transfer failed ({reason}), informing user via LLM")
await function_call_params.result_callback(
{