# Patch summary (api/services/telephony/ari_manager.py)
#
# Moves the "pending bridge" hand-off state used by ARIConnection out of the
# in-process dict `self._pending_bridges` and into Redis:
#   - adds the key prefix "ari:pending_bridge:" (_PENDING_BRIDGE_PREFIX) and a
#     300-second expiry (_PENDING_BRIDGE_TTL);
#   - adds _set_pending_bridge(ext_channel_id, caller_channel_id,
#     workflow_run_id), which writes a JSON object with the keys
#     "caller_channel_id" and "workflow_run_id" under prefix + ext_channel_id,
#     passing ex=_PENDING_BRIDGE_TTL;
#   - adds _pop_pending_bridge(ext_channel_id), which calls `getdel` on the
#     same key and returns the decoded JSON value, or None when the key is
#     absent;
#   - removes the `self._pending_bridges` attribute and replaces its one
#     assignment and four `.pop(..., None)` calls with awaited calls to the
#     two helpers.
#
# NOTE(review): in the final hunk the cleanup
#   `await self._pop_pending_bridge(ext_channel_id)` sits inside
#   `except Exception as e:` ahead of `logger.error(...)`. The dict
#   `.pop(key, None)` it replaces could not raise; an awaited Redis call can,
#   and if it does the log call after it is not reached. Consider guarding the
#   cleanup or logging first -- TODO confirm intended behaviour.
# NOTE(review): the three cleanup call sites discard the popped value; a plain
#   key delete would avoid the JSON decode there -- optional follow-up.
# NOTE(review): `getdel` presumably maps to the Redis GETDEL command, which
#   needs Redis server 6.2 or later -- confirm against the deployed version.
# NOTE(review): in _pop_pending_bridge, json.loads runs after the key has
#   already been deleted, so an undecodable value is lost and the exception
#   propagates to the caller -- confirm that is acceptable.
# NOTE(review): the line breaks of this patch appear to have been collapsed;
#   the patch text below is reproduced unchanged.
#
diff --git a/api/services/telephony/ari_manager.py b/api/services/telephony/ari_manager.py index c8cf871..f24b64e 100644 --- a/api/services/telephony/ari_manager.py +++ b/api/services/telephony/ari_manager.py @@ -36,7 +36,9 @@ from api.services.telephony.transfer_event_protocol import ( # Redis key pattern and TTL for channel-to-run mapping _CHANNEL_KEY_PREFIX = "ari:channel:" _EXT_CHANNEL_KEY_PREFIX = "ari:ext_channel:" +_PENDING_BRIDGE_PREFIX = "ari:pending_bridge:" _CHANNEL_KEY_TTL = 3600 # 1 hour safety expiry +_PENDING_BRIDGE_TTL = 300 # 5 min safety expiry for bridge-pending state class ARIConnection: @@ -71,11 +73,6 @@ class ARIConnection: # Transfer manager for handling call transfers self._call_transfer_manager = None - # Bridge work waiting for the external media channel to enter Stasis. - # Keyed by ext_channel_id; each value carries the caller channel id - # and workflow run id needed to complete the bridge. - self._pending_bridges: Dict[str, dict] = {} - async def _get_redis(self) -> aioredis.Redis: """Get Redis client instance (lazy init).""" if not self._redis_client: @@ -127,6 +124,33 @@ class ARIConnection: r = await self._get_redis() await r.delete(f"{_EXT_CHANNEL_KEY_PREFIX}{channel_id}") + async def _set_pending_bridge( + self, + ext_channel_id: str, + caller_channel_id: str, + workflow_run_id: str, + ): + """Store the bridge context to be consumed when ext media enters Stasis.""" + r = await self._get_redis() + await r.set( + f"{_PENDING_BRIDGE_PREFIX}{ext_channel_id}", + json.dumps( + { + "caller_channel_id": caller_channel_id, + "workflow_run_id": workflow_run_id, + } + ), + ex=_PENDING_BRIDGE_TTL, + ) + + async def _pop_pending_bridge(self, ext_channel_id: str) -> Optional[dict]: + """Read and delete the pending bridge context. 
Returns None if absent.""" + r = await self._get_redis() + val = await r.getdel(f"{_PENDING_BRIDGE_PREFIX}{ext_channel_id}") + if val is None: + return None + return json.loads(val) + @property def ws_url(self) -> str: """Build the ARI WebSocket URL.""" @@ -260,7 +284,7 @@ class ARIConnection: # queued bridge for it, finish bridging now; otherwise the # caller-side handler did not register one and this event is # nothing for us to act on. - pending = self._pending_bridges.pop(channel_id, None) + pending = await self._pop_pending_bridge(channel_id) if pending is None: logger.debug( f"[ARI org={self.organization_id}] StasisStart for ext " @@ -632,10 +656,9 @@ class ARIConnection: # pending bridge entry regardless of ordering. await self._mark_ext_channel(ext_channel_id) await self._set_channel_run(ext_channel_id, workflow_run_id) - self._pending_bridges[ext_channel_id] = { - "caller_channel_id": channel_id, - "workflow_run_id": workflow_run_id, - } + await self._set_pending_bridge( + ext_channel_id, channel_id, workflow_run_id + ) await db_client.update_workflow_run( run_id=int(workflow_run_id), gathered_context={"ext_channel_id": ext_channel_id}, @@ -649,7 +672,7 @@ class ARIConnection: channel_id=ext_channel_id, ) if not created_id: - self._pending_bridges.pop(ext_channel_id, None) + await self._pop_pending_bridge(ext_channel_id) logger.error( f"[ARI org={self.organization_id}] Failed to create external " f"media for {channel_id} (ext_channel_id={ext_channel_id})" @@ -658,7 +681,7 @@ class ARIConnection: if created_id != ext_channel_id: # Asterisk ignored our channelId — pending state is stale and # will never be consumed. Clear it and surface loudly. 
- self._pending_bridges.pop(ext_channel_id, None) + await self._pop_pending_bridge(ext_channel_id) logger.error( f"[ARI org={self.organization_id}] Asterisk returned channel " f"id {created_id} but we requested {ext_channel_id}; " @@ -672,7 +695,7 @@ class ARIConnection: f"waiting for ext channel StasisStart" ) except Exception as e: - self._pending_bridges.pop(ext_channel_id, None) + await self._pop_pending_bridge(ext_channel_id) logger.error( f"[ARI org={self.organization_id}] Error handling StasisStart " f"for channel {channel_id}: {e}"