diff --git a/api/routes/telephony.py b/api/routes/telephony.py
index cc7ce55..5c57cc2 100644
--- a/api/routes/telephony.py
+++ b/api/routes/telephony.py
@@ -1670,7 +1670,7 @@ async def complete_transfer_function_call(transfer_id: str, request: Request):
result = {
"status": "success",
"message": "Great! The destination number answered. Let me transfer you now.",
- "action": "transfer_success",
+ "action": "destination_answered",
"conference_id": conference_name,
"transfer_call_sid": call_sid, # The outbound transfer call SID
"original_call_sid": original_call_sid, # The original caller's SID
@@ -1714,9 +1714,7 @@ async def complete_transfer_function_call(transfer_id: str, request: Request):
try:
# Determine event type based on result status
if result["status"] == "success":
- event_type = TransferEventType.TRANSFER_COMPLETED
- elif result.get("reason") == "timeout":
- event_type = TransferEventType.TRANSFER_TIMEOUT
+ event_type = TransferEventType.DESTINATION_ANSWERED
else:
event_type = TransferEventType.TRANSFER_FAILED
@@ -1730,7 +1728,6 @@ async def complete_transfer_function_call(transfer_id: str, request: Request):
status=result["status"],
action=result.get("action", ""),
reason=result.get("reason"),
- end_call=result.get("end_call", False),
)
# Publish the event via Redis
diff --git a/api/routes/tool.py b/api/routes/tool.py
index d54589e..ba8e708 100644
--- a/api/routes/tool.py
+++ b/api/routes/tool.py
@@ -71,7 +71,7 @@ class TransferCallConfig(BaseModel):
"""Configuration for Transfer Call tools."""
destination: str = Field(
- description="Phone number to transfer the call to (E.164 format, e.g., +1234567890)"
+ description="Phone number or SIP endpoint to transfer the call to (E.164 format e.g., +1234567890, or SIP endpoint e.g., PJSIP/1234)"
)
messageType: Literal["none", "custom"] = Field(
default="none", description="Type of message to play before transfer"
@@ -89,16 +89,23 @@ class TransferCallConfig(BaseModel):
@field_validator("destination")
@classmethod
def validate_destination(cls, v: str) -> str:
- """Validate that destination is a valid E.164 phone number."""
+ """Validate that destination is a valid E.164 phone number or SIP endpoint."""
# Allow empty string for initial creation (like HTTP API tools with empty URL)
if not v.strip():
return v
# E.164 format: +[1-9]\d{1,14}
e164_pattern = r"^\+[1-9]\d{1,14}$"
- if not re.match(e164_pattern, v):
+
+ # SIP endpoint format: PJSIP/extension or SIP/extension
+ sip_pattern = r"^(PJSIP|SIP)/[\w\-\.@]+$"
+
+ is_valid_e164 = re.match(e164_pattern, v)
+ is_valid_sip = re.match(sip_pattern, v, re.IGNORECASE)
+
+ if not (is_valid_e164 or is_valid_sip):
raise ValueError(
- "Destination must be a valid E.164 phone number (e.g., +1234567890)"
+ "Destination must be a valid E.164 phone number (e.g., +1234567890) or SIP endpoint (e.g., PJSIP/1234)"
)
return v
diff --git a/api/services/pipecat/transport_setup.py b/api/services/pipecat/transport_setup.py
index db18380..c7b56ff 100644
--- a/api/services/pipecat/transport_setup.py
+++ b/api/services/pipecat/transport_setup.py
@@ -6,6 +6,14 @@ from api.constants import APP_ROOT_DIR
from api.db import db_client
from api.enums import OrganizationConfigurationKey
from api.services.pipecat.audio_config import AudioConfig
+from api.services.telephony.providers.ari_call_strategies import (
+ ARIBridgeSwapStrategy,
+ ARIHangupStrategy,
+)
+from api.services.telephony.providers.twilio_call_strategies import (
+ TwilioConferenceStrategy,
+ TwilioHangupStrategy,
+)
from pipecat.audio.mixers.silence_mixer import SilenceAudioMixer
from pipecat.audio.mixers.soundfile_mixer import SoundfileMixer
from pipecat.serializers.asterisk import AsteriskFrameSerializer
@@ -54,12 +62,17 @@ async def create_twilio_transport(
raise ValueError(
f"Incomplete Twilio configuration for organization {organization_id}"
)
+ # Create strategy instances
+ transfer_strategy = TwilioConferenceStrategy()
+ hangup_strategy = TwilioHangupStrategy()
serializer = TwilioFrameSerializer(
stream_sid=stream_sid,
call_sid=call_sid,
account_sid=account_sid,
auth_token=auth_token,
+ transfer_strategy=transfer_strategy,
+ hangup_strategy=hangup_strategy,
)
return FastAPIWebsocketTransport(
@@ -178,12 +191,17 @@ async def create_ari_transport(
f"Incomplete ARI configuration for organization {organization_id}. "
f"Required: ari_endpoint, app_name, app_password"
)
+ # Create strategy instances
+ transfer_strategy = ARIBridgeSwapStrategy()
+ hangup_strategy = ARIHangupStrategy()
serializer = AsteriskFrameSerializer(
channel_id=channel_id,
ari_endpoint=ari_endpoint,
app_name=app_name,
app_password=app_password,
+ transfer_strategy=transfer_strategy,
+ hangup_strategy=hangup_strategy,
params=AsteriskFrameSerializer.InputParams(
asterisk_sample_rate=audio_config.transport_in_sample_rate,
sample_rate=audio_config.pipeline_sample_rate,
diff --git a/api/services/telephony/ari_manager.py b/api/services/telephony/ari_manager.py
index ea8e414..98dacfc 100644
--- a/api/services/telephony/ari_manager.py
+++ b/api/services/telephony/ari_manager.py
@@ -14,6 +14,7 @@ setup_logging()
import asyncio
import json
import signal
+import time
from typing import Dict, Optional, Set
from urllib.parse import urlparse
@@ -26,6 +27,11 @@ from api.constants import REDIS_URL
from api.db import db_client
from api.enums import CallType, OrganizationConfigurationKey, WorkflowRunMode
from api.services.quota_service import check_dograh_quota_by_user_id
+from api.services.telephony.call_transfer_manager import get_call_transfer_manager
+from api.services.telephony.transfer_event_protocol import (
+ TransferEvent,
+ TransferEventType,
+)
# Redis key pattern and TTL for channel-to-run mapping
_CHANNEL_KEY_PREFIX = "ari:channel:"
@@ -62,6 +68,9 @@ class ARIConnection:
# Redis client for channel-to-run reverse mapping (lazy init)
self._redis_client: Optional[aioredis.Redis] = None
+ # Transfer manager for handling call transfers
+ self._call_transfer_manager = None
+
async def _get_redis(self) -> aioredis.Redis:
"""Get Redis client instance (lazy init)."""
if not self._redis_client:
@@ -70,6 +79,12 @@ class ARIConnection:
)
return self._redis_client
+ async def _get_transfer_manager(self):
+ """Get transfer manager instance."""
+ if not self._call_transfer_manager:
+ self._call_transfer_manager = await get_call_transfer_manager()
+ return self._call_transfer_manager
+
async def _set_channel_run(self, channel_id: str, workflow_run_id: str):
"""Store channel_id -> workflow_run_id mapping in Redis."""
r = await self._get_redis()
@@ -229,6 +244,11 @@ class ARIConnection:
channel_id = channel.get("id", "unknown")
channel_state = channel.get("state", "unknown")
+ # Log all events for each channel
+ logger.debug(
+ f"[ARI EVENT org={self.organization_id}] {event_type}: channel={channel_id}, state={channel_state}"
+ )
+
if event_type == "StasisStart":
# Skip external media channels we created — they fire
# their own StasisStart but need no further handling.
@@ -255,6 +275,19 @@ class ARIConnection:
)
else:
# Outbound call (state == "Up") — originated by us
+ # Check if this is a transfer destination channel (app_args starts with "transfer")
+ # Transfer destinations run externally - we only track status to publish transfer event, not run the pipeline
+ transfer_id = self._get_transfer_id(app_args)
+ if transfer_id:
+ logger.info(
+ f"[ARI org={self.organization_id}] Transfer destination answered: "
+ f"channel={channel_id}, transfer_id={transfer_id}"
+ )
+ asyncio.create_task(
+ self._handle_destination_answered(transfer_id, channel_id)
+ )
+ return
+
# Parse args to extract workflow context
args_dict = {}
for arg in app_args:
@@ -298,13 +331,26 @@ class ARIConnection:
)
elif event_type == "ChannelDestroyed":
- cause = channel.get("cause", 0)
- cause_txt = channel.get("cause_txt", "unknown")
+ cause = event.get("cause", 0)
+ cause_txt = event.get("cause_txt", "unknown")
+ tech_cause = event.get("tech_cause", "unknown")
logger.info(
f"[ARI org={self.organization_id}] ChannelDestroyed: "
- f"channel={channel_id}, cause={cause} ({cause_txt})"
+ f"channel={channel_id}, cause={cause} ({cause_txt}), tech_cause = {tech_cause}"
)
+ # Check if this is a transfer destination that failed
+ transfer_id = await self._get_transfer_id_for_channel(channel_id)
+ if transfer_id:
+ failure_message = self._map_hangup_cause_to_message(
+ cause, tech_cause, cause_txt
+ )
+ asyncio.create_task(
+ self._handle_transfer_failed(
+ transfer_id, channel_id, failure_message
+ )
+ )
+
elif event_type == "ChannelDtmfReceived":
digit = event.get("digit", "")
logger.debug(
@@ -580,7 +626,38 @@ class ARIConnection:
call_id = ctx.get("call_id")
ext_channel_id = ctx.get("ext_channel_id")
bridge_id = ctx.get("bridge_id")
+ transfer_state = ctx.get("transfer_state")
+ # Check if this is a call transfer scenario external channel. Skip full teardown if
+ # transfer is in progress and this is the external media channel
+ # During call transfer, we preserve the caller-destination bridge
+ if (
+ transfer_state == "in-progress"
+ and channel_id == ext_channel_id
+ and ext_channel_id is not None
+ ):
+ logger.info(
+ f"[ARI org={self.organization_id}] Transfer in progress - skipping full teardown "
+ f"for external channel {channel_id}, preserving bridge {bridge_id} and caller {call_id}"
+ )
+
+ # Update transfer state to complete
+ ctx["transfer_state"] = "complete"
+ await db_client.update_workflow_run(
+ run_id=int(workflow_run_id), gathered_context=ctx
+ )
+
+ # Clean up only Redis markers for external channel
+ await self._delete_channel_run(channel_id)
+ await self._delete_ext_channel(channel_id)
+
+ logger.info(
+ f"[ARI org={self.organization_id}] Transfer cleanup complete - preserved caller {call_id} "
+ f"in bridge {bridge_id}"
+ )
+ return
+
+ # Normal full teardown for non-transfer scenarios (transfer_state is None or not in-progress)
# Delete the bridge first (removes channels from it)
if bridge_id:
await self._delete_bridge(bridge_id)
@@ -633,6 +710,124 @@ class ARIConnection:
f"{response.status} {text}"
)
+ # ======== CALL TRANSFER HELPER METHODS ========
+
+ def _map_hangup_cause_to_message(
+ self, cause: int, tech_cause: str, cause_txt: str
+ ) -> str:
+ """Map Asterisk cause codes to user-friendly transfer failure messages."""
+ if cause == 17 and tech_cause == "486": # User busy/declined
+ return "The person declined the call or their line is busy."
+ elif cause == 19 and tech_cause == "480": # No answer
+ return "The transfer call was not answered. The person may be busy or unavailable right now."
+ elif cause == 21: # Call rejected
+ return "The transfer call failed to connect. There may be a network issue or the number is unavailable."
+ else:
+ return f"Transfer failed: {cause_txt}"
+
+ def _get_transfer_id(self, app_args: list) -> Optional[str]:
+ """Get transfer_id if this is a transfer channel, None otherwise.
+
+ Args format: ['transfer', '{transfer_id}', '{conf_name}']
+ """
+ if len(app_args) > 1 and app_args[0] == "transfer":
+ transfer_id = app_args[1]
+ logger.debug(
+ f"[ARI org={self.organization_id}] Detected transfer channel with transfer_id: {transfer_id}"
+ )
+ return transfer_id
+ return None
+
+ async def _get_transfer_id_for_channel(self, channel_id: str) -> Optional[str]:
+ """Get transfer_id for a channel by checking Redis mapping."""
+ try:
+ r = await self._get_redis()
+ transfer_id = await r.get(f"ari:transfer_channel:{channel_id}")
+ logger.debug(
+ f"[ARI Transfer] Looking up transfer_id for channel {channel_id}: {transfer_id}"
+ )
+ return transfer_id
+ except Exception as e:
+ logger.error(
+ f"[ARI org={self.organization_id}] Error getting transfer ID for channel {channel_id}: {e}"
+ )
+ return None
+
+ async def _handle_destination_answered(
+ self, transfer_id: str, destination_channel_id: str
+ ):
+ """Handle transfer destination channel answered - publish success event."""
+ try:
+ logger.info(
+ f"[ARI Transfer org={self.organization_id}] Destination {destination_channel_id} "
+ f"answered for transfer {transfer_id}"
+ )
+
+ # Store channel mapping for potential future events and get transfer context
+ transfer_manager = await self._get_transfer_manager()
+ await transfer_manager.store_transfer_channel_mapping(
+ destination_channel_id, transfer_id
+ )
+ context = await transfer_manager.get_transfer_context(transfer_id)
+ if not context:
+ logger.error(
+ f"[ARI Transfer org={self.organization_id}] No transfer context found for {transfer_id}"
+ )
+ return
+
+ logger.info(
+ f"[ARI Transfer org={self.organization_id}] Transfer {transfer_id} success: "
+ f"caller={context.original_call_sid} -> destination={destination_channel_id}"
+ )
+
+ # Publish destination answered event - this will trigger the bridge swap in serializer
+ success_event = TransferEvent(
+ type=TransferEventType.DESTINATION_ANSWERED,
+ transfer_id=transfer_id,
+ original_call_sid=context.original_call_sid,
+ transfer_call_sid=destination_channel_id,
+ conference_name=context.conference_name,
+ message="Transfer destination answered",
+ status="success",
+ action="destination_answered",
+ )
+ await transfer_manager.publish_transfer_event(success_event)
+
+ except Exception as e:
+ logger.error(
+ f"[ARI Transfer org={self.organization_id}] Error handling transfer answer: {e}"
+ )
+ # On error, publish failure event
+ await self._handle_transfer_failed(
+ transfer_id, destination_channel_id, f"Transfer processing error: {e}"
+ )
+
+ async def _handle_transfer_failed(
+ self, transfer_id: str, channel_id: str, reason: str
+ ):
+ """Handle transfer failure - publish failure event."""
+ try:
+ logger.info(f"[ARI Transfer] Transfer {transfer_id} failed: {reason}")
+
+ transfer_manager = await self._get_transfer_manager()
+ context = await transfer_manager.get_transfer_context(transfer_id)
+
+ # Publish failure event
+ failure_event = TransferEvent(
+ type=TransferEventType.TRANSFER_FAILED,
+ transfer_id=transfer_id,
+ original_call_sid=context.original_call_sid if context else "",
+ transfer_call_sid=channel_id,
+ message=f"Transfer failed: {reason}",
+ status="failed",
+ action="transfer_failed",
+ reason=reason,
+ )
+ await transfer_manager.publish_transfer_event(failure_event)
+
+ except Exception as e:
+ logger.error(f"[ARI Transfer] Error handling transfer failure: {e}")
+
async def _delete_channel(self, channel_id: str):
"""Delete (hang up) an ARI channel. Ignores 404 (already gone)."""
diff --git a/api/services/telephony/call_transfer_manager.py b/api/services/telephony/call_transfer_manager.py
index f843850..a6f0f8c 100644
--- a/api/services/telephony/call_transfer_manager.py
+++ b/api/services/telephony/call_transfer_manager.py
@@ -85,6 +85,28 @@ class CallTransferManager:
except Exception as e:
logger.error(f"Failed to remove transfer context: {e}")
+ async def store_transfer_channel_mapping(
+ self, channel_id: str, transfer_id: str
+ ) -> None:
+ """Store channel->transfer mapping in Redis for event correlation.
+
+ Args:
+ channel_id: ARI channel ID
+ transfer_id: Transfer identifier
+ """
+ try:
+ redis = await self._get_redis()
+ await redis.setex(
+ f"ari:transfer_channel:{channel_id}", 300, transfer_id
+ ) # 5 minute TTL
+ logger.debug(
+ f"[Transfer Manager] Stored channel mapping: channel={channel_id}, transfer_id={transfer_id}"
+ )
+ except Exception as e:
+ logger.error(
+ f"[Transfer Manager] Error storing transfer channel mapping: {e}"
+ )
+
async def publish_transfer_event(self, event: TransferEvent) -> None:
"""Publish transfer event to Redis channel.
@@ -136,16 +158,10 @@ class CallTransferManager:
)
# Check if this is a completion event
- if (
- event.type
- in [
- TransferEventType.TRANSFER_ANSWERED, # Call answered = transfer successful
- TransferEventType.TRANSFER_COMPLETED,
- TransferEventType.TRANSFER_FAILED,
- TransferEventType.TRANSFER_CANCELLED,
- TransferEventType.TRANSFER_TIMEOUT,
- ]
- ):
+ if event.type in [
+ TransferEventType.DESTINATION_ANSWERED,
+ TransferEventType.TRANSFER_FAILED,
+ ]:
return event
except Exception as e:
logger.error(f"Failed to parse transfer event: {e}")
@@ -169,6 +185,31 @@ class CallTransferManager:
except Exception as e:
logger.error(f"Error closing pubsub connection: {e}")
+ async def find_transfer_context_for_call(self, caller_channel_id: str):
+ """Find the active transfer context for this caller channel."""
+
+ redis = await self._get_redis()
+
+ try:
+ # Search Redis for transfer contexts where original_call_sid matches this caller
+ transfer_keys = await redis.keys("transfer:context:*")
+
+ for key in transfer_keys:
+ try:
+ context_data = await redis.get(key)
+ if context_data:
+ context = TransferContext.from_json(context_data)
+ if context.original_call_sid == caller_channel_id:
+ return context
+ except Exception:
+ continue
+
+ return None
+
+ except Exception as e:
+ logger.error(f"[ARI Transfer] Error finding transfer context: {e}")
+ return None
+
async def cleanup(self):
"""Clean up Redis connections."""
try:
diff --git a/api/services/telephony/providers/ari_call_strategies.py b/api/services/telephony/providers/ari_call_strategies.py
new file mode 100644
index 0000000..5225a29
--- /dev/null
+++ b/api/services/telephony/providers/ari_call_strategies.py
@@ -0,0 +1,226 @@
+"""ARI-specific call operation strategies.
+
+This module contains the business logic for Asterisk ARI call operations.
+"""
+
+from typing import Any, Dict
+
+from loguru import logger
+
+from pipecat.serializers.call_strategies import HangupStrategy, TransferStrategy
+
+
+class ARIBridgeSwapStrategy(TransferStrategy):
+ """Implements bridge swap transfer for Asterisk ARI.
+
+ This strategy handles transferring calls by swapping channels in existing
+ bridges, managing transfer contexts, and publishing
+ transfer completion events.
+ """
+
+ async def execute_transfer(self, context: Dict[str, Any]) -> bool:
+ """Execute bridge swap transfer for Asterisk ARI."""
+ try:
+ import aiohttp
+ import redis.asyncio as aioredis
+ from aiohttp import BasicAuth
+
+ channel_id = context["channel_id"]
+ ari_endpoint = context["ari_endpoint"]
+ app_name = context["app_name"]
+ app_password = context["app_password"]
+
+ if not channel_id or not ari_endpoint:
+ logger.warning(
+ "Cannot execute transfer: missing channel_id or ari_endpoint"
+ )
+ return False
+
+ logger.info(
+ f"[ARI Transfer] Executing bridge swap for channel {channel_id}"
+ )
+
+ from api.constants import REDIS_URL
+ from api.db import db_client
+ from api.services.telephony.call_transfer_manager import (
+ get_call_transfer_manager,
+ )
+ auth = BasicAuth(app_name, app_password)
+
+ # Get call transfer manager instance
+ call_transfer_manager = await get_call_transfer_manager()
+
+ # 1. Find active transfer context for this caller channel
+ transfer_context = await call_transfer_manager.find_transfer_context_for_call(channel_id)
+ if not transfer_context:
+ logger.error(
+ f"[ARI Transfer] No active transfer context found for caller {channel_id}"
+ )
+ return False
+
+ logger.info(
+ f"[ARI Transfer] Found transfer context: {transfer_context.transfer_id}, "
+ f"destination: {transfer_context.call_sid}"
+ )
+
+ # 2. Get workflow run to find current bridge and external media channel
+ redis = aioredis.from_url(REDIS_URL, decode_responses=True)
+ workflow_run_id = await redis.get(f"ari:channel:{channel_id}")
+ if not workflow_run_id:
+ logger.error(
+ f"[ARI Transfer] No workflow run found for caller {channel_id}"
+ )
+ return False
+
+ workflow_run = await db_client.get_workflow_run_by_id(int(workflow_run_id))
+ if not workflow_run or not workflow_run.gathered_context:
+ logger.error(
+ f"[ARI Transfer] No workflow context found for run {workflow_run_id}"
+ )
+ return False
+
+ ctx = workflow_run.gathered_context
+ bridge_id = ctx.get("bridge_id")
+ ext_channel_id = ctx.get("ext_channel_id")
+
+ if not bridge_id or not ext_channel_id:
+ logger.error(
+ f"[ARI Transfer] Missing bridge/external channel info: {ctx}"
+ )
+ return False
+
+ destination_channel_id = transfer_context.call_sid
+ if not destination_channel_id:
+ logger.error(
+ f"[ARI Transfer] No destination channel in transfer context"
+ )
+ return False
+
+ logger.info(
+ f"[ARI Transfer] Bridge swap: bridge={bridge_id}, caller={channel_id}, "
+ f"destination={destination_channel_id}, ext_media={ext_channel_id}"
+ )
+
+ # 3. Set transfer state to prevent StasisEnd auto-teardown
+ workflow_run.gathered_context["transfer_state"] = "in-progress"
+ await db_client.update_workflow_run(
+ run_id=int(workflow_run_id),
+ gathered_context=workflow_run.gathered_context,
+ )
+ logger.debug(
+ f"[ARI Transfer] Set transfer_state=in-progress for workflow {workflow_run_id}"
+ )
+
+ # 4. Execute bridge swap operations via ARI REST API
+ async with aiohttp.ClientSession() as session:
+ # Add destination channel to existing bridge
+ add_url = f"{ari_endpoint}/ari/bridges/{bridge_id}/addChannel"
+ async with session.post(
+ add_url, auth=auth, params={"channel": destination_channel_id}
+ ) as response:
+ if response.status in (200, 204):
+ logger.info(
+ f"[ARI Transfer] Added destination {destination_channel_id} to bridge {bridge_id}"
+ )
+ else:
+ error_text = await response.text()
+ logger.error(
+ f"[ARI Transfer] Failed to add destination to bridge: {response.status} {error_text}"
+ )
+ return False
+
+ # Remove external media channel from bridge
+ remove_url = f"{ari_endpoint}/ari/bridges/{bridge_id}/removeChannel"
+ async with session.post(
+ remove_url, auth=auth, params={"channel": ext_channel_id}
+ ) as response:
+ if response.status in (200, 204):
+ logger.info(
+ f"[ARI Transfer] Removed external media {ext_channel_id} from bridge {bridge_id}"
+ )
+ else:
+ error_text = await response.text()
+ logger.error(
+ f"[ARI Transfer] Failed to remove external media from bridge: {response.status} {error_text}"
+ )
+
+ # Hang up the external media channel
+ hangup_url = f"{ari_endpoint}/ari/channels/{ext_channel_id}"
+ async with session.delete(hangup_url, auth=auth) as response:
+ if response.status in (200, 204):
+ logger.info(
+ f"[ARI Transfer] Hung up external media channel {ext_channel_id}"
+ )
+ elif response.status == 404:
+ logger.debug(
+ f"[ARI Transfer] External media channel {ext_channel_id} already gone"
+ )
+ else:
+ error_text = await response.text()
+ logger.warning(
+ f"[ARI Transfer] Failed to hang up external media: {response.status} {error_text}"
+ )
+
+ logger.info(
+ f"[ARI Transfer] Bridge swap completed successfully for transfer {transfer_context.transfer_id}, "
+ f"caller {channel_id} connected to destination {destination_channel_id} via bridge {bridge_id}"
+ )
+
+ # 5. Clean up transfer context after successful completion
+
+ call_transfer_manager = await get_call_transfer_manager()
+ await call_transfer_manager.remove_transfer_context(
+ transfer_context.transfer_id
+ )
+ return True
+
+ except Exception as e:
+ logger.exception(f"Failed to execute ARI transfer: {e}")
+ return False
+
+class ARIHangupStrategy(HangupStrategy):
+ """Implements hangup for Asterisk ARI channels."""
+
+ async def execute_hangup(self, context: Dict[str, Any]) -> bool:
+ """Hang up the Asterisk channel via ARI REST API."""
+ try:
+ import aiohttp
+ from aiohttp import BasicAuth
+
+ channel_id = context["channel_id"]
+ ari_endpoint = context["ari_endpoint"]
+ app_name = context["app_name"]
+ app_password = context["app_password"]
+
+ if not channel_id or not ari_endpoint:
+ logger.warning(
+ "Cannot hang up Asterisk channel: missing channel_id or ari_endpoint"
+ )
+ return False
+
+ endpoint = f"{ari_endpoint}/ari/channels/{channel_id}"
+ auth = BasicAuth(app_name, app_password)
+
+ async with aiohttp.ClientSession() as session:
+ async with session.delete(endpoint, auth=auth) as response:
+ if response.status in (200, 204):
+ logger.info(
+ f"Successfully terminated Asterisk channel {channel_id}"
+ )
+ return True
+ elif response.status == 404:
+ logger.debug(
+ f"Asterisk channel {channel_id} was already terminated"
+ )
+ return True
+ else:
+ error_text = await response.text()
+ logger.error(
+ f"Failed to terminate Asterisk channel {channel_id}: "
+ f"Status {response.status}, Response: {error_text}"
+ )
+ return False
+
+ except Exception as e:
+ logger.exception(f"Failed to hang up Asterisk channel: {e}")
+ return False
\ No newline at end of file
diff --git a/api/services/telephony/providers/ari_provider.py b/api/services/telephony/providers/ari_provider.py
index 139065a..7d7fce1 100644
--- a/api/services/telephony/providers/ari_provider.py
+++ b/api/services/telephony/providers/ari_provider.py
@@ -349,8 +349,8 @@ class ARIProvider(TelephonyProvider):
# ======== CALL TRANSFER METHODS ========
def supports_transfers(self) -> bool:
- """ARI does not currently support call transfers."""
- return False
+ """ARI supports call transfers via bridge manipulation."""
+ return True
async def transfer_call(
self,
@@ -360,8 +360,104 @@ class ARIProvider(TelephonyProvider):
timeout: int = 30,
**kwargs: Any,
) -> Dict[str, Any]:
- """ARI call transfers are not yet implemented."""
- raise NotImplementedError("ARI provider does not support call transfers")
+ """Initiate ARI call transfer by creating an outbound channel to the destination.
+
+ This method creates the destination channel and returns immediately. The transfer
+ process completes asynchronously - success/failure is determined by ARI events
+ and communicated through the transfer event system.
+
+ Args:
+ destination: Destination phone number (SIP endpoint)
+ transfer_id: Unique identifier for this transfer attempt
+ conference_name: Conference name (unused in ARI, kept for interface compatibility)
+ timeout: Transfer timeout in seconds
+ **kwargs: Additional arguments
+
+ Returns:
+ Dict containing:
+ - call_sid: Destination channel ID
+ - status: "initiated"
+ - provider: "ari"
+ - raw_response: Full ARI channel creation response
+ """
+ if not self.validate_config():
+ raise ValueError("ARI provider not properly configured")
+
+ logger.info(
+ f"[ARI Transfer] Initiating transfer {transfer_id} to {destination} "
+ f"(timeout: {timeout}s)"
+ )
+
+ from api.services.telephony.call_transfer_manager import (
+ get_call_transfer_manager,
+ )
+
+ # Get call transfer manager for event correlation mapping
+ call_transfer_manager = await get_call_transfer_manager()
+
+ # Build SIP endpoint
+ if destination.startswith("SIP/") or destination.startswith("PJSIP/"):
+ sip_endpoint = destination
+ else:
+ sip_endpoint = f"PJSIP/{destination}"
+
+ # Build transfer appArgs for event correlation
+ app_args = f"transfer,{transfer_id}"
+
+ try:
+ endpoint = f"{self.base_url}/channels"
+ params = {
+ "endpoint": sip_endpoint,
+ "app": self.app_name,
+ "appArgs": app_args,
+ "timeout": timeout, # Keep timeout for transfer calls
+ }
+
+ async with aiohttp.ClientSession() as session:
+ async with session.post(
+ endpoint,
+ params=params,
+ auth=self._get_auth(),
+ ) as response:
+ response_text = await response.text()
+
+ if response.status != 200:
+ error_msg = f"ARI channel creation failed: {response.status} {response_text}"
+ logger.error(f"[ARI Transfer] {error_msg}")
+ await call_transfer_manager.remove_transfer_context(transfer_id)
+ raise Exception(error_msg)
+
+ result = json.loads(response_text)
+
+ destination_channel_id = result.get("id", "")
+ if not destination_channel_id:
+ logger.error(
+ f"[ARI Transfer] Failed to get channel ID from response: {result}"
+ )
+ await call_transfer_manager.remove_transfer_context(transfer_id)
+ raise Exception("Failed to create destination channel")
+
+ # Store transfer channel mapping for event correlation
+ await call_transfer_manager.store_transfer_channel_mapping(
+ destination_channel_id, transfer_id
+ )
+
+ logger.info(
+ f"[ARI Transfer] Originated destination channel {destination_channel_id} "
+ f"for transfer {transfer_id}"
+ )
+
+ return {
+ "call_sid": destination_channel_id,
+ "status": "initiated",
+ "provider": self.PROVIDER_NAME,
+ "raw_response": result,
+ }
+
+ except Exception as e:
+ logger.error(f"[ARI Transfer] Failed to originate call transfer destination channel: {e}")
+ await call_transfer_manager.remove_transfer_context(transfer_id)
+ raise
# ======== ARI-SPECIFIC METHODS ========
diff --git a/api/services/telephony/providers/twilio_call_strategies.py b/api/services/telephony/providers/twilio_call_strategies.py
new file mode 100644
index 0000000..d48cff3
--- /dev/null
+++ b/api/services/telephony/providers/twilio_call_strategies.py
@@ -0,0 +1,186 @@
+"""Twilio-specific call operation strategies.
+
+This module contains the business logic for Twilio call operations,
+maintaining proper separation of concerns between protocol handling and business logic.
+"""
+
+from typing import Any, Dict
+
+import aiohttp
+from loguru import logger
+
+from pipecat.serializers.call_strategies import HangupStrategy, TransferStrategy
+
+
+class TwilioConferenceStrategy(TransferStrategy):
+ """Implements conference-based call transfer for Twilio.
+
+ This strategy transfers calls by placing them into a Twilio conference,
+ with cleanup of transfer contexts upon successful completion.
+ """
+
+ async def execute_transfer(self, context: Dict[str, Any]) -> bool:
+ """Execute conference transfer for Twilio call."""
+ try:
+ account_sid = context["account_sid"]
+ auth_token = context["auth_token"]
+ call_sid = context["call_sid"]
+ region = context.get("region")
+ edge = context.get("edge")
+
+ # 1. Find active transfer context for this call
+ transfer_context = await self._find_transfer_context_for_call(call_sid)
+ if not transfer_context:
+ logger.error(
+ f"[Twilio Transfer] No active transfer context found for call {call_sid}"
+ )
+ return False
+
+ logger.info(
+ f"[Twilio Transfer] Found transfer context: {transfer_context.transfer_id}, "
+ f"original: {transfer_context.original_call_sid}"
+ )
+
+ region_prefix = f"{region}." if region else ""
+ edge_prefix = f"{edge}." if edge else ""
+
+ # Twilio API endpoint for updating calls
+ endpoint = f"https://api.{edge_prefix}{region_prefix}twilio.com/2010-04-01/Accounts/{account_sid}/Calls/{call_sid}.json"
+
+ # Create basic auth from account_sid and auth_token
+ auth = aiohttp.BasicAuth(account_sid, auth_token)
+
+ conference_name = transfer_context.conference_name
+ twiml = f"""
+