From ebeffdbc40fb95c56ebf4446142fc0d4fc558f24 Mon Sep 17 00:00:00 2001 From: Sabiha Khan <87858386+chewwbaka@users.noreply.github.com> Date: Wed, 13 May 2026 18:33:34 +0530 Subject: [PATCH] =?UTF-8?q?fix(ari):=20pre-register=20ext=20channel=20id?= =?UTF-8?q?=20and=20defer=20bridge=20to=20its=20StasisS=E2=80=A6=20(#284)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * fix(ari): pre-register ext channel id and defer bridge to its StasisStart Two race conditions in the inbound ARI flow could leave a call silent: 1. Bridging both channels immediately after creating the ext media leg raced against the ext channel entering the Stasis application; slow chan_websocket handshakes produced "Channel not in Stasis application" 422 errors on addChannel. 2. Asterisk could fire StasisStart for the ext channel before the externalMedia POST response returned, so _is_ext_channel returned False and the event was dropped as an unknown outbound call. Fixes: - Generate the ext channel id as dograh-ext-<uuid> client-side and pass it to Asterisk via the channelId query param. Mark the ext channel, set its channel->run mapping, register the pending bridge entry, and persist gathered_context.ext_channel_id all before the POST. - Defer the bridge to a new _complete_bridge_after_ext_ready handler triggered by the ext channel's own StasisStart. Both channels are guaranteed in Stasis by then, so addChannel cannot 422. - On POST failure or channelId mismatch, roll back the pending entry and ERROR loudly. 
* fix: replace in-memory dict with redis storage --- api/services/telephony/ari_manager.py | 180 ++++++++++++++++++++------ 1 file changed, 144 insertions(+), 36 deletions(-) diff --git a/api/services/telephony/ari_manager.py b/api/services/telephony/ari_manager.py index 688e583..f24b64e 100644 --- a/api/services/telephony/ari_manager.py +++ b/api/services/telephony/ari_manager.py @@ -14,6 +14,7 @@ setup_logging() import asyncio import json import signal +import uuid from typing import Dict, Optional, Set from urllib.parse import urlparse @@ -35,7 +36,9 @@ from api.services.telephony.transfer_event_protocol import ( # Redis key pattern and TTL for channel-to-run mapping _CHANNEL_KEY_PREFIX = "ari:channel:" _EXT_CHANNEL_KEY_PREFIX = "ari:ext_channel:" +_PENDING_BRIDGE_PREFIX = "ari:pending_bridge:" _CHANNEL_KEY_TTL = 3600 # 1 hour safety expiry +_PENDING_BRIDGE_TTL = 300 # 5 min safety expiry for bridge-pending state class ARIConnection: @@ -121,6 +124,33 @@ class ARIConnection: r = await self._get_redis() await r.delete(f"{_EXT_CHANNEL_KEY_PREFIX}{channel_id}") + async def _set_pending_bridge( + self, + ext_channel_id: str, + caller_channel_id: str, + workflow_run_id: str, + ): + """Store the bridge context to be consumed when ext media enters Stasis.""" + r = await self._get_redis() + await r.set( + f"{_PENDING_BRIDGE_PREFIX}{ext_channel_id}", + json.dumps( + { + "caller_channel_id": caller_channel_id, + "workflow_run_id": workflow_run_id, + } + ), + ex=_PENDING_BRIDGE_TTL, + ) + + async def _pop_pending_bridge(self, ext_channel_id: str) -> Optional[dict]: + """Read and delete the pending bridge context. 
Returns None if absent.""" + r = await self._get_redis() + val = await r.getdel(f"{_PENDING_BRIDGE_PREFIX}{ext_channel_id}") + if val is None: + return None + return json.loads(val) + @property def ws_url(self) -> str: """Build the ARI WebSocket URL.""" @@ -249,12 +279,25 @@ class ARIConnection: ) if event_type == "StasisStart": - # Skip external media channels we created — they fire - # their own StasisStart but need no further handling. if await self._is_ext_channel(channel_id): - logger.debug( - f"[ARI org={self.organization_id}] StasisStart for our " - f"externalMedia channel {channel_id}, ignoring" + # External media channel has entered Stasis. If there is a + # queued bridge for it, finish bridging now; otherwise the + # caller-side handler did not register one and this event is + # nothing for us to act on. + pending = await self._pop_pending_bridge(channel_id) + if pending is None: + logger.debug( + f"[ARI org={self.organization_id}] StasisStart for ext " + f"channel {channel_id} with no pending bridge" + ) + return + logger.info( + f"[ARI org={self.organization_id}] Ext channel {channel_id} " + f"entered Stasis — completing bridge for caller " + f"{pending['caller_channel_id']} (run {pending['workflow_run_id']})" + ) + asyncio.create_task( + self._complete_bridge_after_ext_ready(channel_id, pending) ) return @@ -394,12 +437,18 @@ class ARIConnection: workflow_id: str, user_id: str, workflow_run_id: str, + channel_id: Optional[str] = None, ) -> str: """Create an external media channel via chan_websocket. Uses ARI externalMedia with transport=websocket so Asterisk connects to our backend over WebSocket (via websocket_client.conf). Dynamic routing params are passed as URI query params via v() in transport_data. + + If ``channel_id`` is provided, it is passed to Asterisk as the + ``channelId`` query parameter so the new channel is created with + that id. 
The caller can then register ext-channel state ahead of + the POST and avoid racing against the StasisStart event. """ # v() appends URI query params to the websocket_client.conf URL # e.g. wss://api.dograh.com/ws/ari?workflow_id=1&user_id=2&workflow_run_id=3 @@ -409,22 +458,25 @@ class ARIConnection: f"workflow_run_id={workflow_run_id})" ) + params = { + "app": self.app_name, + "external_host": self.ws_client_name, + "format": "ulaw", + "transport": "websocket", + "encapsulation": "none", + "connection_type": "client", + "direction": "both", + "transport_data": transport_data, + } + if channel_id: + params["channelId"] = channel_id + result = await self._ari_request( - "POST", - "/channels/externalMedia", - params={ - "app": self.app_name, - "external_host": self.ws_client_name, - "format": "ulaw", - "transport": "websocket", - "encapsulation": "none", - "connection_type": "client", - "direction": "both", - "transport_data": transport_data, - }, + "POST", "/channels/externalMedia", params=params ) ext_channel_id = result.get("id", "") if ext_channel_id: + # Idempotent — caller may have already marked it before the POST. await self._mark_ext_channel(ext_channel_id) logger.info( f"[ARI org={self.organization_id}] Created external media channel: {ext_channel_id}" @@ -579,42 +631,98 @@ class ARIConnection: workflow_id: str, user_id: str, ): - """Handle StasisStart by creating external media and bridging.""" + """Set up external media for a caller channel that has entered Stasis. + + Creates the external media channel via chan_websocket and registers + a pending bridge entry keyed by its channel id. The bridge itself is + created in :meth:`_complete_bridge_after_ext_ready` once the external + media channel has entered Stasis (its own StasisStart event). 
+ """ + ext_channel_id = f"dograh-ext-{uuid.uuid4()}" try: logger.info( f"[ARI org={self.organization_id}] Setting up external media for " - f"channel {channel_id} via ws_client={self.ws_client_name}" + f"channel {channel_id} via ws_client={self.ws_client_name} " + f"(ext_channel_id={ext_channel_id})" ) - # 1. Track channel for StasisEnd cleanup (Redis) + # 1. Track caller channel for StasisEnd cleanup (Redis). await self._set_channel_run(channel_id, workflow_run_id) - # 2. Create external media channel via chan_websocket - # Asterisk connects to our backend using websocket_client.conf config, - # with routing params appended as URI query params via v() - ext_channel_id = await self._create_external_media( - workflow_id, user_id, workflow_run_id + # 2. Pre-register all ext-channel state synchronously, before the + # externalMedia POST is sent. Asterisk can fire StasisStart for + # the ext channel before the POST response returns; registering + # here guarantees that event handler finds the marker and the + # pending bridge entry regardless of ordering. + await self._mark_ext_channel(ext_channel_id) + await self._set_channel_run(ext_channel_id, workflow_run_id) + await self._set_pending_bridge( + ext_channel_id, channel_id, workflow_run_id ) - if not ext_channel_id: + await db_client.update_workflow_run( + run_id=int(workflow_run_id), + gathered_context={"ext_channel_id": ext_channel_id}, + ) + + # 3. Create the ext media channel with the id we just registered. 
+ created_id = await self._create_external_media( + workflow_id, + user_id, + workflow_run_id, + channel_id=ext_channel_id, + ) + if not created_id: + await self._pop_pending_bridge(ext_channel_id) logger.error( - f"[ARI org={self.organization_id}] Failed to create external media for {channel_id}" + f"[ARI org={self.organization_id}] Failed to create external " + f"media for {channel_id} (ext_channel_id={ext_channel_id})" + ) + return + if created_id != ext_channel_id: + # Asterisk ignored our channelId — pending state is stale and + # will never be consumed. Clear it and surface loudly. + await self._pop_pending_bridge(ext_channel_id) + logger.error( + f"[ARI org={self.organization_id}] Asterisk returned channel " + f"id {created_id} but we requested {ext_channel_id}; " + f"channelId may not be honored on this ARI version" ) return - # 3. Track ext channel for StasisEnd cleanup (Redis) - await self._set_channel_run(ext_channel_id, workflow_run_id) + logger.info( + f"[ARI org={self.organization_id}] Queued bridge for caller " + f"{channel_id} <-> ext {ext_channel_id} (run {workflow_run_id}); " + f"waiting for ext channel StasisStart" + ) + except Exception as e: + await self._pop_pending_bridge(ext_channel_id) + logger.error( + f"[ARI org={self.organization_id}] Error handling StasisStart " + f"for channel {channel_id}: {e}" + ) - # 4. Bridge the call channel with the external media channel + async def _complete_bridge_after_ext_ready( + self, ext_channel_id: str, pending: dict + ): + """Bridge the caller and external media channels for a queued entry. + + Invoked from the external media channel's StasisStart handler with + the pending entry that :meth:`_handle_stasis_start` registered. + Both channels are in the Stasis application at this point, so the + bridge and addChannel calls can succeed. 
+ """ + caller_channel_id = pending["caller_channel_id"] + workflow_run_id = pending["workflow_run_id"] + try: bridge_id = await self._create_bridge_and_add_channels( - [channel_id, ext_channel_id] + [caller_channel_id, ext_channel_id] ) if not bridge_id: logger.error( - f"[ARI org={self.organization_id}] Failed to bridge channels" + f"[ARI org={self.organization_id}] Failed to bridge " + f"channels {caller_channel_id} <-> {ext_channel_id}" ) return - - # 5. Store ARI resource IDs in gathered_context for cleanup/debugging await db_client.update_workflow_run( run_id=int(workflow_run_id), gathered_context={ @@ -624,8 +732,8 @@ class ARIConnection: ) except Exception as e: logger.error( - f"[ARI org={self.organization_id}] Error handling StasisStart " - f"for channel {channel_id}: {e}" + f"[ARI org={self.organization_id}] Error completing bridge for " + f"caller {caller_channel_id} / ext {ext_channel_id}: {e}" ) async def _handle_stasis_end(self, channel_id: str, workflow_run_id: str):