mirror of
https://github.com/dograh-hq/dograh.git
synced 2026-06-22 08:38:13 +02:00
feat: tansfer calls with aasterisk (#171)
* feat: tansfer calls with aasterisk * chore: format code with pre-commit script * chore: refactor code * refactor: add call strategies, cleanup transfer events * fix: docker compose, add missing files from merge conflicts * chore: update pipecat * docs: restructure & add mintilify pages for tool * chore: upgrade pipecat
This commit is contained in:
parent
9e058699c5
commit
bd07b753cd
19 changed files with 1043 additions and 106 deletions
|
|
@ -1670,7 +1670,7 @@ async def complete_transfer_function_call(transfer_id: str, request: Request):
|
|||
result = {
|
||||
"status": "success",
|
||||
"message": "Great! The destination number answered. Let me transfer you now.",
|
||||
"action": "transfer_success",
|
||||
"action": "destination_answered",
|
||||
"conference_id": conference_name,
|
||||
"transfer_call_sid": call_sid, # The outbound transfer call SID
|
||||
"original_call_sid": original_call_sid, # The original caller's SID
|
||||
|
|
@ -1714,9 +1714,7 @@ async def complete_transfer_function_call(transfer_id: str, request: Request):
|
|||
try:
|
||||
# Determine event type based on result status
|
||||
if result["status"] == "success":
|
||||
event_type = TransferEventType.TRANSFER_COMPLETED
|
||||
elif result.get("reason") == "timeout":
|
||||
event_type = TransferEventType.TRANSFER_TIMEOUT
|
||||
event_type = TransferEventType.DESTINATION_ANSWERED
|
||||
else:
|
||||
event_type = TransferEventType.TRANSFER_FAILED
|
||||
|
||||
|
|
@ -1730,7 +1728,6 @@ async def complete_transfer_function_call(transfer_id: str, request: Request):
|
|||
status=result["status"],
|
||||
action=result.get("action", ""),
|
||||
reason=result.get("reason"),
|
||||
end_call=result.get("end_call", False),
|
||||
)
|
||||
|
||||
# Publish the event via Redis
|
||||
|
|
|
|||
|
|
@ -71,7 +71,7 @@ class TransferCallConfig(BaseModel):
|
|||
"""Configuration for Transfer Call tools."""
|
||||
|
||||
destination: str = Field(
|
||||
description="Phone number to transfer the call to (E.164 format, e.g., +1234567890)"
|
||||
description="Phone number or SIP endpoint to transfer the call to (E.164 format e.g., +1234567890, or SIP endpoint e.g., PJSIP/1234)"
|
||||
)
|
||||
messageType: Literal["none", "custom"] = Field(
|
||||
default="none", description="Type of message to play before transfer"
|
||||
|
|
@ -89,16 +89,23 @@ class TransferCallConfig(BaseModel):
|
|||
@field_validator("destination")
|
||||
@classmethod
|
||||
def validate_destination(cls, v: str) -> str:
|
||||
"""Validate that destination is a valid E.164 phone number."""
|
||||
"""Validate that destination is a valid E.164 phone number or SIP endpoint."""
|
||||
# Allow empty string for initial creation (like HTTP API tools with empty URL)
|
||||
if not v.strip():
|
||||
return v
|
||||
|
||||
# E.164 format: +[1-9]\d{1,14}
|
||||
e164_pattern = r"^\+[1-9]\d{1,14}$"
|
||||
if not re.match(e164_pattern, v):
|
||||
|
||||
# SIP endpoint format: PJSIP/extension or SIP/extension
|
||||
sip_pattern = r"^(PJSIP|SIP)/[\w\-\.@]+$"
|
||||
|
||||
is_valid_e164 = re.match(e164_pattern, v)
|
||||
is_valid_sip = re.match(sip_pattern, v, re.IGNORECASE)
|
||||
|
||||
if not (is_valid_e164 or is_valid_sip):
|
||||
raise ValueError(
|
||||
"Destination must be a valid E.164 phone number (e.g., +1234567890)"
|
||||
"Destination must be a valid E.164 phone number (e.g., +1234567890) or SIP endpoint (e.g., PJSIP/1234)"
|
||||
)
|
||||
return v
|
||||
|
||||
|
|
|
|||
|
|
@ -6,6 +6,14 @@ from api.constants import APP_ROOT_DIR
|
|||
from api.db import db_client
|
||||
from api.enums import OrganizationConfigurationKey
|
||||
from api.services.pipecat.audio_config import AudioConfig
|
||||
from api.services.telephony.providers.ari_call_strategies import (
|
||||
ARIBridgeSwapStrategy,
|
||||
ARIHangupStrategy,
|
||||
)
|
||||
from api.services.telephony.providers.twilio_call_strategies import (
|
||||
TwilioConferenceStrategy,
|
||||
TwilioHangupStrategy,
|
||||
)
|
||||
from pipecat.audio.mixers.silence_mixer import SilenceAudioMixer
|
||||
from pipecat.audio.mixers.soundfile_mixer import SoundfileMixer
|
||||
from pipecat.serializers.asterisk import AsteriskFrameSerializer
|
||||
|
|
@ -54,12 +62,17 @@ async def create_twilio_transport(
|
|||
raise ValueError(
|
||||
f"Incomplete Twilio configuration for organization {organization_id}"
|
||||
)
|
||||
# Create strategy instances
|
||||
transfer_strategy = TwilioConferenceStrategy()
|
||||
hangup_strategy = TwilioHangupStrategy()
|
||||
|
||||
serializer = TwilioFrameSerializer(
|
||||
stream_sid=stream_sid,
|
||||
call_sid=call_sid,
|
||||
account_sid=account_sid,
|
||||
auth_token=auth_token,
|
||||
transfer_strategy=transfer_strategy,
|
||||
hangup_strategy=hangup_strategy,
|
||||
)
|
||||
|
||||
return FastAPIWebsocketTransport(
|
||||
|
|
@ -178,12 +191,17 @@ async def create_ari_transport(
|
|||
f"Incomplete ARI configuration for organization {organization_id}. "
|
||||
f"Required: ari_endpoint, app_name, app_password"
|
||||
)
|
||||
# Create strategy instances
|
||||
transfer_strategy = ARIBridgeSwapStrategy()
|
||||
hangup_strategy = ARIHangupStrategy()
|
||||
|
||||
serializer = AsteriskFrameSerializer(
|
||||
channel_id=channel_id,
|
||||
ari_endpoint=ari_endpoint,
|
||||
app_name=app_name,
|
||||
app_password=app_password,
|
||||
transfer_strategy=transfer_strategy,
|
||||
hangup_strategy=hangup_strategy,
|
||||
params=AsteriskFrameSerializer.InputParams(
|
||||
asterisk_sample_rate=audio_config.transport_in_sample_rate,
|
||||
sample_rate=audio_config.pipeline_sample_rate,
|
||||
|
|
|
|||
|
|
@ -14,6 +14,7 @@ setup_logging()
|
|||
import asyncio
|
||||
import json
|
||||
import signal
|
||||
import time
|
||||
from typing import Dict, Optional, Set
|
||||
from urllib.parse import urlparse
|
||||
|
||||
|
|
@ -26,6 +27,11 @@ from api.constants import REDIS_URL
|
|||
from api.db import db_client
|
||||
from api.enums import CallType, OrganizationConfigurationKey, WorkflowRunMode
|
||||
from api.services.quota_service import check_dograh_quota_by_user_id
|
||||
from api.services.telephony.call_transfer_manager import get_call_transfer_manager
|
||||
from api.services.telephony.transfer_event_protocol import (
|
||||
TransferEvent,
|
||||
TransferEventType,
|
||||
)
|
||||
|
||||
# Redis key pattern and TTL for channel-to-run mapping
|
||||
_CHANNEL_KEY_PREFIX = "ari:channel:"
|
||||
|
|
@ -62,6 +68,9 @@ class ARIConnection:
|
|||
# Redis client for channel-to-run reverse mapping (lazy init)
|
||||
self._redis_client: Optional[aioredis.Redis] = None
|
||||
|
||||
# Transfer manager for handling call transfers
|
||||
self._call_transfer_manager = None
|
||||
|
||||
async def _get_redis(self) -> aioredis.Redis:
|
||||
"""Get Redis client instance (lazy init)."""
|
||||
if not self._redis_client:
|
||||
|
|
@ -70,6 +79,12 @@ class ARIConnection:
|
|||
)
|
||||
return self._redis_client
|
||||
|
||||
async def _get_transfer_manager(self):
|
||||
"""Get transfer manager instance."""
|
||||
if not self._call_transfer_manager:
|
||||
self._call_transfer_manager = await get_call_transfer_manager()
|
||||
return self._call_transfer_manager
|
||||
|
||||
async def _set_channel_run(self, channel_id: str, workflow_run_id: str):
|
||||
"""Store channel_id -> workflow_run_id mapping in Redis."""
|
||||
r = await self._get_redis()
|
||||
|
|
@ -229,6 +244,11 @@ class ARIConnection:
|
|||
channel_id = channel.get("id", "unknown")
|
||||
channel_state = channel.get("state", "unknown")
|
||||
|
||||
# Log all events for each channel
|
||||
logger.debug(
|
||||
f"[ARI EVENT org={self.organization_id}] {event_type}: channel={channel_id}, state={channel_state}"
|
||||
)
|
||||
|
||||
if event_type == "StasisStart":
|
||||
# Skip external media channels we created — they fire
|
||||
# their own StasisStart but need no further handling.
|
||||
|
|
@ -255,6 +275,19 @@ class ARIConnection:
|
|||
)
|
||||
else:
|
||||
# Outbound call (state == "Up") — originated by us
|
||||
# Check if this is a transfer destination channel (app_args starts with "transfer")
|
||||
# Transfer destinations run externally - we only track status to publish transfer event, not run the pipeline
|
||||
transfer_id = self._get_transfer_id(app_args)
|
||||
if transfer_id:
|
||||
logger.info(
|
||||
f"[ARI org={self.organization_id}] Transfer destination answered: "
|
||||
f"channel={channel_id}, transfer_id={transfer_id}"
|
||||
)
|
||||
asyncio.create_task(
|
||||
self._handle_destination_answered(transfer_id, channel_id)
|
||||
)
|
||||
return
|
||||
|
||||
# Parse args to extract workflow context
|
||||
args_dict = {}
|
||||
for arg in app_args:
|
||||
|
|
@ -298,13 +331,26 @@ class ARIConnection:
|
|||
)
|
||||
|
||||
elif event_type == "ChannelDestroyed":
|
||||
cause = channel.get("cause", 0)
|
||||
cause_txt = channel.get("cause_txt", "unknown")
|
||||
cause = event.get("cause", 0)
|
||||
cause_txt = event.get("cause_txt", "unknown")
|
||||
tech_cause = event.get("tech_cause", "unknown")
|
||||
logger.info(
|
||||
f"[ARI org={self.organization_id}] ChannelDestroyed: "
|
||||
f"channel={channel_id}, cause={cause} ({cause_txt})"
|
||||
f"channel={channel_id}, cause={cause} ({cause_txt}), tech_cause = {tech_cause}"
|
||||
)
|
||||
|
||||
# Check if this is a transfer destination that failed
|
||||
transfer_id = await self._get_transfer_id_for_channel(channel_id)
|
||||
if transfer_id:
|
||||
failure_message = self._map_hangup_cause_to_message(
|
||||
cause, tech_cause, cause_txt
|
||||
)
|
||||
asyncio.create_task(
|
||||
self._handle_transfer_failed(
|
||||
transfer_id, channel_id, failure_message
|
||||
)
|
||||
)
|
||||
|
||||
elif event_type == "ChannelDtmfReceived":
|
||||
digit = event.get("digit", "")
|
||||
logger.debug(
|
||||
|
|
@ -580,7 +626,38 @@ class ARIConnection:
|
|||
call_id = ctx.get("call_id")
|
||||
ext_channel_id = ctx.get("ext_channel_id")
|
||||
bridge_id = ctx.get("bridge_id")
|
||||
transfer_state = ctx.get("transfer_state")
|
||||
|
||||
# Check if this is a call transfer scenario external channel. Skip full teardown if
|
||||
# transfer is in progress and this is the external media channel
|
||||
# During call transfer, we preserve the caller-destination bridge
|
||||
if (
|
||||
transfer_state == "in-progress"
|
||||
and channel_id == ext_channel_id
|
||||
and ext_channel_id is not None
|
||||
):
|
||||
logger.info(
|
||||
f"[ARI org={self.organization_id}] Transfer in progress - skipping full teardown "
|
||||
f"for external channel {channel_id}, preserving bridge {bridge_id} and caller {call_id}"
|
||||
)
|
||||
|
||||
# Update transfer state to complete
|
||||
ctx["transfer_state"] = "complete"
|
||||
await db_client.update_workflow_run(
|
||||
run_id=int(workflow_run_id), gathered_context=ctx
|
||||
)
|
||||
|
||||
# Clean up only Redis markers for external channel
|
||||
await self._delete_channel_run(channel_id)
|
||||
await self._delete_ext_channel(channel_id)
|
||||
|
||||
logger.info(
|
||||
f"[ARI org={self.organization_id}] Transfer cleanup complete - preserved caller {call_id} "
|
||||
f"in bridge {bridge_id}"
|
||||
)
|
||||
return
|
||||
|
||||
# Normal full teardown for non-transfer scenarios (transfer_state is None or not in-progress)
|
||||
# Delete the bridge first (removes channels from it)
|
||||
if bridge_id:
|
||||
await self._delete_bridge(bridge_id)
|
||||
|
|
@ -633,6 +710,124 @@ class ARIConnection:
|
|||
f"{response.status} {text}"
|
||||
)
|
||||
|
||||
# ======== CALL TRANSFER HELPER METHODS ========
|
||||
|
||||
def _map_hangup_cause_to_message(
|
||||
self, cause: int, tech_cause: str, cause_txt: str
|
||||
) -> str:
|
||||
"""Map Asterisk cause codes to user-friendly transfer failure messages."""
|
||||
if cause == 17 and tech_cause == "486": # User busy/declined
|
||||
return "The person declined the call or their line is busy."
|
||||
elif cause == 19 and tech_cause == "480": # No answer
|
||||
return "The transfer call was not answered. The person may be busy or unavailable right now."
|
||||
elif cause == 21: # Call rejected
|
||||
return "The transfer call failed to connect. There may be a network issue or the number is unavailable."
|
||||
else:
|
||||
return f"Transfer failed: {cause_txt}"
|
||||
|
||||
def _get_transfer_id(self, app_args: list) -> Optional[str]:
|
||||
"""Get transfer_id if this is a transfer channel, None otherwise.
|
||||
|
||||
Args format: ['transfer', '{transfer_id}', '{conf_name}']
|
||||
"""
|
||||
if len(app_args) > 1 and app_args[0] == "transfer":
|
||||
transfer_id = app_args[1]
|
||||
logger.debug(
|
||||
f"[ARI org={self.organization_id}] Detected transfer channel with transfer_id: {transfer_id}"
|
||||
)
|
||||
return transfer_id
|
||||
return None
|
||||
|
||||
async def _get_transfer_id_for_channel(self, channel_id: str) -> Optional[str]:
|
||||
"""Get transfer_id for a channel by checking Redis mapping."""
|
||||
try:
|
||||
r = await self._get_redis()
|
||||
transfer_id = await r.get(f"ari:transfer_channel:{channel_id}")
|
||||
logger.debug(
|
||||
f"[ARI Transfer] Looking up transfer_id for channel {channel_id}: {transfer_id}"
|
||||
)
|
||||
return transfer_id
|
||||
except Exception as e:
|
||||
logger.error(
|
||||
f"[ARI org={self.organization_id}] Error getting transfer ID for channel {channel_id}: {e}"
|
||||
)
|
||||
return None
|
||||
|
||||
async def _handle_destination_answered(
|
||||
self, transfer_id: str, destination_channel_id: str
|
||||
):
|
||||
"""Handle transfer destination channel answered - publish success event."""
|
||||
try:
|
||||
logger.info(
|
||||
f"[ARI Transfer org={self.organization_id}] Destination {destination_channel_id} "
|
||||
f"answered for transfer {transfer_id}"
|
||||
)
|
||||
|
||||
# Store channel mapping for potential future events and get transfer context
|
||||
transfer_manager = await self._get_transfer_manager()
|
||||
await transfer_manager.store_transfer_channel_mapping(
|
||||
destination_channel_id, transfer_id
|
||||
)
|
||||
context = await transfer_manager.get_transfer_context(transfer_id)
|
||||
if not context:
|
||||
logger.error(
|
||||
f"[ARI Transfer org={self.organization_id}] No transfer context found for {transfer_id}"
|
||||
)
|
||||
return
|
||||
|
||||
logger.info(
|
||||
f"[ARI Transfer org={self.organization_id}] Transfer {transfer_id} success: "
|
||||
f"caller={context.original_call_sid} -> destination={destination_channel_id}"
|
||||
)
|
||||
|
||||
# Publish destination answered event - this will trigger the bridge swap in serializer
|
||||
success_event = TransferEvent(
|
||||
type=TransferEventType.DESTINATION_ANSWERED,
|
||||
transfer_id=transfer_id,
|
||||
original_call_sid=context.original_call_sid,
|
||||
transfer_call_sid=destination_channel_id,
|
||||
conference_name=context.conference_name,
|
||||
message="Transfer destination answered",
|
||||
status="success",
|
||||
action="destination_answered",
|
||||
)
|
||||
await transfer_manager.publish_transfer_event(success_event)
|
||||
|
||||
except Exception as e:
|
||||
logger.error(
|
||||
f"[ARI Transfer org={self.organization_id}] Error handling transfer answer: {e}"
|
||||
)
|
||||
# On error, publish failure event
|
||||
await self._handle_transfer_failed(
|
||||
transfer_id, destination_channel_id, f"Transfer processing error: {e}"
|
||||
)
|
||||
|
||||
async def _handle_transfer_failed(
|
||||
self, transfer_id: str, channel_id: str, reason: str
|
||||
):
|
||||
"""Handle transfer failure - publish failure event."""
|
||||
try:
|
||||
logger.info(f"[ARI Transfer] Transfer {transfer_id} failed: {reason}")
|
||||
|
||||
transfer_manager = await self._get_transfer_manager()
|
||||
context = await transfer_manager.get_transfer_context(transfer_id)
|
||||
|
||||
# Publish failure event
|
||||
failure_event = TransferEvent(
|
||||
type=TransferEventType.TRANSFER_FAILED,
|
||||
transfer_id=transfer_id,
|
||||
original_call_sid=context.original_call_sid if context else "",
|
||||
transfer_call_sid=channel_id,
|
||||
message=f"Transfer failed: {reason}",
|
||||
status="failed",
|
||||
action="transfer_failed",
|
||||
reason=reason,
|
||||
)
|
||||
await transfer_manager.publish_transfer_event(failure_event)
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"[ARI Transfer] Error handling transfer failure: {e}")
|
||||
|
||||
async def _delete_channel(self, channel_id: str):
|
||||
"""Delete (hang up) an ARI channel. Ignores 404 (already gone)."""
|
||||
|
||||
|
|
|
|||
|
|
@ -85,6 +85,28 @@ class CallTransferManager:
|
|||
except Exception as e:
|
||||
logger.error(f"Failed to remove transfer context: {e}")
|
||||
|
||||
async def store_transfer_channel_mapping(
|
||||
self, channel_id: str, transfer_id: str
|
||||
) -> None:
|
||||
"""Store channel->transfer mapping in Redis for event correlation.
|
||||
|
||||
Args:
|
||||
channel_id: ARI channel ID
|
||||
transfer_id: Transfer identifier
|
||||
"""
|
||||
try:
|
||||
redis = await self._get_redis()
|
||||
await redis.setex(
|
||||
f"ari:transfer_channel:{channel_id}", 300, transfer_id
|
||||
) # 5 minute TTL
|
||||
logger.debug(
|
||||
f"[Transfer Manager] Stored channel mapping: channel={channel_id}, transfer_id={transfer_id}"
|
||||
)
|
||||
except Exception as e:
|
||||
logger.error(
|
||||
f"[Transfer Manager] Error storing transfer channel mapping: {e}"
|
||||
)
|
||||
|
||||
async def publish_transfer_event(self, event: TransferEvent) -> None:
|
||||
"""Publish transfer event to Redis channel.
|
||||
|
||||
|
|
@ -136,16 +158,10 @@ class CallTransferManager:
|
|||
)
|
||||
|
||||
# Check if this is a completion event
|
||||
if (
|
||||
event.type
|
||||
in [
|
||||
TransferEventType.TRANSFER_ANSWERED, # Call answered = transfer successful
|
||||
TransferEventType.TRANSFER_COMPLETED,
|
||||
TransferEventType.TRANSFER_FAILED,
|
||||
TransferEventType.TRANSFER_CANCELLED,
|
||||
TransferEventType.TRANSFER_TIMEOUT,
|
||||
]
|
||||
):
|
||||
if event.type in [
|
||||
TransferEventType.DESTINATION_ANSWERED,
|
||||
TransferEventType.TRANSFER_FAILED,
|
||||
]:
|
||||
return event
|
||||
except Exception as e:
|
||||
logger.error(f"Failed to parse transfer event: {e}")
|
||||
|
|
@ -169,6 +185,31 @@ class CallTransferManager:
|
|||
except Exception as e:
|
||||
logger.error(f"Error closing pubsub connection: {e}")
|
||||
|
||||
async def find_transfer_context_for_call(self, caller_channel_id: str):
|
||||
"""Find the active transfer context for this caller channel."""
|
||||
|
||||
redis = await self._get_redis()
|
||||
|
||||
try:
|
||||
# Search Redis for transfer contexts where original_call_sid matches this caller
|
||||
transfer_keys = await redis.keys("transfer:context:*")
|
||||
|
||||
for key in transfer_keys:
|
||||
try:
|
||||
context_data = await redis.get(key)
|
||||
if context_data:
|
||||
context = TransferContext.from_json(context_data)
|
||||
if context.original_call_sid == caller_channel_id:
|
||||
return context
|
||||
except Exception:
|
||||
continue
|
||||
|
||||
return None
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"[ARI Transfer] Error finding transfer context: {e}")
|
||||
return None
|
||||
|
||||
async def cleanup(self):
|
||||
"""Clean up Redis connections."""
|
||||
try:
|
||||
|
|
|
|||
226
api/services/telephony/providers/ari_call_strategies.py
Normal file
226
api/services/telephony/providers/ari_call_strategies.py
Normal file
|
|
@ -0,0 +1,226 @@
|
|||
"""ARI-specific call operation strategies.
|
||||
|
||||
This module contains the business logic for Asterisk ARI call operations.
|
||||
"""
|
||||
|
||||
from typing import Any, Dict
|
||||
|
||||
from loguru import logger
|
||||
|
||||
from pipecat.serializers.call_strategies import HangupStrategy, TransferStrategy
|
||||
|
||||
|
||||
class ARIBridgeSwapStrategy(TransferStrategy):
|
||||
"""Implements bridge swap transfer for Asterisk ARI.
|
||||
|
||||
This strategy handles transferring calls by swapping channels in existing
|
||||
bridges, managing transfer contexts, and publishing
|
||||
transfer completion events.
|
||||
"""
|
||||
|
||||
async def execute_transfer(self, context: Dict[str, Any]) -> bool:
|
||||
"""Execute bridge swap transfer for Asterisk ARI."""
|
||||
try:
|
||||
import aiohttp
|
||||
import redis.asyncio as aioredis
|
||||
from aiohttp import BasicAuth
|
||||
|
||||
channel_id = context["channel_id"]
|
||||
ari_endpoint = context["ari_endpoint"]
|
||||
app_name = context["app_name"]
|
||||
app_password = context["app_password"]
|
||||
|
||||
if not channel_id or not ari_endpoint:
|
||||
logger.warning(
|
||||
"Cannot execute transfer: missing channel_id or ari_endpoint"
|
||||
)
|
||||
return False
|
||||
|
||||
logger.info(
|
||||
f"[ARI Transfer] Executing bridge swap for channel {channel_id}"
|
||||
)
|
||||
|
||||
from api.constants import REDIS_URL
|
||||
from api.db import db_client
|
||||
from api.services.telephony.call_transfer_manager import (
|
||||
get_call_transfer_manager,
|
||||
)
|
||||
auth = BasicAuth(app_name, app_password)
|
||||
|
||||
# Get call transfer manager instance
|
||||
call_transfer_manager = await get_call_transfer_manager()
|
||||
|
||||
# 1. Find active transfer context for this caller channel
|
||||
transfer_context = await call_transfer_manager.find_transfer_context_for_call(channel_id)
|
||||
if not transfer_context:
|
||||
logger.error(
|
||||
f"[ARI Transfer] No active transfer context found for caller {channel_id}"
|
||||
)
|
||||
return False
|
||||
|
||||
logger.info(
|
||||
f"[ARI Transfer] Found transfer context: {transfer_context.transfer_id}, "
|
||||
f"destination: {transfer_context.call_sid}"
|
||||
)
|
||||
|
||||
# 2. Get workflow run to find current bridge and external media channel
|
||||
redis = aioredis.from_url(REDIS_URL, decode_responses=True)
|
||||
workflow_run_id = await redis.get(f"ari:channel:{channel_id}")
|
||||
if not workflow_run_id:
|
||||
logger.error(
|
||||
f"[ARI Transfer] No workflow run found for caller {channel_id}"
|
||||
)
|
||||
return False
|
||||
|
||||
workflow_run = await db_client.get_workflow_run_by_id(int(workflow_run_id))
|
||||
if not workflow_run or not workflow_run.gathered_context:
|
||||
logger.error(
|
||||
f"[ARI Transfer] No workflow context found for run {workflow_run_id}"
|
||||
)
|
||||
return False
|
||||
|
||||
ctx = workflow_run.gathered_context
|
||||
bridge_id = ctx.get("bridge_id")
|
||||
ext_channel_id = ctx.get("ext_channel_id")
|
||||
|
||||
if not bridge_id or not ext_channel_id:
|
||||
logger.error(
|
||||
f"[ARI Transfer] Missing bridge/external channel info: {ctx}"
|
||||
)
|
||||
return False
|
||||
|
||||
destination_channel_id = transfer_context.call_sid
|
||||
if not destination_channel_id:
|
||||
logger.error(
|
||||
f"[ARI Transfer] No destination channel in transfer context"
|
||||
)
|
||||
return False
|
||||
|
||||
logger.info(
|
||||
f"[ARI Transfer] Bridge swap: bridge={bridge_id}, caller={channel_id}, "
|
||||
f"destination={destination_channel_id}, ext_media={ext_channel_id}"
|
||||
)
|
||||
|
||||
# 3. Set transfer state to prevent StasisEnd auto-teardown
|
||||
workflow_run.gathered_context["transfer_state"] = "in-progress"
|
||||
await db_client.update_workflow_run(
|
||||
run_id=int(workflow_run_id),
|
||||
gathered_context=workflow_run.gathered_context,
|
||||
)
|
||||
logger.debug(
|
||||
f"[ARI Transfer] Set transfer_state=in-progress for workflow {workflow_run_id}"
|
||||
)
|
||||
|
||||
# 4. Execute bridge swap operations via ARI REST API
|
||||
async with aiohttp.ClientSession() as session:
|
||||
# Add destination channel to existing bridge
|
||||
add_url = f"{ari_endpoint}/ari/bridges/{bridge_id}/addChannel"
|
||||
async with session.post(
|
||||
add_url, auth=auth, params={"channel": destination_channel_id}
|
||||
) as response:
|
||||
if response.status in (200, 204):
|
||||
logger.info(
|
||||
f"[ARI Transfer] Added destination {destination_channel_id} to bridge {bridge_id}"
|
||||
)
|
||||
else:
|
||||
error_text = await response.text()
|
||||
logger.error(
|
||||
f"[ARI Transfer] Failed to add destination to bridge: {response.status} {error_text}"
|
||||
)
|
||||
return False
|
||||
|
||||
# Remove external media channel from bridge
|
||||
remove_url = f"{ari_endpoint}/ari/bridges/{bridge_id}/removeChannel"
|
||||
async with session.post(
|
||||
remove_url, auth=auth, params={"channel": ext_channel_id}
|
||||
) as response:
|
||||
if response.status in (200, 204):
|
||||
logger.info(
|
||||
f"[ARI Transfer] Removed external media {ext_channel_id} from bridge {bridge_id}"
|
||||
)
|
||||
else:
|
||||
error_text = await response.text()
|
||||
logger.error(
|
||||
f"[ARI Transfer] Failed to remove external media from bridge: {response.status} {error_text}"
|
||||
)
|
||||
|
||||
# Hang up the external media channel
|
||||
hangup_url = f"{ari_endpoint}/ari/channels/{ext_channel_id}"
|
||||
async with session.delete(hangup_url, auth=auth) as response:
|
||||
if response.status in (200, 204):
|
||||
logger.info(
|
||||
f"[ARI Transfer] Hung up external media channel {ext_channel_id}"
|
||||
)
|
||||
elif response.status == 404:
|
||||
logger.debug(
|
||||
f"[ARI Transfer] External media channel {ext_channel_id} already gone"
|
||||
)
|
||||
else:
|
||||
error_text = await response.text()
|
||||
logger.warning(
|
||||
f"[ARI Transfer] Failed to hang up external media: {response.status} {error_text}"
|
||||
)
|
||||
|
||||
logger.info(
|
||||
f"[ARI Transfer] Bridge swap completed successfully for transfer {transfer_context.transfer_id}, "
|
||||
f"caller {channel_id} connected to destination {destination_channel_id} via bridge {bridge_id}"
|
||||
)
|
||||
|
||||
# 5. Clean up transfer context after successful completion
|
||||
|
||||
call_transfer_manager = await get_call_transfer_manager()
|
||||
await call_transfer_manager.remove_transfer_context(
|
||||
transfer_context.transfer_id
|
||||
)
|
||||
return True
|
||||
|
||||
except Exception as e:
|
||||
logger.exception(f"Failed to execute ARI transfer: {e}")
|
||||
return False
|
||||
|
||||
class ARIHangupStrategy(HangupStrategy):
|
||||
"""Implements hangup for Asterisk ARI channels."""
|
||||
|
||||
async def execute_hangup(self, context: Dict[str, Any]) -> bool:
|
||||
"""Hang up the Asterisk channel via ARI REST API."""
|
||||
try:
|
||||
import aiohttp
|
||||
from aiohttp import BasicAuth
|
||||
|
||||
channel_id = context["channel_id"]
|
||||
ari_endpoint = context["ari_endpoint"]
|
||||
app_name = context["app_name"]
|
||||
app_password = context["app_password"]
|
||||
|
||||
if not channel_id or not ari_endpoint:
|
||||
logger.warning(
|
||||
"Cannot hang up Asterisk channel: missing channel_id or ari_endpoint"
|
||||
)
|
||||
return False
|
||||
|
||||
endpoint = f"{ari_endpoint}/ari/channels/{channel_id}"
|
||||
auth = BasicAuth(app_name, app_password)
|
||||
|
||||
async with aiohttp.ClientSession() as session:
|
||||
async with session.delete(endpoint, auth=auth) as response:
|
||||
if response.status in (200, 204):
|
||||
logger.info(
|
||||
f"Successfully terminated Asterisk channel {channel_id}"
|
||||
)
|
||||
return True
|
||||
elif response.status == 404:
|
||||
logger.debug(
|
||||
f"Asterisk channel {channel_id} was already terminated"
|
||||
)
|
||||
return True
|
||||
else:
|
||||
error_text = await response.text()
|
||||
logger.error(
|
||||
f"Failed to terminate Asterisk channel {channel_id}: "
|
||||
f"Status {response.status}, Response: {error_text}"
|
||||
)
|
||||
return False
|
||||
|
||||
except Exception as e:
|
||||
logger.exception(f"Failed to hang up Asterisk channel: {e}")
|
||||
return False
|
||||
|
|
@ -349,8 +349,8 @@ class ARIProvider(TelephonyProvider):
|
|||
# ======== CALL TRANSFER METHODS ========
|
||||
|
||||
def supports_transfers(self) -> bool:
|
||||
"""ARI does not currently support call transfers."""
|
||||
return False
|
||||
"""ARI supports call transfers via bridge manipulation."""
|
||||
return True
|
||||
|
||||
async def transfer_call(
|
||||
self,
|
||||
|
|
@ -360,8 +360,104 @@ class ARIProvider(TelephonyProvider):
|
|||
timeout: int = 30,
|
||||
**kwargs: Any,
|
||||
) -> Dict[str, Any]:
|
||||
"""ARI call transfers are not yet implemented."""
|
||||
raise NotImplementedError("ARI provider does not support call transfers")
|
||||
"""Initiate ARI call transfer by creating an outbound channel to the destination.
|
||||
|
||||
This method creates the destination channel and returns immediately. The transfer
|
||||
process completes asynchronously - success/failure is determined by ARI events
|
||||
and communicated through the transfer event system.
|
||||
|
||||
Args:
|
||||
destination: Destination phone number (SIP endpoint)
|
||||
transfer_id: Unique identifier for this transfer attempt
|
||||
conference_name: Conference name (unused in ARI, kept for interface compatibility)
|
||||
timeout: Transfer timeout in seconds
|
||||
**kwargs: Additional arguments
|
||||
|
||||
Returns:
|
||||
Dict containing:
|
||||
- call_sid: Destination channel ID
|
||||
- status: "initiated"
|
||||
- provider: "ari"
|
||||
- raw_response: Full ARI channel creation response
|
||||
"""
|
||||
if not self.validate_config():
|
||||
raise ValueError("ARI provider not properly configured")
|
||||
|
||||
logger.info(
|
||||
f"[ARI Transfer] Initiating transfer {transfer_id} to {destination} "
|
||||
f"(timeout: {timeout}s)"
|
||||
)
|
||||
|
||||
from api.services.telephony.call_transfer_manager import (
|
||||
get_call_transfer_manager,
|
||||
)
|
||||
|
||||
# Get call transfer manager for event correlation mapping
|
||||
call_transfer_manager = await get_call_transfer_manager()
|
||||
|
||||
# Build SIP endpoint
|
||||
if destination.startswith("SIP/") or destination.startswith("PJSIP/"):
|
||||
sip_endpoint = destination
|
||||
else:
|
||||
sip_endpoint = f"PJSIP/{destination}"
|
||||
|
||||
# Build transfer appArgs for event correlation
|
||||
app_args = f"transfer,{transfer_id}"
|
||||
|
||||
try:
|
||||
endpoint = f"{self.base_url}/channels"
|
||||
params = {
|
||||
"endpoint": sip_endpoint,
|
||||
"app": self.app_name,
|
||||
"appArgs": app_args,
|
||||
"timeout": timeout, # Keep timeout for transfer calls
|
||||
}
|
||||
|
||||
async with aiohttp.ClientSession() as session:
|
||||
async with session.post(
|
||||
endpoint,
|
||||
params=params,
|
||||
auth=self._get_auth(),
|
||||
) as response:
|
||||
response_text = await response.text()
|
||||
|
||||
if response.status != 200:
|
||||
error_msg = f"ARI channel creation failed: {response.status} {response_text}"
|
||||
logger.error(f"[ARI Transfer] {error_msg}")
|
||||
await call_transfer_manager.remove_transfer_context(transfer_id)
|
||||
raise Exception(error_msg)
|
||||
|
||||
result = json.loads(response_text)
|
||||
|
||||
destination_channel_id = result.get("id", "")
|
||||
if not destination_channel_id:
|
||||
logger.error(
|
||||
f"[ARI Transfer] Failed to get channel ID from response: {result}"
|
||||
)
|
||||
await call_transfer_manager.remove_transfer_context(transfer_id)
|
||||
raise Exception("Failed to create destination channel")
|
||||
|
||||
# Store transfer channel mapping for event correlation
|
||||
await call_transfer_manager.store_transfer_channel_mapping(
|
||||
destination_channel_id, transfer_id
|
||||
)
|
||||
|
||||
logger.info(
|
||||
f"[ARI Transfer] Originated destination channel {destination_channel_id} "
|
||||
f"for transfer {transfer_id}"
|
||||
)
|
||||
|
||||
return {
|
||||
"call_sid": destination_channel_id,
|
||||
"status": "initiated",
|
||||
"provider": self.PROVIDER_NAME,
|
||||
"raw_response": result,
|
||||
}
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"[ARI Transfer] Failed to originate call transfer destination channel: {e}")
|
||||
await call_transfer_manager.remove_transfer_context(transfer_id)
|
||||
raise
|
||||
|
||||
# ======== ARI-SPECIFIC METHODS ========
|
||||
|
||||
|
|
|
|||
186
api/services/telephony/providers/twilio_call_strategies.py
Normal file
186
api/services/telephony/providers/twilio_call_strategies.py
Normal file
|
|
@ -0,0 +1,186 @@
|
|||
"""Twilio-specific call operation strategies.
|
||||
|
||||
This module contains the business logic for Twilio call operations,
|
||||
maintaining proper separation of concerns between protocol handling and business logic.
|
||||
"""
|
||||
|
||||
from typing import Any, Dict
|
||||
|
||||
import aiohttp
|
||||
from loguru import logger
|
||||
|
||||
from pipecat.serializers.call_strategies import HangupStrategy, TransferStrategy
|
||||
|
||||
|
||||
class TwilioConferenceStrategy(TransferStrategy):
|
||||
"""Implements conference-based call transfer for Twilio.
|
||||
|
||||
This strategy transfers calls by placing them into a Twilio conference,
|
||||
with cleanup of transfer contexts upon successful completion.
|
||||
"""
|
||||
|
||||
async def execute_transfer(self, context: Dict[str, Any]) -> bool:
|
||||
"""Execute conference transfer for Twilio call."""
|
||||
try:
|
||||
account_sid = context["account_sid"]
|
||||
auth_token = context["auth_token"]
|
||||
call_sid = context["call_sid"]
|
||||
region = context.get("region")
|
||||
edge = context.get("edge")
|
||||
|
||||
# 1. Find active transfer context for this call
|
||||
transfer_context = await self._find_transfer_context_for_call(call_sid)
|
||||
if not transfer_context:
|
||||
logger.error(
|
||||
f"[Twilio Transfer] No active transfer context found for call {call_sid}"
|
||||
)
|
||||
return False
|
||||
|
||||
logger.info(
|
||||
f"[Twilio Transfer] Found transfer context: {transfer_context.transfer_id}, "
|
||||
f"original: {transfer_context.original_call_sid}"
|
||||
)
|
||||
|
||||
region_prefix = f"{region}." if region else ""
|
||||
edge_prefix = f"{edge}." if edge else ""
|
||||
|
||||
# Twilio API endpoint for updating calls
|
||||
endpoint = f"https://api.{edge_prefix}{region_prefix}twilio.com/2010-04-01/Accounts/{account_sid}/Calls/{call_sid}.json"
|
||||
|
||||
# Create basic auth from account_sid and auth_token
|
||||
auth = aiohttp.BasicAuth(account_sid, auth_token)
|
||||
|
||||
conference_name = transfer_context.conference_name
|
||||
twiml = f"""<?xml version="1.0" encoding="UTF-8"?>
|
||||
<Response>
|
||||
<Dial>
|
||||
<Conference endConferenceOnExit="true">{conference_name}</Conference>
|
||||
</Dial>
|
||||
</Response>"""
|
||||
|
||||
logger.debug(
|
||||
f"[Twilio Transfer] Transferring call to conference: {conference_name}"
|
||||
)
|
||||
|
||||
# 2. Make the POST request to transfer the call
|
||||
async with aiohttp.ClientSession() as session:
|
||||
async with session.post(
|
||||
endpoint, auth=auth, data={"Twiml": twiml}
|
||||
) as response:
|
||||
response_text = await response.text()
|
||||
|
||||
if response.status == 200:
|
||||
logger.info(
|
||||
f"[Twilio Transfer] Conference transfer completed successfully for call {call_sid}, "
|
||||
f"joined conference {conference_name}"
|
||||
)
|
||||
|
||||
# 3. Clean up transfer context after successful transfer
|
||||
await self._cleanup_transfer_context(transfer_context.transfer_id)
|
||||
return True
|
||||
elif response.status == 404:
|
||||
logger.error(
|
||||
f"Failed to transfer Twilio call {call_sid}: Call not found (404)"
|
||||
)
|
||||
await self._cleanup_transfer_context(transfer_context.transfer_id)
|
||||
return False
|
||||
else:
|
||||
logger.error(
|
||||
f"Failed to transfer Twilio call {call_sid} to conference {conference_name}: "
|
||||
f"Status {response.status}, Response: {response_text}"
|
||||
)
|
||||
await self._cleanup_transfer_context(transfer_context.transfer_id)
|
||||
return False
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"Failed to transfer Twilio call: {e}")
|
||||
if transfer_context:
|
||||
await self._cleanup_transfer_context(transfer_context.transfer_id)
|
||||
return False
|
||||
|
||||
async def _find_transfer_context_for_call(self, call_sid: str):
|
||||
"""Find the active transfer context for this call."""
|
||||
try:
|
||||
import redis.asyncio as aioredis
|
||||
|
||||
from api.constants import REDIS_URL
|
||||
from api.services.telephony.transfer_event_protocol import TransferContext
|
||||
|
||||
# Search Redis for transfer contexts where original_call_sid matches
|
||||
redis = aioredis.from_url(REDIS_URL, decode_responses=True)
|
||||
transfer_keys = await redis.keys("transfer:context:*")
|
||||
|
||||
for key in transfer_keys:
|
||||
try:
|
||||
context_data = await redis.get(key)
|
||||
if context_data:
|
||||
context = TransferContext.from_json(context_data)
|
||||
if context.original_call_sid == call_sid:
|
||||
return context
|
||||
except Exception:
|
||||
continue
|
||||
|
||||
return None
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"[Twilio Transfer] Error finding transfer context: {e}")
|
||||
return None
|
||||
|
||||
async def _cleanup_transfer_context(self, transfer_id: str):
|
||||
"""Clean up transfer context after completion or failure."""
|
||||
try:
|
||||
from api.services.telephony.call_transfer_manager import (
|
||||
get_call_transfer_manager,
|
||||
)
|
||||
|
||||
call_transfer_manager = await get_call_transfer_manager()
|
||||
await call_transfer_manager.remove_transfer_context(transfer_id)
|
||||
except Exception as e:
|
||||
logger.error(f"[Twilio Transfer] Error cleaning up transfer context: {e}")
|
||||
|
||||
|
||||
class TwilioHangupStrategy(HangupStrategy):
|
||||
"""Implements hangup for Twilio calls."""
|
||||
|
||||
async def execute_hangup(self, context: Dict[str, Any]) -> bool:
|
||||
"""Hang up the Twilio call via REST API."""
|
||||
try:
|
||||
account_sid = context["account_sid"]
|
||||
auth_token = context["auth_token"]
|
||||
call_sid = context["call_sid"]
|
||||
region = context.get("region")
|
||||
edge = context.get("edge")
|
||||
|
||||
if not account_sid or not auth_token or not call_sid:
|
||||
logger.warning(
|
||||
"Cannot hang up Twilio call: missing required credentials or call_sid"
|
||||
)
|
||||
return False
|
||||
|
||||
region_prefix = f"{region}." if region else ""
|
||||
edge_prefix = f"{edge}." if edge else ""
|
||||
|
||||
endpoint = f"https://api.{edge_prefix}{region_prefix}twilio.com/2010-04-01/Accounts/{account_sid}/Calls/{call_sid}.json"
|
||||
auth = aiohttp.BasicAuth(account_sid, auth_token)
|
||||
|
||||
async with aiohttp.ClientSession() as session:
|
||||
async with session.post(
|
||||
endpoint, auth=auth, data={"Status": "completed"}
|
||||
) as response:
|
||||
if response.status == 200:
|
||||
logger.info(f"Successfully terminated Twilio call {call_sid}")
|
||||
return True
|
||||
elif response.status == 404:
|
||||
logger.debug(f"Twilio call {call_sid} was already terminated")
|
||||
return True
|
||||
else:
|
||||
response_text = await response.text()
|
||||
logger.error(
|
||||
f"Failed to terminate Twilio call {call_sid}: "
|
||||
f"Status {response.status}, Response: {response_text}"
|
||||
)
|
||||
return False
|
||||
|
||||
except Exception as e:
|
||||
logger.exception(f"Failed to hang up Twilio call: {e}")
|
||||
return False
|
||||
|
|
@ -13,12 +13,8 @@ from typing import Any, Dict, Optional
|
|||
class TransferEventType(str, Enum):
|
||||
"""Types of transfer events sent between instances."""
|
||||
|
||||
TRANSFER_INITIATED = "transfer_initiated"
|
||||
TRANSFER_ANSWERED = "transfer_answered"
|
||||
TRANSFER_COMPLETED = "transfer_completed"
|
||||
DESTINATION_ANSWERED = "destination_answered"
|
||||
TRANSFER_FAILED = "transfer_failed"
|
||||
TRANSFER_CANCELLED = "transfer_cancelled"
|
||||
TRANSFER_TIMEOUT = "transfer_timeout"
|
||||
|
||||
|
||||
@dataclass
|
||||
|
|
@ -56,7 +52,6 @@ class TransferEvent:
|
|||
"conference_id": self.conference_name,
|
||||
"transfer_call_sid": self.transfer_call_sid,
|
||||
"original_call_sid": self.original_call_sid,
|
||||
"end_call": self.end_call,
|
||||
"reason": self.reason,
|
||||
}
|
||||
return result
|
||||
|
|
|
|||
|
|
@ -318,7 +318,6 @@ class CustomToolManager:
|
|||
"message": "I'm sorry, but call transfers are not available for web calls. Please try a telephony call.",
|
||||
"action": "transfer_failed",
|
||||
"reason": "webrtc_not_supported",
|
||||
"end_call": True,
|
||||
}
|
||||
await self._handle_transfer_result(
|
||||
webrtc_error_result, function_call_params, properties
|
||||
|
|
@ -332,27 +331,46 @@ class CustomToolManager:
|
|||
"message": "I'm sorry, but I don't have a phone number configured for the transfer. Please contact support to set up call transfer.",
|
||||
"action": "transfer_failed",
|
||||
"reason": "no_destination",
|
||||
"end_call": True,
|
||||
}
|
||||
await self._handle_transfer_result(
|
||||
validation_error_result, function_call_params, properties
|
||||
)
|
||||
return
|
||||
|
||||
# Validate E.164 format
|
||||
E164_PHONE_REGEX = r"^\+[1-9]\d{1,14}$"
|
||||
if not re.match(E164_PHONE_REGEX, destination):
|
||||
validation_error_result = {
|
||||
"status": "failed",
|
||||
"message": "I'm sorry, but the transfer phone number appears to be invalid. Please contact support to verify the transfer settings.",
|
||||
"action": "transfer_failed",
|
||||
"reason": "invalid_destination",
|
||||
"end_call": True,
|
||||
}
|
||||
await self._handle_transfer_result(
|
||||
validation_error_result, function_call_params, properties
|
||||
)
|
||||
return
|
||||
# Validate destination format based on workflow run mode
|
||||
if workflow_run.mode == WorkflowRunMode.ARI.value:
|
||||
# For ARI provider, also accept SIP endpoints
|
||||
SIP_ENDPOINT_REGEX = r"^(PJSIP|SIP)\/[\w\-\.@]+$"
|
||||
E164_PHONE_REGEX = r"^\+[1-9]\d{1,14}$"
|
||||
|
||||
is_valid_sip = re.match(SIP_ENDPOINT_REGEX, destination)
|
||||
is_valid_e164 = re.match(E164_PHONE_REGEX, destination)
|
||||
|
||||
if not (is_valid_sip or is_valid_e164):
|
||||
validation_error_result = {
|
||||
"status": "failed",
|
||||
"message": "I'm sorry, but the transfer destination appears to be invalid. Please contact support to verify the transfer settings.",
|
||||
"action": "transfer_failed",
|
||||
"reason": "invalid_destination",
|
||||
}
|
||||
await self._handle_transfer_result(
|
||||
validation_error_result, function_call_params, properties
|
||||
)
|
||||
return
|
||||
else:
|
||||
# For non-ARI providers (Twilio, etc), use E.164 validation
|
||||
E164_PHONE_REGEX = r"^\+[1-9]\d{1,14}$"
|
||||
if not re.match(E164_PHONE_REGEX, destination):
|
||||
validation_error_result = {
|
||||
"status": "failed",
|
||||
"message": "I'm sorry, but the transfer phone number appears to be invalid. Please contact support to verify the transfer settings.",
|
||||
"action": "transfer_failed",
|
||||
"reason": "invalid_destination",
|
||||
}
|
||||
await self._handle_transfer_result(
|
||||
validation_error_result, function_call_params, properties
|
||||
)
|
||||
return
|
||||
|
||||
if message_type == "custom" and custom_message:
|
||||
logger.info(f"Playing pre-transfer message: {custom_message}")
|
||||
|
|
@ -366,14 +384,12 @@ class CustomToolManager:
|
|||
"message": "I'm sorry, there's an issue with this call transfer. Please contact support.",
|
||||
"action": "transfer_failed",
|
||||
"reason": "no_organization_id",
|
||||
"end_call": False,
|
||||
}
|
||||
await self._handle_transfer_result(
|
||||
validation_error_result, function_call_params, properties
|
||||
)
|
||||
return
|
||||
|
||||
# Get telephony provider directly (no HTTP round-trip)
|
||||
provider = await get_telephony_provider(organization_id)
|
||||
if not provider.supports_transfers() or not provider.validate_config():
|
||||
validation_error_result = {
|
||||
|
|
@ -381,7 +397,6 @@ class CustomToolManager:
|
|||
"message": "I'm sorry, there's an issue with this call transfer. Please contact support.",
|
||||
"action": "transfer_failed",
|
||||
"reason": "provider_does_not_support_transfer",
|
||||
"end_call": False,
|
||||
}
|
||||
await self._handle_transfer_result(
|
||||
validation_error_result, function_call_params, properties
|
||||
|
|
@ -396,6 +411,19 @@ class CustomToolManager:
|
|||
# Compute conference name from original call SID
|
||||
conference_name = f"transfer-{original_call_sid}"
|
||||
|
||||
# Store initial transfer context in Redis before provider call to avoid race condition
|
||||
call_transfer_manager = await get_call_transfer_manager()
|
||||
transfer_context = TransferContext(
|
||||
transfer_id=transfer_id,
|
||||
call_sid=None, # Will be updated after provider response
|
||||
target_number=destination,
|
||||
tool_uuid=tool.tool_uuid,
|
||||
original_call_sid=original_call_sid,
|
||||
conference_name=conference_name,
|
||||
initiated_at=time.time(),
|
||||
)
|
||||
await call_transfer_manager.store_transfer_context(transfer_context)
|
||||
|
||||
# Mute the pipeline
|
||||
self._engine.set_mute_pipeline(True)
|
||||
|
||||
|
|
@ -410,21 +438,8 @@ class CustomToolManager:
|
|||
call_sid = transfer_result.get("call_sid")
|
||||
logger.info(f"Transfer call initiated successfully: {call_sid}")
|
||||
|
||||
# TODO: Possible race here between saving the transfer context
|
||||
# and getting a callback response from Twilio? Should we store_transfer_context
|
||||
# before sending request to Twilio and update the transfer context afterwards?
|
||||
|
||||
# Store transfer context in Redis
|
||||
call_transfer_manager = await get_call_transfer_manager()
|
||||
transfer_context = TransferContext(
|
||||
transfer_id=transfer_id,
|
||||
call_sid=call_sid,
|
||||
target_number=destination,
|
||||
tool_uuid=tool.tool_uuid,
|
||||
original_call_sid=original_call_sid,
|
||||
conference_name=conference_name,
|
||||
initiated_at=time.time(),
|
||||
)
|
||||
# Update transfer context with actual call_sid from provider response
|
||||
transfer_context.call_sid = call_sid
|
||||
await call_transfer_manager.store_transfer_context(transfer_context)
|
||||
|
||||
# Wait for status callback completion using Redis pub/sub
|
||||
|
|
@ -466,15 +481,15 @@ class CustomToolManager:
|
|||
transfer_event = None
|
||||
|
||||
finally:
|
||||
# Single cleanup point: stop hold music, unmute pipeline, remove context
|
||||
# Cleanup hold music and pipeline state
|
||||
# Transfer context cleanup is handled by respective transfer call strategies
|
||||
logger.info(
|
||||
"Transfer wait ended, cleaning up hold music, pipeline state, and transfer context"
|
||||
"Transfer wait ended, cleaning up hold music and pipeline state"
|
||||
)
|
||||
hold_music_stop_event.set()
|
||||
if hold_music_task:
|
||||
await hold_music_task
|
||||
self._engine.set_mute_pipeline(False)
|
||||
await call_transfer_manager.remove_transfer_context(transfer_id)
|
||||
|
||||
# Handle result (after cleanup)
|
||||
if transfer_event:
|
||||
|
|
@ -491,7 +506,6 @@ class CustomToolManager:
|
|||
"message": "I'm sorry, but the call is taking longer than expected to connect. The person might not be available right now. Please try calling back later.",
|
||||
"action": "transfer_failed",
|
||||
"reason": "timeout",
|
||||
"end_call": True,
|
||||
}
|
||||
await self._handle_transfer_result(
|
||||
timeout_result, function_call_params, properties
|
||||
|
|
@ -509,7 +523,6 @@ class CustomToolManager:
|
|||
"message": "I'm sorry, but something went wrong while trying to transfer your call. Please try again later or contact support if the problem persists.",
|
||||
"action": "transfer_failed",
|
||||
"reason": "execution_error",
|
||||
"end_call": True,
|
||||
}
|
||||
|
||||
await self._handle_transfer_result(
|
||||
|
|
@ -521,41 +534,52 @@ class CustomToolManager:
|
|||
async def _handle_transfer_result(
|
||||
self, result: dict, function_call_params, properties
|
||||
):
|
||||
"""Handle different transfer call outcomes and take appropriate action."""
|
||||
"""Handle transfer call outcomes from any telephony provider (Twilio, ARI, etc).
|
||||
|
||||
This method is provider-agnostic and processes standardized result dictionaries
|
||||
from transfer completion events, validation failures, timeouts, and errors.
|
||||
|
||||
Args:
|
||||
result: Standardized result dict with keys: action, status, reason, message
|
||||
function_call_params: LLM function call parameters for response callback
|
||||
properties: Function call result properties (e.g., run_llm setting)
|
||||
"""
|
||||
action = result.get("action", "")
|
||||
status = result.get("status", "")
|
||||
|
||||
logger.info(f"Handling transfer result: action={action}, status={status}")
|
||||
|
||||
if action == "transfer_success":
|
||||
# Successful transfer - add original caller to conference and end pipeline
|
||||
if action == "destination_answered":
|
||||
# Transfer destination answered - proceeding with bridge swap/conference join
|
||||
conference_id = result.get("conference_id")
|
||||
original_call_sid = result.get("original_call_sid")
|
||||
transfer_call_sid = result.get("transfer_call_sid")
|
||||
|
||||
logger.info(
|
||||
f"Transfer successful! Conference: {conference_id}, Original: {original_call_sid}, Transfer: {transfer_call_sid}"
|
||||
f"Transfer destination answered! Conference/Bridge: {conference_id}, "
|
||||
f"Original: {original_call_sid}, Transfer: {transfer_call_sid}"
|
||||
)
|
||||
|
||||
# Inform LLM of success and end the call with Transfer call reason
|
||||
# Inform LLM of success and end the call (no further LLM processing needed)
|
||||
response_properties = FunctionCallResultProperties(run_llm=False)
|
||||
await function_call_params.result_callback(
|
||||
{
|
||||
"status": "transfer_success",
|
||||
"message": "Transfer successful - connecting to conference",
|
||||
"message": "Transfer destination answered - connecting calls",
|
||||
"conference_id": conference_id,
|
||||
},
|
||||
properties=response_properties,
|
||||
)
|
||||
|
||||
# End pipeline - providers complete bridge swap/conference join as final transfer leg
|
||||
await self._engine.end_call_with_reason(
|
||||
EndTaskReason.TRANSFER_CALL.value, abort_immediately=False
|
||||
)
|
||||
|
||||
elif action == "transfer_failed":
|
||||
# Transfer failed - inform user via LLM and then end the call
|
||||
# Transfer failed - let LLM inform user with error details
|
||||
reason = result.get("reason", "unknown")
|
||||
logger.info(f"Transfer failed ({reason}), informing user")
|
||||
logger.info(f"Transfer failed ({reason}), informing user via LLM")
|
||||
|
||||
await function_call_params.result_callback(
|
||||
{
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue