"""ARI WebSocket Event Listener Manager. Standalone process that: 1. Queries the database for all organizations with ARI telephony configuration 2. Creates WebSocket connections to each ARI instance 3. Handles reconnection logic with exponential backoff 4. Processes StasisStart/StasisEnd events 5. Periodically refreshes configuration to detect new/removed organizations """ from api.logging_config import setup_logging setup_logging() import asyncio import json import signal import uuid from typing import Dict, Optional, Set from urllib.parse import urlparse import aiohttp import redis.asyncio as aioredis import websockets from loguru import logger from api.constants import REDIS_URL from api.db import db_client from api.enums import CallType, WorkflowRunMode from api.services.quota_service import check_dograh_quota_by_user_id from api.services.telephony.call_transfer_manager import get_call_transfer_manager from api.services.telephony.transfer_event_protocol import ( TransferEvent, TransferEventType, ) # Redis key pattern and TTL for channel-to-run mapping _CHANNEL_KEY_PREFIX = "ari:channel:" _EXT_CHANNEL_KEY_PREFIX = "ari:ext_channel:" _PENDING_BRIDGE_PREFIX = "ari:pending_bridge:" _CHANNEL_KEY_TTL = 3600 # 1 hour safety expiry _PENDING_BRIDGE_TTL = 300 # 5 min safety expiry for bridge-pending state class ARIConnection: """Manages a single ARI WebSocket connection for an organization.""" def __init__( self, organization_id: int, telephony_configuration_id: int, ari_endpoint: str, app_name: str, app_password: str, ws_client_name: str = "", ): self.organization_id = organization_id self.telephony_configuration_id = telephony_configuration_id self.ari_endpoint = ari_endpoint.rstrip("/") self.app_name = app_name self.app_password = app_password self.ws_client_name = ws_client_name self._ws: Optional[websockets.ClientConnection] = None self._task: Optional[asyncio.Task] = None self._running = False self._reconnect_delay = 1 # Start with 1 second self._max_reconnect_delay = 300 # Max 300 seconds self._ping_interval = 30 # Send ping every 30 seconds # Redis client for channel-to-run reverse mapping (lazy init) self._redis_client: Optional[aioredis.Redis] = None # Transfer manager for handling call transfers self._call_transfer_manager = None async def _get_redis(self) -> aioredis.Redis: """Get Redis client instance (lazy init).""" if not self._redis_client: self._redis_client = await aioredis.from_url( REDIS_URL, decode_responses=True ) return self._redis_client async def _get_transfer_manager(self): """Get transfer manager instance.""" if not self._call_transfer_manager: self._call_transfer_manager = await get_call_transfer_manager() return self._call_transfer_manager async def _set_channel_run(self, channel_id: str, workflow_run_id: str): """Store channel_id -> workflow_run_id mapping in Redis.""" r = await self._get_redis() await r.set( f"{_CHANNEL_KEY_PREFIX}{channel_id}", workflow_run_id, ex=_CHANNEL_KEY_TTL, ) async def _get_channel_run(self, channel_id: str) -> Optional[str]: """Look up workflow_run_id for a channel_id from Redis.""" r = await self._get_redis() return await r.get(f"{_CHANNEL_KEY_PREFIX}{channel_id}") async def _delete_channel_run(self, *channel_ids: str): """Delete channel-to-run mapping(s) from Redis.""" if not channel_ids: return r = await self._get_redis() keys = [f"{_CHANNEL_KEY_PREFIX}{cid}" for cid in channel_ids] await r.delete(*keys) async def _mark_ext_channel(self, channel_id: str): """Mark a channel as an external media channel we created.""" r = await self._get_redis() await r.set(f"{_EXT_CHANNEL_KEY_PREFIX}{channel_id}", "1", ex=_CHANNEL_KEY_TTL) async def _is_ext_channel(self, channel_id: str) -> bool: """Check if a channel is an external media channel we created.""" r = await self._get_redis() return await r.exists(f"{_EXT_CHANNEL_KEY_PREFIX}{channel_id}") > 0 async def _delete_ext_channel(self, channel_id: str): """Remove the external media channel marker.""" r = await self._get_redis() await r.delete(f"{_EXT_CHANNEL_KEY_PREFIX}{channel_id}") async def _set_pending_bridge( self, ext_channel_id: str, caller_channel_id: str, workflow_run_id: str, ): """Store the bridge context to be consumed when ext media enters Stasis.""" r = await self._get_redis() await r.set( f"{_PENDING_BRIDGE_PREFIX}{ext_channel_id}", json.dumps( { "caller_channel_id": caller_channel_id, "workflow_run_id": workflow_run_id, } ), ex=_PENDING_BRIDGE_TTL, ) async def _pop_pending_bridge(self, ext_channel_id: str) -> Optional[dict]: """Read and delete the pending bridge context. Returns None if absent.""" r = await self._get_redis() val = await r.getdel(f"{_PENDING_BRIDGE_PREFIX}{ext_channel_id}") if val is None: return None return json.loads(val) @property def ws_url(self) -> str: """Build the ARI WebSocket URL.""" parsed = urlparse(self.ari_endpoint) ws_scheme = "wss" if parsed.scheme == "https" else "ws" return ( f"{ws_scheme}://{parsed.netloc}/ari/events" f"?api_key={self.app_name}:{self.app_password}" f"&app={self.app_name}" f"&subscribeAll=true" ) @property def connection_key(self) -> str: """Unique key for this connection — one per ARI config row.""" return f"config:{self.telephony_configuration_id}" async def start(self): """Start the WebSocket connection in a background task.""" if self._running: return self._running = True self._task = asyncio.create_task(self._connection_loop()) logger.info( f"[ARI org={self.organization_id}] Started connection to {self.ari_endpoint}" ) async def stop(self): """Stop the WebSocket connection.""" self._running = False if self._ws: await self._ws.close() if self._task and not self._task.done(): self._task.cancel() try: await self._task except asyncio.CancelledError: pass logger.info( f"[ARI org={self.organization_id}] Stopped connection to {self.ari_endpoint}" ) async def _connection_loop(self): """Main connection loop with reconnection logic.""" while self._running: try: await self._connect_and_listen() except asyncio.CancelledError: break except Exception as e: if not self._running: break logger.warning( f"[ARI org={self.organization_id}] Connection error: {e}. " f"Reconnecting in {self._reconnect_delay}s..." ) await asyncio.sleep(self._reconnect_delay) # Exponential backoff self._reconnect_delay = min( self._reconnect_delay * 2, self._max_reconnect_delay ) async def _connect_and_listen(self): """Establish WebSocket connection and listen for events.""" ws_url = self.ws_url logger.info( f"[ARI org={self.organization_id}] Connecting to {self.ari_endpoint}..." ) async for ws in websockets.connect( ws_url, ping_interval=self._ping_interval, ping_timeout=10, close_timeout=5, ): try: self._ws = ws # Reset reconnect delay on successful connection self._reconnect_delay = 1 logger.info( f"[ARI org={self.organization_id}] WebSocket connected to {self.ari_endpoint}" ) async for message in ws: if not self._running: return if isinstance(message, str): await self._handle_event(message) else: logger.debug( f"[ARI org={self.organization_id}] Received binary message, ignoring" ) except websockets.ConnectionClosed as e: if not self._running: return logger.warning( f"[ARI org={self.organization_id}] WebSocket closed: " f"code={e.code}, reason={e.reason}. Reconnecting..." ) continue finally: self._ws = None async def _handle_event(self, raw_data: str): """Handle an ARI WebSocket event.""" try: event = json.loads(raw_data) except json.JSONDecodeError: logger.warning( f"[ARI org={self.organization_id}] Invalid JSON: {raw_data[:200]}" ) return event_type = event.get("type", "unknown") channel = event.get("channel", {}) channel_id = channel.get("id", "unknown") channel_state = channel.get("state", "unknown") # Log all events for each channel logger.debug( f"[ARI EVENT org={self.organization_id}] {event_type}: channel={channel_id}, state={channel_state}" ) if event_type == "StasisStart": if await self._is_ext_channel(channel_id): # External media channel has entered Stasis. If there is a # queued bridge for it, finish bridging now; otherwise the # caller-side handler did not register one and this event is # nothing for us to act on. pending = await self._pop_pending_bridge(channel_id) if pending is None: logger.debug( f"[ARI org={self.organization_id}] StasisStart for ext " f"channel {channel_id} with no pending bridge" ) return logger.info( f"[ARI org={self.organization_id}] Ext channel {channel_id} " f"entered Stasis — completing bridge for caller " f"{pending['caller_channel_id']} (run {pending['workflow_run_id']})" ) asyncio.create_task( self._complete_bridge_after_ext_ready(channel_id, pending) ) return app_args = event.get("args", []) caller = channel.get("caller", {}) logger.info( f"[ARI org={self.organization_id}] StasisStart: " f"channel={channel_id}, state={channel_state}, " f"caller={caller.get('number', 'unknown')}, " f"args={app_args}" ) if channel_state == "Ring": # Inbound call — arrived from outside, not yet answered asyncio.create_task( self._handle_inbound_stasis_start(channel_id, channel_state, event) ) else: # Outbound call (state == "Up") — originated by us # Check if this is a transfer destination channel (app_args starts with "transfer") # Transfer destinations run externally - we only track status to publish transfer event, not run the pipeline transfer_id = self._get_transfer_id(app_args) if transfer_id: logger.info( f"[ARI org={self.organization_id}] Transfer destination answered: " f"channel={channel_id}, transfer_id={transfer_id}" ) asyncio.create_task( self._handle_destination_answered(transfer_id, channel_id) ) return # Parse args to extract workflow context args_dict = {} for arg in app_args: for pair in arg.split(","): if "=" in pair: key, value = pair.split("=", 1) args_dict[key.strip()] = value.strip() workflow_run_id = args_dict.get("workflow_run_id") workflow_id = args_dict.get("workflow_id") user_id = args_dict.get("user_id") if not workflow_run_id or not workflow_id or not user_id: logger.warning( f"[ARI org={self.organization_id}] StasisStart missing required args: " f"workflow_run_id={workflow_run_id}, workflow_id={workflow_id}, user_id={user_id}" ) return # Start pipeline connection in background task asyncio.create_task( self._handle_stasis_start( channel_id, channel_state, workflow_run_id, workflow_id, user_id ) ) elif event_type == "StasisEnd": logger.info( f"[ARI org={self.organization_id}] StasisEnd: channel={channel_id}" ) workflow_run_id = await self._get_channel_run(channel_id) if workflow_run_id: asyncio.create_task( self._handle_stasis_end(channel_id, workflow_run_id) ) elif event_type == "ChannelStateChange": logger.debug( f"[ARI org={self.organization_id}] ChannelStateChange: " f"channel={channel_id}, state={channel_state}" ) elif event_type == "ChannelDestroyed": cause = event.get("cause", 0) cause_txt = event.get("cause_txt", "unknown") tech_cause = event.get("tech_cause", "unknown") logger.info( f"[ARI org={self.organization_id}] ChannelDestroyed: " f"channel={channel_id}, cause={cause} ({cause_txt}), tech_cause = {tech_cause}" ) # Check if this is a transfer destination that failed transfer_id = await self._get_transfer_id_for_channel(channel_id) if transfer_id: failure_message = self._map_hangup_cause_to_message( cause, tech_cause, cause_txt ) asyncio.create_task( self._handle_transfer_failed( transfer_id, channel_id, failure_message ) ) elif event_type == "ChannelDtmfReceived": digit = event.get("digit", "") logger.debug( f"[ARI org={self.organization_id}] DTMF: " f"channel={channel_id}, digit={digit}" ) else: logger.debug( f"[ARI org={self.organization_id}] Event: {event_type} " f"channel={channel_id}" ) async def _ari_request(self, method: str, path: str, **kwargs) -> dict: """Make an ARI REST API request.""" url = f"{self.ari_endpoint}/ari{path}" auth = aiohttp.BasicAuth(self.app_name, self.app_password) async with aiohttp.ClientSession() as session: async with session.request(method, url, auth=auth, **kwargs) as response: response_text = await response.text() if response.status not in (200, 201, 204): logger.error( f"[ARI org={self.organization_id}] REST API error: " f"{method} {path} -> {response.status}: {response_text}" ) return {} if response_text: return json.loads(response_text) return {} async def _answer_channel(self, channel_id: str) -> bool: """Answer an ARI channel.""" await self._ari_request("POST", f"/channels/{channel_id}/answer") # answer returns 204 No Content on success, so empty dict is OK logger.info(f"[ARI org={self.organization_id}] Answered channel {channel_id}") return True async def _create_external_media( self, workflow_id: str, user_id: str, workflow_run_id: str, channel_id: Optional[str] = None, ) -> str: """Create an external media channel via chan_websocket. Uses ARI externalMedia with transport=websocket so Asterisk connects to our backend over WebSocket (via websocket_client.conf). Dynamic routing params are passed as URI query params via v() in transport_data. If ``channel_id`` is provided, it is passed to Asterisk as the ``channelId`` query parameter so the new channel is created with that id. The caller can then register ext-channel state ahead of the POST and avoid racing against the StasisStart event. """ # v() appends URI query params to the websocket_client.conf URL # e.g. wss://api.dograh.com/ws/ari?workflow_id=1&user_id=2&workflow_run_id=3 transport_data = ( f"v(workflow_id={workflow_id}," f"user_id={user_id}," f"workflow_run_id={workflow_run_id})" ) params = { "app": self.app_name, "external_host": self.ws_client_name, "format": "ulaw", "transport": "websocket", "encapsulation": "none", "connection_type": "client", "direction": "both", "transport_data": transport_data, } if channel_id: params["channelId"] = channel_id result = await self._ari_request( "POST", "/channels/externalMedia", params=params ) ext_channel_id = result.get("id", "") if ext_channel_id: # Idempotent — caller may have already marked it before the POST. await self._mark_ext_channel(ext_channel_id) logger.info( f"[ARI org={self.organization_id}] Created external media channel: {ext_channel_id}" ) return ext_channel_id async def _create_bridge_and_add_channels(self, channel_ids: list) -> str: """Create a bridge and add channels to it.""" # Create bridge bridge_result = await self._ari_request( "POST", "/bridges", params={"type": "mixing", "name": f"bridge-{channel_ids[0]}"}, ) bridge_id = bridge_result.get("id", "") if not bridge_id: logger.error(f"[ARI org={self.organization_id}] Failed to create bridge") return "" # Add channels to bridge await self._ari_request( "POST", f"/bridges/{bridge_id}/addChannel", params={"channel": ",".join(channel_ids)}, ) logger.info( f"[ARI org={self.organization_id}] Bridge {bridge_id} created with channels: {channel_ids}" ) return bridge_id async def _handle_inbound_stasis_start( self, channel_id: str, channel_state: str, event: dict ): """Handle an inbound call (StasisStart with state=Ring). Validates quota, creates a workflow run, then delegates to the standard answer→externalMedia→bridge pipeline. """ channel = event.get("channel", {}) caller_number = channel.get("caller", {}).get("number", "unknown") called_number = channel.get("dialplan", {}).get("exten", "unknown") try: # 1. Resolve the workflow from the called extension via the # telephony_phone_numbers row scoped to this connection's config. phone_row = await db_client.find_active_phone_number_for_inbound( self.organization_id, called_number, "ari" ) if ( not phone_row or phone_row.telephony_configuration_id != self.telephony_configuration_id ): logger.warning( f"[ARI org={self.organization_id}] Inbound call to extension " f"{called_number} on channel {channel_id} — no matching phone " f"number registered for config {self.telephony_configuration_id}, " f"hanging up" ) await self._delete_channel(channel_id) return inbound_workflow_id = phone_row.inbound_workflow_id if not inbound_workflow_id: logger.warning( f"[ARI org={self.organization_id}] Inbound call to extension " f"{called_number} on channel {channel_id} — phone number " f"{phone_row.address} has no inbound_workflow_id assigned, " f"hanging up" ) await self._delete_channel(channel_id) return # 2. Load workflow to get user_id and verify organization workflow = await db_client.get_workflow( inbound_workflow_id, organization_id=self.organization_id ) if not workflow: logger.warning( f"[ARI org={self.organization_id}] Workflow {inbound_workflow_id} " f"not found or doesn't belong to this organization — hanging up" ) await self._delete_channel(channel_id) return user_id = workflow.user_id # 3. Check quota (apply per-workflow model_overrides). quota_result = await check_dograh_quota_by_user_id( user_id, workflow_id=inbound_workflow_id ) if not quota_result.has_quota: logger.warning( f"[ARI org={self.organization_id}] Quota exceeded for user {user_id} " f"— hanging up inbound call {channel_id}" ) await self._delete_channel(channel_id) return # 4. Create workflow run call_id = channel_id workflow_run = await db_client.create_workflow_run( name=f"ARI Inbound {caller_number}", workflow_id=inbound_workflow_id, mode=WorkflowRunMode.ARI.value, user_id=user_id, call_type=CallType.INBOUND, initial_context={ "caller_number": caller_number, "called_number": called_number, "direction": "inbound", "provider": "ari", "telephony_configuration_id": self.telephony_configuration_id, }, gathered_context={ "call_id": call_id, }, ) logger.info( f"[ARI org={self.organization_id}] Created inbound workflow run " f"{workflow_run.id} for channel {channel_id} " f"(caller={caller_number}, called={called_number})" ) # 5. Answer the inbound channel await self._answer_channel(channel_id) # 6. Delegate to the standard pipeline await self._handle_stasis_start( channel_id, channel_state, str(workflow_run.id), str(inbound_workflow_id), str(user_id), ) except Exception as e: logger.error( f"[ARI org={self.organization_id}] Error handling inbound StasisStart " f"for channel {channel_id}: {e}" ) try: await self._delete_channel(channel_id) except Exception: pass async def _handle_stasis_start( self, channel_id: str, channel_state: str, workflow_run_id: str, workflow_id: str, user_id: str, ): """Set up external media for a caller channel that has entered Stasis. Creates the external media channel via chan_websocket and registers a pending bridge entry keyed by its channel id. The bridge itself is created in :meth:`_complete_bridge_after_ext_ready` once the external media channel has entered Stasis (its own StasisStart event). """ ext_channel_id = f"dograh-ext-{uuid.uuid4()}" try: logger.info( f"[ARI org={self.organization_id}] Setting up external media for " f"channel {channel_id} via ws_client={self.ws_client_name} " f"(ext_channel_id={ext_channel_id})" ) # 1. Track caller channel for StasisEnd cleanup (Redis). await self._set_channel_run(channel_id, workflow_run_id) # 2. Pre-register all ext-channel state synchronously, before the # externalMedia POST is sent. Asterisk can fire StasisStart for # the ext channel before the POST response returns; registering # here guarantees that event handler finds the marker and the # pending bridge entry regardless of ordering. await self._mark_ext_channel(ext_channel_id) await self._set_channel_run(ext_channel_id, workflow_run_id) await self._set_pending_bridge(ext_channel_id, channel_id, workflow_run_id) # Persist the caller channel id as call_id. Inbound runs already # set this in create_workflow_run, but outbound runs never do, so # without this the serializer hangup (provider reads # gathered_context["call_id"]) and the StasisEnd teardown both get # an empty channel id and fail to hang up the live caller channel. await db_client.update_workflow_run( run_id=int(workflow_run_id), gathered_context={ "ext_channel_id": ext_channel_id, "call_id": channel_id, }, ) # 3. Create the ext media channel with the id we just registered. created_id = await self._create_external_media( workflow_id, user_id, workflow_run_id, channel_id=ext_channel_id, ) if not created_id: await self._pop_pending_bridge(ext_channel_id) logger.error( f"[ARI org={self.organization_id}] Failed to create external " f"media for {channel_id} (ext_channel_id={ext_channel_id})" ) return if created_id != ext_channel_id: # Asterisk ignored our channelId — pending state is stale and # will never be consumed. Clear it and surface loudly. await self._pop_pending_bridge(ext_channel_id) logger.error( f"[ARI org={self.organization_id}] Asterisk returned channel " f"id {created_id} but we requested {ext_channel_id}; " f"channelId may not be honored on this ARI version" ) return logger.info( f"[ARI org={self.organization_id}] Queued bridge for caller " f"{channel_id} <-> ext {ext_channel_id} (run {workflow_run_id}); " f"waiting for ext channel StasisStart" ) except Exception as e: await self._pop_pending_bridge(ext_channel_id) logger.error( f"[ARI org={self.organization_id}] Error handling StasisStart " f"for channel {channel_id}: {e}" ) async def _complete_bridge_after_ext_ready( self, ext_channel_id: str, pending: dict ): """Bridge the caller and external media channels for a queued entry. Invoked from the external media channel's StasisStart handler with the pending entry that :meth:`_handle_stasis_start` registered. Both channels are in the Stasis application at this point, so the bridge and addChannel calls can succeed. """ caller_channel_id = pending["caller_channel_id"] workflow_run_id = pending["workflow_run_id"] try: bridge_id = await self._create_bridge_and_add_channels( [caller_channel_id, ext_channel_id] ) if not bridge_id: logger.error( f"[ARI org={self.organization_id}] Failed to bridge " f"channels {caller_channel_id} <-> {ext_channel_id}" ) return await db_client.update_workflow_run( run_id=int(workflow_run_id), gathered_context={ "ext_channel_id": ext_channel_id, "bridge_id": bridge_id, }, ) except Exception as e: logger.error( f"[ARI org={self.organization_id}] Error completing bridge for " f"caller {caller_channel_id} / ext {ext_channel_id}: {e}" ) async def _handle_stasis_end(self, channel_id: str, workflow_run_id: str): """Full teardown of all ARI resources on any channel's StasisEnd. When either channel (call or ext) fires StasisEnd, we tear down the bridge and both channels — like endConferenceOnExit. """ try: workflow_run = await db_client.get_workflow_run_by_id(int(workflow_run_id)) if not workflow_run or not workflow_run.gathered_context: logger.warning( f"[ARI org={self.organization_id}] StasisEnd: no gathered_context " f"for workflow_run {workflow_run_id}" ) # Still clean up the Redis key for the channel that ended await self._delete_channel_run(channel_id) return ctx = workflow_run.gathered_context call_id = ctx.get("call_id") ext_channel_id = ctx.get("ext_channel_id") bridge_id = ctx.get("bridge_id") transfer_state = ctx.get("transfer_state") # Check if this is a call transfer scenario external channel. Skip full teardown if # transfer is in progress and this is the external media channel # During call transfer, we preserve the caller-destination bridge if ( transfer_state == "in-progress" and channel_id == ext_channel_id and ext_channel_id is not None ): logger.info( f"[ARI org={self.organization_id}] Transfer in progress - skipping full teardown " f"for external channel {channel_id}, preserving bridge {bridge_id} and caller {call_id}" ) # Update transfer state to complete ctx["transfer_state"] = "complete" await db_client.update_workflow_run( run_id=int(workflow_run_id), gathered_context=ctx ) # Clean up only Redis markers for external channel await self._delete_channel_run(channel_id) await self._delete_ext_channel(channel_id) logger.info( f"[ARI org={self.organization_id}] Transfer cleanup complete - preserved caller {call_id} " f"in bridge {bridge_id}" ) return # Normal full teardown for non-transfer scenarios (transfer_state is None or not in-progress) # Delete the bridge first (removes channels from it) if bridge_id: await self._delete_bridge(bridge_id) # Destroy both channels, skipping the one that already ended for cid in (call_id, ext_channel_id): if cid and cid != channel_id: await self._delete_channel(cid) # Clean up all Redis reverse-mapping keys keys_to_delete = [ cid for cid in (call_id, ext_channel_id, channel_id) if cid ] if keys_to_delete: await self._delete_channel_run(*keys_to_delete) # Clean up the Redis marker for external channel await self._delete_ext_channel(ext_channel_id) logger.info( f"[ARI org={self.organization_id}] StasisEnd full teardown for " f"channel={channel_id}, call={call_id}, ext={ext_channel_id}, bridge={bridge_id}" ) except Exception as e: logger.error( f"[ARI org={self.organization_id}] Error cleaning up StasisEnd " f"for channel {channel_id}: {e}" ) async def _delete_bridge(self, bridge_id: str): """Delete an ARI bridge. Ignores 404 (already gone).""" url = f"{self.ari_endpoint}/ari/bridges/{bridge_id}" auth = aiohttp.BasicAuth(self.app_name, self.app_password) async with aiohttp.ClientSession() as session: async with session.delete(url, auth=auth) as response: if response.status in (200, 204): logger.info( f"[ARI org={self.organization_id}] Deleted bridge {bridge_id}" ) elif response.status == 404: logger.debug( f"[ARI org={self.organization_id}] Bridge {bridge_id} already gone" ) else: text = await response.text() logger.error( f"[ARI org={self.organization_id}] Failed to delete bridge {bridge_id}: " f"{response.status} {text}" ) # ======== CALL TRANSFER HELPER METHODS ======== def _map_hangup_cause_to_message( self, cause: int, tech_cause: str, cause_txt: str ) -> str: """Map Asterisk cause codes to user-friendly transfer failure messages.""" if cause == 17 and tech_cause == "486": # User busy/declined return "The person declined the call or their line is busy." elif cause == 19 and tech_cause == "480": # No answer return "The transfer call was not answered. The person may be busy or unavailable right now." elif cause == 21: # Call rejected return "The transfer call failed to connect. There may be a network issue or the number is unavailable." else: return f"Transfer failed: {cause_txt}" def _get_transfer_id(self, app_args: list) -> Optional[str]: """Get transfer_id if this is a transfer channel, None otherwise. Args format: ['transfer', '{transfer_id}', '{conf_name}'] """ if len(app_args) > 1 and app_args[0] == "transfer": transfer_id = app_args[1] logger.debug( f"[ARI org={self.organization_id}] Detected transfer channel with transfer_id: {transfer_id}" ) return transfer_id return None async def _get_transfer_id_for_channel(self, channel_id: str) -> Optional[str]: """Get transfer_id for a channel by checking Redis mapping.""" try: r = await self._get_redis() transfer_id = await r.get(f"ari:transfer_channel:{channel_id}") logger.debug( f"[ARI Transfer] Looking up transfer_id for channel {channel_id}: {transfer_id}" ) return transfer_id except Exception as e: logger.error( f"[ARI org={self.organization_id}] Error getting transfer ID for channel {channel_id}: {e}" ) return None async def _handle_destination_answered( self, transfer_id: str, destination_channel_id: str ): """Handle transfer destination channel answered - publish success event.""" try: logger.info( f"[ARI Transfer org={self.organization_id}] Destination {destination_channel_id} " f"answered for transfer {transfer_id}" ) # Store channel mapping for potential future events and get transfer context transfer_manager = await self._get_transfer_manager() await transfer_manager.store_transfer_channel_mapping( destination_channel_id, transfer_id ) context = await transfer_manager.get_transfer_context(transfer_id) if not context: logger.error( f"[ARI Transfer org={self.organization_id}] No transfer context found for {transfer_id}" ) return logger.info( f"[ARI Transfer org={self.organization_id}] Transfer {transfer_id} success: " f"caller={context.original_call_sid} -> destination={destination_channel_id}" ) # Publish destination answered event - this will trigger the bridge swap in serializer success_event = TransferEvent( type=TransferEventType.DESTINATION_ANSWERED, transfer_id=transfer_id, original_call_sid=context.original_call_sid, transfer_call_sid=destination_channel_id, conference_name=context.conference_name, message="Transfer destination answered", status="success", action="destination_answered", ) await transfer_manager.publish_transfer_event(success_event) except Exception as e: logger.error( f"[ARI Transfer org={self.organization_id}] Error handling transfer answer: {e}" ) # On error, publish failure event await self._handle_transfer_failed( transfer_id, destination_channel_id, f"Transfer processing error: {e}" ) async def _handle_transfer_failed( self, transfer_id: str, channel_id: str, reason: str ): """Handle transfer failure - publish failure event.""" try: logger.info(f"[ARI Transfer] Transfer {transfer_id} failed: {reason}") transfer_manager = await self._get_transfer_manager() context = await transfer_manager.get_transfer_context(transfer_id) # Publish failure event failure_event = TransferEvent( type=TransferEventType.TRANSFER_FAILED, transfer_id=transfer_id, original_call_sid=context.original_call_sid if context else "", transfer_call_sid=channel_id, message=f"Transfer failed: {reason}", status="failed", action="transfer_failed", reason=reason, ) await transfer_manager.publish_transfer_event(failure_event) except Exception as e: logger.error(f"[ARI Transfer] Error handling transfer failure: {e}") async def _delete_channel(self, channel_id: str): """Delete (hang up) an ARI channel. Ignores 404 (already gone).""" url = f"{self.ari_endpoint}/ari/channels/{channel_id}" auth = aiohttp.BasicAuth(self.app_name, self.app_password) async with aiohttp.ClientSession() as session: async with session.delete(url, auth=auth) as response: if response.status in (200, 204): logger.info( f"[ARI org={self.organization_id}] Deleted channel {channel_id}" ) elif response.status == 404: logger.debug( f"[ARI org={self.organization_id}] Channel {channel_id} already gone" ) else: text = await response.text() logger.error( f"[ARI org={self.organization_id}] Failed to delete channel {channel_id}: " f"{response.status} {text}" ) class ARIManager: """Manages ARI WebSocket connections for all organizations.""" def __init__(self): self._connections: Dict[str, ARIConnection] = {} # key -> connection self._running = False self._config_refresh_interval = 60 # Check for config changes every 60 seconds async def start(self): """Start the ARI manager.""" self._running = True logger.info("ARI Manager starting...") # Initial load of configurations await self._refresh_connections() # Start periodic config refresh while self._running: await asyncio.sleep(self._config_refresh_interval) if self._running: await self._refresh_connections() async def stop(self): """Stop all connections and clean up.""" self._running = False logger.info("ARI Manager stopping...") # Stop all connections for conn in self._connections.values(): await conn.stop() self._connections.clear() logger.info("ARI Manager stopped") async def _refresh_connections(self): """ Refresh connections based on current database configurations. - Starts new connections for new ARI configurations - Stops connections for removed configurations - Restarts connections if configuration changed """ try: active_configs = await self._load_ari_configs() except Exception as e: logger.error(f"Failed to load ARI configurations: {e}") return active_keys: Set[str] = set() for config in active_configs: org_id = config["organization_id"] telephony_configuration_id = config["telephony_configuration_id"] ari_endpoint = config["ari_endpoint"] app_name = config["app_name"] app_password = config["app_password"] ws_client_name = config["ws_client_name"] conn = ARIConnection( org_id, telephony_configuration_id, ari_endpoint, app_name, app_password, ws_client_name, ) key = conn.connection_key active_keys.add(key) if key not in self._connections: # New configuration - start connection logger.info( f"[ARI Manager] New ARI config {telephony_configuration_id} " f"for org {org_id}: {ari_endpoint}" ) self._connections[key] = conn await conn.start() else: # Existing configuration — reconnect if connection-level fields # (endpoint, app, password, ws client) changed. Workflow IDs are # resolved per-call via telephony_phone_numbers, so changes to # them don't require a reconnect. existing = self._connections[key] if ( existing.ari_endpoint != conn.ari_endpoint or existing.app_name != app_name or existing.app_password != app_password or existing.ws_client_name != ws_client_name ): logger.info( f"[ARI Manager] Config {telephony_configuration_id} " f"changed for org {org_id}, reconnecting..." ) await existing.stop() self._connections[key] = conn await conn.start() # Stop connections for removed configurations removed_keys = set(self._connections.keys()) - active_keys for key in removed_keys: conn = self._connections.pop(key) logger.info( f"[ARI Manager] Removing connection for org {conn.organization_id}" ) await conn.stop() if active_configs: logger.info( f"[ARI Manager] Active connections: {len(self._connections)} " f"(configs: {[c['telephony_configuration_id'] for c in active_configs]})" ) else: logger.debug("[ARI Manager] No ARI configurations found") async def _load_ari_configs(self) -> list: """Load all ARI telephony configurations from the multi-config tables.""" rows = await db_client.list_all_telephony_configurations_by_provider("ari") configs = [] for row in rows: credentials = row.credentials or {} ari_endpoint = credentials.get("ari_endpoint") app_name = credentials.get("app_name") app_password = credentials.get("app_password") ws_client_name = credentials.get("ws_client_name", "") if not all([ari_endpoint, app_name, app_password]): logger.warning( f"[ARI Manager] Incomplete ARI config {row.id} " f"for org {row.organization_id}, skipping" ) continue if not ws_client_name: logger.warning( f"[ARI Manager] Missing ws_client_name for config {row.id} " f"(org {row.organization_id}), externalMedia WebSocket won't work" ) configs.append( { "organization_id": row.organization_id, "telephony_configuration_id": row.id, "ari_endpoint": ari_endpoint, "app_name": app_name, "app_password": app_password, "ws_client_name": ws_client_name, } ) return configs async def main(): """Entry point for the ARI manager process.""" manager = ARIManager() # Handle graceful shutdown loop = asyncio.get_running_loop() shutdown_event = asyncio.Event() def signal_handler(): logger.info("Received shutdown signal") shutdown_event.set() for sig in (signal.SIGTERM, signal.SIGINT): loop.add_signal_handler(sig, signal_handler) # Start manager in background manager_task = asyncio.create_task(manager.start()) # Wait for shutdown signal await shutdown_event.wait() # Clean up await manager.stop() manager_task.cancel() try: await manager_task except asyncio.CancelledError: pass logger.info("ARI Manager exited cleanly") if __name__ == "__main__": asyncio.run(main())