mirror of
https://github.com/dograh-hq/dograh.git
synced 2026-06-13 08:15:21 +02:00
fix: fix circuit breaker failure recording chore: provide advanced configuration option in UI for campaigns
230 lines
9.4 KiB
Python
230 lines
9.4 KiB
Python
"""ARI-specific call operation strategies.

This module contains the business logic for Asterisk ARI call operations.
"""
|
|
|
|
from typing import Any, Dict
|
|
|
|
from loguru import logger
|
|
|
|
from pipecat.serializers.call_strategies import HangupStrategy, TransferStrategy
|
|
|
|
|
|
class ARIBridgeSwapStrategy(TransferStrategy):
    """Implements bridge swap transfer for Asterisk ARI.

    The transfer is performed by adding the destination channel to the
    caller's existing bridge, removing and hanging up the external media
    channel, and finally removing the transfer context.
    """

    async def execute_transfer(self, context: Dict[str, Any]) -> bool:
        """Execute bridge swap transfer for Asterisk ARI.

        Args:
            context: Call context. ``channel_id`` and ``ari_endpoint`` must be
                present and non-empty; ``app_name`` and ``app_password`` are
                used as HTTP basic-auth credentials for the ARI REST API.

        Returns:
            True once the destination channel has been added to the bridge
            and the transfer context has been removed. False when required
            data is missing, when adding the destination channel fails, or
            when any exception is raised (it is logged, never propagated).
        """
        try:
            import aiohttp
            import redis.asyncio as aioredis
            from aiohttp import BasicAuth

            # Use .get() so that a missing key reaches the warning below
            # instead of surfacing as a KeyError traceback from the outer
            # exception handler.
            channel_id = context.get("channel_id")
            ari_endpoint = context.get("ari_endpoint")

            if not channel_id or not ari_endpoint:
                logger.warning(
                    "Cannot execute transfer: missing channel_id or ari_endpoint"
                )
                return False

            # Credentials stay strict lookups: a missing key is a caller
            # error and is reported by the outer exception handler.
            app_name = context["app_name"]
            app_password = context["app_password"]

            logger.info(
                f"[ARI Transfer] Executing bridge swap for channel {channel_id}"
            )

            from api.constants import REDIS_URL
            from api.db import db_client
            from api.services.telephony.call_transfer_manager import (
                get_call_transfer_manager,
            )

            auth = BasicAuth(app_name, app_password)

            # Get call transfer manager instance (reused for cleanup in step 5)
            call_transfer_manager = await get_call_transfer_manager()

            # 1. Find active transfer context for this caller channel
            transfer_context = (
                await call_transfer_manager.find_transfer_context_for_call(channel_id)
            )
            if not transfer_context:
                logger.error(
                    f"[ARI Transfer] No active transfer context found for caller {channel_id}"
                )
                return False

            logger.info(
                f"[ARI Transfer] Found transfer context: {transfer_context.transfer_id}, "
                f"destination: {transfer_context.call_sid}"
            )

            # 2. Get workflow run to find current bridge and external media channel
            # The client is only needed for this single lookup; the context
            # manager closes it so the connection is not leaked per transfer.
            async with aioredis.from_url(
                REDIS_URL, decode_responses=True
            ) as redis_client:
                workflow_run_id = await redis_client.get(f"ari:channel:{channel_id}")

            if not workflow_run_id:
                logger.error(
                    f"[ARI Transfer] No workflow run found for caller {channel_id}"
                )
                return False

            workflow_run = await db_client.get_workflow_run_by_id(int(workflow_run_id))
            if not workflow_run or not workflow_run.gathered_context:
                logger.error(
                    f"[ARI Transfer] No workflow context found for run {workflow_run_id}"
                )
                return False

            ctx = workflow_run.gathered_context
            bridge_id = ctx.get("bridge_id")
            ext_channel_id = ctx.get("ext_channel_id")

            if not bridge_id or not ext_channel_id:
                logger.error(
                    f"[ARI Transfer] Missing bridge/external channel info: {ctx}"
                )
                return False

            destination_channel_id = transfer_context.call_sid
            if not destination_channel_id:
                logger.error(
                    "[ARI Transfer] No destination channel in transfer context"
                )
                return False

            logger.info(
                f"[ARI Transfer] Bridge swap: bridge={bridge_id}, caller={channel_id}, "
                f"destination={destination_channel_id}, ext_media={ext_channel_id}"
            )

            # 3. Set transfer state to prevent StasisEnd auto-teardown
            workflow_run.gathered_context["transfer_state"] = "in-progress"
            await db_client.update_workflow_run(
                run_id=int(workflow_run_id),
                gathered_context=workflow_run.gathered_context,
            )
            logger.debug(
                f"[ARI Transfer] Set transfer_state=in-progress for workflow {workflow_run_id}"
            )

            # 4. Execute bridge swap operations via ARI REST API
            async with aiohttp.ClientSession() as session:
                # Add destination channel to existing bridge. This is the only
                # REST step whose failure aborts the transfer.
                add_url = f"{ari_endpoint}/ari/bridges/{bridge_id}/addChannel"
                async with session.post(
                    add_url, auth=auth, params={"channel": destination_channel_id}
                ) as response:
                    if response.status in (200, 204):
                        logger.info(
                            f"[ARI Transfer] Added destination {destination_channel_id} to bridge {bridge_id}"
                        )
                    else:
                        error_text = await response.text()
                        logger.error(
                            f"[ARI Transfer] Failed to add destination to bridge: {response.status} {error_text}"
                        )
                        # NOTE(review): transfer_state stays "in-progress" on
                        # this failure path - confirm whether it should be
                        # reset so StasisEnd teardown is not suppressed.
                        return False

                # Remove external media channel from bridge (best effort: a
                # failure is logged but does not abort the transfer).
                remove_url = f"{ari_endpoint}/ari/bridges/{bridge_id}/removeChannel"
                async with session.post(
                    remove_url, auth=auth, params={"channel": ext_channel_id}
                ) as response:
                    if response.status in (200, 204):
                        logger.info(
                            f"[ARI Transfer] Removed external media {ext_channel_id} from bridge {bridge_id}"
                        )
                    else:
                        error_text = await response.text()
                        logger.error(
                            f"[ARI Transfer] Failed to remove external media from bridge: {response.status} {error_text}"
                        )

                # Hang up the external media channel (best effort; a 404 is
                # treated as "already gone").
                hangup_url = f"{ari_endpoint}/ari/channels/{ext_channel_id}"
                async with session.delete(hangup_url, auth=auth) as response:
                    if response.status in (200, 204):
                        logger.info(
                            f"[ARI Transfer] Hung up external media channel {ext_channel_id}"
                        )
                    elif response.status == 404:
                        logger.debug(
                            f"[ARI Transfer] External media channel {ext_channel_id} already gone"
                        )
                    else:
                        error_text = await response.text()
                        logger.warning(
                            f"[ARI Transfer] Failed to hang up external media: {response.status} {error_text}"
                        )

            logger.info(
                f"[ARI Transfer] Bridge swap completed successfully for transfer {transfer_context.transfer_id}, "
                f"caller {channel_id} connected to destination {destination_channel_id} via bridge {bridge_id}"
            )

            # 5. Clean up transfer context after successful completion
            await call_transfer_manager.remove_transfer_context(
                transfer_context.transfer_id
            )
            return True

        except Exception as e:
            logger.exception(f"Failed to execute ARI transfer: {e}")
            return False
|
|
|
|
|
|
class ARIHangupStrategy(HangupStrategy):
    """Implements hangup for Asterisk ARI channels."""

    async def execute_hangup(self, context: Dict[str, Any]) -> bool:
        """Hang up the Asterisk channel via ARI REST API.

        Args:
            context: Call context. ``channel_id`` and ``ari_endpoint`` must be
                present and non-empty; ``app_name`` and ``app_password`` are
                used as HTTP basic-auth credentials for the ARI REST API.

        Returns:
            True when the DELETE request answers 200/204, or 404 (the channel
            was already terminated). False when required data is missing,
            on any other status, or when any exception is raised (it is
            logged, never propagated).
        """
        try:
            import aiohttp
            from aiohttp import BasicAuth

            # Use .get() so that a missing key reaches the warning below
            # instead of surfacing as a KeyError traceback from the outer
            # exception handler.
            channel_id = context.get("channel_id")
            ari_endpoint = context.get("ari_endpoint")

            if not channel_id or not ari_endpoint:
                logger.warning(
                    "Cannot hang up Asterisk channel: missing channel_id or ari_endpoint"
                )
                return False

            # Credentials stay strict lookups: a missing key is a caller
            # error and is reported by the outer exception handler.
            app_name = context["app_name"]
            app_password = context["app_password"]

            endpoint = f"{ari_endpoint}/ari/channels/{channel_id}"
            auth = BasicAuth(app_name, app_password)

            async with aiohttp.ClientSession() as session:
                async with session.delete(endpoint, auth=auth) as response:
                    if response.status in (200, 204):
                        logger.info(
                            f"Successfully terminated Asterisk channel {channel_id}"
                        )
                        return True
                    elif response.status == 404:
                        # The channel no longer exists, so the hangup goal is
                        # already met.
                        logger.debug(
                            f"Asterisk channel {channel_id} was already terminated"
                        )
                        return True
                    else:
                        error_text = await response.text()
                        logger.error(
                            f"Failed to terminate Asterisk channel {channel_id}: "
                            f"Status {response.status}, Response: {error_text}"
                        )
                        return False

        except Exception as e:
            logger.exception(f"Failed to hang up Asterisk channel: {e}")
            return False
|