mirror of
https://github.com/dograh-hq/dograh.git
synced 2026-06-07 07:55:16 +02:00
fix: replace in-memory dict with redis storage
This commit is contained in:
parent
2849e48144
commit
997b0879a1
1 changed files with 36 additions and 13 deletions
|
|
@ -36,7 +36,9 @@ from api.services.telephony.transfer_event_protocol import (
|
|||
# Redis key pattern and TTL for channel-to-run mapping
|
||||
_CHANNEL_KEY_PREFIX = "ari:channel:"
|
||||
_EXT_CHANNEL_KEY_PREFIX = "ari:ext_channel:"
|
||||
_PENDING_BRIDGE_PREFIX = "ari:pending_bridge:"
|
||||
_CHANNEL_KEY_TTL = 3600 # 1 hour safety expiry
|
||||
_PENDING_BRIDGE_TTL = 300 # 5 min safety expiry for bridge-pending state
|
||||
|
||||
|
||||
class ARIConnection:
|
||||
|
|
@ -71,11 +73,6 @@ class ARIConnection:
|
|||
# Transfer manager for handling call transfers
|
||||
self._call_transfer_manager = None
|
||||
|
||||
# Bridge work waiting for the external media channel to enter Stasis.
|
||||
# Keyed by ext_channel_id; each value carries the caller channel id
|
||||
# and workflow run id needed to complete the bridge.
|
||||
self._pending_bridges: Dict[str, dict] = {}
|
||||
|
||||
async def _get_redis(self) -> aioredis.Redis:
|
||||
"""Get Redis client instance (lazy init)."""
|
||||
if not self._redis_client:
|
||||
|
|
@ -127,6 +124,33 @@ class ARIConnection:
|
|||
r = await self._get_redis()
|
||||
await r.delete(f"{_EXT_CHANNEL_KEY_PREFIX}{channel_id}")
|
||||
|
||||
async def _set_pending_bridge(
|
||||
self,
|
||||
ext_channel_id: str,
|
||||
caller_channel_id: str,
|
||||
workflow_run_id: str,
|
||||
):
|
||||
"""Store the bridge context to be consumed when ext media enters Stasis."""
|
||||
r = await self._get_redis()
|
||||
await r.set(
|
||||
f"{_PENDING_BRIDGE_PREFIX}{ext_channel_id}",
|
||||
json.dumps(
|
||||
{
|
||||
"caller_channel_id": caller_channel_id,
|
||||
"workflow_run_id": workflow_run_id,
|
||||
}
|
||||
),
|
||||
ex=_PENDING_BRIDGE_TTL,
|
||||
)
|
||||
|
||||
async def _pop_pending_bridge(self, ext_channel_id: str) -> Optional[dict]:
|
||||
"""Read and delete the pending bridge context. Returns None if absent."""
|
||||
r = await self._get_redis()
|
||||
val = await r.getdel(f"{_PENDING_BRIDGE_PREFIX}{ext_channel_id}")
|
||||
if val is None:
|
||||
return None
|
||||
return json.loads(val)
|
||||
|
||||
@property
|
||||
def ws_url(self) -> str:
|
||||
"""Build the ARI WebSocket URL."""
|
||||
|
|
@ -260,7 +284,7 @@ class ARIConnection:
|
|||
# queued bridge for it, finish bridging now; otherwise the
|
||||
# caller-side handler did not register one and this event is
|
||||
# nothing for us to act on.
|
||||
pending = self._pending_bridges.pop(channel_id, None)
|
||||
pending = await self._pop_pending_bridge(channel_id)
|
||||
if pending is None:
|
||||
logger.debug(
|
||||
f"[ARI org={self.organization_id}] StasisStart for ext "
|
||||
|
|
@ -632,10 +656,9 @@ class ARIConnection:
|
|||
# pending bridge entry regardless of ordering.
|
||||
await self._mark_ext_channel(ext_channel_id)
|
||||
await self._set_channel_run(ext_channel_id, workflow_run_id)
|
||||
self._pending_bridges[ext_channel_id] = {
|
||||
"caller_channel_id": channel_id,
|
||||
"workflow_run_id": workflow_run_id,
|
||||
}
|
||||
await self._set_pending_bridge(
|
||||
ext_channel_id, channel_id, workflow_run_id
|
||||
)
|
||||
await db_client.update_workflow_run(
|
||||
run_id=int(workflow_run_id),
|
||||
gathered_context={"ext_channel_id": ext_channel_id},
|
||||
|
|
@ -649,7 +672,7 @@ class ARIConnection:
|
|||
channel_id=ext_channel_id,
|
||||
)
|
||||
if not created_id:
|
||||
self._pending_bridges.pop(ext_channel_id, None)
|
||||
await self._pop_pending_bridge(ext_channel_id)
|
||||
logger.error(
|
||||
f"[ARI org={self.organization_id}] Failed to create external "
|
||||
f"media for {channel_id} (ext_channel_id={ext_channel_id})"
|
||||
|
|
@ -658,7 +681,7 @@ class ARIConnection:
|
|||
if created_id != ext_channel_id:
|
||||
# Asterisk ignored our channelId — pending state is stale and
|
||||
# will never be consumed. Clear it and surface loudly.
|
||||
self._pending_bridges.pop(ext_channel_id, None)
|
||||
await self._pop_pending_bridge(ext_channel_id)
|
||||
logger.error(
|
||||
f"[ARI org={self.organization_id}] Asterisk returned channel "
|
||||
f"id {created_id} but we requested {ext_channel_id}; "
|
||||
|
|
@ -672,7 +695,7 @@ class ARIConnection:
|
|||
f"waiting for ext channel StasisStart"
|
||||
)
|
||||
except Exception as e:
|
||||
self._pending_bridges.pop(ext_channel_id, None)
|
||||
await self._pop_pending_bridge(ext_channel_id)
|
||||
logger.error(
|
||||
f"[ARI org={self.organization_id}] Error handling StasisStart "
|
||||
f"for channel {channel_id}: {e}"
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue