dograh/api/services/telephony/providers/ari_call_strategies.py
Abhishek Kumar 3ea235a666 fix: fix circuit breaker failure recording
fix: fix circuit breaker failure recording
chore: provide advanced configuration option in UI for campaigns
2026-03-05 13:43:13 +05:30

230 lines
9.4 KiB
Python

"""ARI-specific call operation strategies.
This module contains the business logic for Asterisk ARI call operations:
bridge-swap call transfer and channel hangup.
"""
from typing import Any, Dict
from loguru import logger
from pipecat.serializers.call_strategies import HangupStrategy, TransferStrategy
class ARIBridgeSwapStrategy(TransferStrategy):
    """Transfer strategy that swaps channels inside an existing ARI bridge.

    The caller stays in its current bridge. The transfer destination channel
    is added to that bridge, and the external media channel is then removed
    from the bridge and hung up. The pending transfer context is deleted once
    the swap has gone through.
    """

    async def execute_transfer(self, context: Dict[str, Any]) -> bool:
        """Execute bridge swap transfer for Asterisk ARI.

        Args:
            context: Must contain ``channel_id`` (the caller channel),
                ``ari_endpoint`` (prefix of the ARI REST URLs), and
                ``app_name`` / ``app_password`` (sent as HTTP basic-auth
                credentials).

        Returns:
            True when the destination channel was added to the bridge and the
            transfer context was removed. False when a prerequisite is
            missing (transfer context, workflow run, bridge or channel ids),
            when adding the destination to the bridge fails, or when any
            exception is raised (it is logged, not propagated).
        """
        try:
            # Imported lazily, as in the original implementation.
            import aiohttp
            import redis.asyncio as aioredis
            from aiohttp import BasicAuth

            channel_id = context["channel_id"]
            ari_endpoint = context["ari_endpoint"]
            app_name = context["app_name"]
            app_password = context["app_password"]

            if not channel_id or not ari_endpoint:
                logger.warning(
                    "Cannot execute transfer: missing channel_id or ari_endpoint"
                )
                return False

            logger.info(
                f"[ARI Transfer] Executing bridge swap for channel {channel_id}"
            )

            from api.constants import REDIS_URL
            from api.db import db_client
            from api.services.telephony.call_transfer_manager import (
                get_call_transfer_manager,
            )

            auth = BasicAuth(app_name, app_password)

            # Fetched once and reused for both the lookup and the cleanup.
            call_transfer_manager = await get_call_transfer_manager()

            # 1. Find active transfer context for this caller channel
            transfer_context = (
                await call_transfer_manager.find_transfer_context_for_call(
                    channel_id
                )
            )
            if not transfer_context:
                logger.error(
                    f"[ARI Transfer] No active transfer context found for caller {channel_id}"
                )
                return False

            logger.info(
                f"[ARI Transfer] Found transfer context: {transfer_context.transfer_id}, "
                f"destination: {transfer_context.call_sid}"
            )

            # 2. Get workflow run to find current bridge and external media channel.
            # The client is only needed for this single lookup; the context
            # manager closes it on every path so no connection is leaked.
            async with aioredis.from_url(
                REDIS_URL, decode_responses=True
            ) as redis_client:
                workflow_run_id = await redis_client.get(
                    f"ari:channel:{channel_id}"
                )

            if not workflow_run_id:
                logger.error(
                    f"[ARI Transfer] No workflow run found for caller {channel_id}"
                )
                return False

            run_id = int(workflow_run_id)
            workflow_run = await db_client.get_workflow_run_by_id(run_id)
            if not workflow_run or not workflow_run.gathered_context:
                logger.error(
                    f"[ARI Transfer] No workflow context found for run {workflow_run_id}"
                )
                return False

            ctx = workflow_run.gathered_context
            bridge_id = ctx.get("bridge_id")
            ext_channel_id = ctx.get("ext_channel_id")
            if not bridge_id or not ext_channel_id:
                logger.error(
                    f"[ARI Transfer] Missing bridge/external channel info: {ctx}"
                )
                return False

            destination_channel_id = transfer_context.call_sid
            if not destination_channel_id:
                logger.error(
                    "[ARI Transfer] No destination channel in transfer context"
                )
                return False

            logger.info(
                f"[ARI Transfer] Bridge swap: bridge={bridge_id}, caller={channel_id}, "
                f"destination={destination_channel_id}, ext_media={ext_channel_id}"
            )

            # 3. Set transfer state to prevent StasisEnd auto-teardown
            # NOTE(review): the state is left as "in-progress" if the swap
            # below fails; confirm whether the StasisEnd handler needs it
            # reset in that case.
            workflow_run.gathered_context["transfer_state"] = "in-progress"
            await db_client.update_workflow_run(
                run_id=run_id,
                gathered_context=workflow_run.gathered_context,
            )
            logger.debug(
                f"[ARI Transfer] Set transfer_state=in-progress for workflow {workflow_run_id}"
            )

            # 4. Execute bridge swap operations via ARI REST API
            async with aiohttp.ClientSession() as session:
                # Add destination channel to existing bridge. This is the only
                # step whose failure aborts the transfer.
                add_url = f"{ari_endpoint}/ari/bridges/{bridge_id}/addChannel"
                async with session.post(
                    add_url, auth=auth, params={"channel": destination_channel_id}
                ) as response:
                    if response.status in (200, 204):
                        logger.info(
                            f"[ARI Transfer] Added destination {destination_channel_id} to bridge {bridge_id}"
                        )
                    else:
                        error_text = await response.text()
                        logger.error(
                            f"[ARI Transfer] Failed to add destination to bridge: {response.status} {error_text}"
                        )
                        return False

                # Remove external media channel from bridge. A failure here is
                # logged but does not abort the transfer.
                remove_url = f"{ari_endpoint}/ari/bridges/{bridge_id}/removeChannel"
                async with session.post(
                    remove_url, auth=auth, params={"channel": ext_channel_id}
                ) as response:
                    if response.status in (200, 204):
                        logger.info(
                            f"[ARI Transfer] Removed external media {ext_channel_id} from bridge {bridge_id}"
                        )
                    else:
                        error_text = await response.text()
                        logger.error(
                            f"[ARI Transfer] Failed to remove external media from bridge: {response.status} {error_text}"
                        )

                # Hang up the external media channel. A 404 is logged at
                # debug level as "already gone"; other failures only warn.
                hangup_url = f"{ari_endpoint}/ari/channels/{ext_channel_id}"
                async with session.delete(hangup_url, auth=auth) as response:
                    if response.status in (200, 204):
                        logger.info(
                            f"[ARI Transfer] Hung up external media channel {ext_channel_id}"
                        )
                    elif response.status == 404:
                        logger.debug(
                            f"[ARI Transfer] External media channel {ext_channel_id} already gone"
                        )
                    else:
                        error_text = await response.text()
                        logger.warning(
                            f"[ARI Transfer] Failed to hang up external media: {response.status} {error_text}"
                        )

            logger.info(
                f"[ARI Transfer] Bridge swap completed successfully for transfer {transfer_context.transfer_id}, "
                f"caller {channel_id} connected to destination {destination_channel_id} via bridge {bridge_id}"
            )

            # 5. Clean up transfer context after successful completion
            await call_transfer_manager.remove_transfer_context(
                transfer_context.transfer_id
            )
            return True
        except Exception as e:
            logger.exception(f"Failed to execute ARI transfer: {e}")
            return False
class ARIHangupStrategy(HangupStrategy):
    """Hangup strategy that terminates an Asterisk channel through ARI."""

    async def execute_hangup(self, context: Dict[str, Any]) -> bool:
        """Delete the channel named in ``context`` via the ARI REST API.

        Args:
            context: Must contain ``channel_id``, ``ari_endpoint``,
                ``app_name`` and ``app_password``; the last two are sent as
                HTTP basic-auth credentials.

        Returns:
            True when ARI answers 200/204, or 404 (the channel is treated as
            already terminated). False when ``channel_id`` or
            ``ari_endpoint`` is empty, when ARI answers with any other
            status, or when an exception is raised (logged, not propagated).
        """
        try:
            import aiohttp
            from aiohttp import BasicAuth

            # Keys are read in the same order as before; a missing key raises
            # KeyError, which the handler below logs.
            channel_id, ari_endpoint, app_name, app_password = (
                context[key]
                for key in ("channel_id", "ari_endpoint", "app_name", "app_password")
            )

            if not (channel_id and ari_endpoint):
                logger.warning(
                    "Cannot hang up Asterisk channel: missing channel_id or ari_endpoint"
                )
                return False

            channel_url = f"{ari_endpoint}/ari/channels/{channel_id}"
            credentials = BasicAuth(app_name, app_password)

            async with aiohttp.ClientSession() as http:
                async with http.delete(channel_url, auth=credentials) as reply:
                    status = reply.status

                    if status in (200, 204):
                        logger.info(
                            f"Successfully terminated Asterisk channel {channel_id}"
                        )
                        return True

                    if status == 404:
                        # Nothing left to hang up; counts as success.
                        logger.debug(
                            f"Asterisk channel {channel_id} was already terminated"
                        )
                        return True

                    body = await reply.text()
                    logger.error(
                        f"Failed to terminate Asterisk channel {channel_id}: "
                        f"Status {status}, Response: {body}"
                    )
                    return False
        except Exception as e:
            logger.exception(f"Failed to hang up Asterisk channel: {e}")
            return False