chore: refactor code

This commit is contained in:
Sabiha Khan 2026-02-23 16:42:00 +05:30
parent 000c648e7e
commit e1d8b52b42
3 changed files with 72 additions and 102 deletions

View file

@ -68,7 +68,7 @@ class ARIConnection:
# Redis client for channel-to-run reverse mapping (lazy init)
self._redis_client: Optional[aioredis.Redis] = None
# Transfer manager for handling call transfers (lazy init)
# Transfer manager for handling call transfers
self._call_transfer_manager = None
async def _get_redis(self) -> aioredis.Redis:
@ -80,7 +80,7 @@ class ARIConnection:
return self._redis_client
async def _get_transfer_manager(self):
"""Get transfer manager instance (lazy init)."""
"""Get transfer manager instance."""
if not self._call_transfer_manager:
self._call_transfer_manager = await get_call_transfer_manager()
return self._call_transfer_manager
@ -244,7 +244,7 @@ class ARIConnection:
channel_id = channel.get("id", "unknown")
channel_state = channel.get("state", "unknown")
# Log all events for each channel for debugging
# Log all events for each channel
logger.debug(
f"[ARI EVENT org={self.organization_id}] {event_type}: channel={channel_id}, state={channel_state}"
)
@ -275,20 +275,20 @@ class ARIConnection:
)
else:
# Outbound call (state == "Up") — originated by us
# Check if this is a transfer channel first
if self._is_transfer_channel(app_args):
transfer_id = self._extract_transfer_id(app_args)
if transfer_id:
logger.info(
f"[ARI org={self.organization_id}] Transfer destination answered: "
f"channel={channel_id}, transfer_id={transfer_id}"
)
asyncio.create_task(
self._handle_transfer_answered(transfer_id, channel_id)
)
return
# Check if this is a transfer destination channel (app_args starts with "transfer")
# Transfer destinations run externally - we only track status to publish transfer event, not run our pipeline
transfer_id = self._get_transfer_id(app_args)
if transfer_id:
logger.info(
f"[ARI org={self.organization_id}] Transfer destination answered: "
f"channel={channel_id}, transfer_id={transfer_id}"
)
asyncio.create_task(
self._handle_transfer_answered(transfer_id, channel_id)
)
return
# Regular outbound call - parse args to extract workflow context
# Parse args to extract workflow context
args_dict = {}
for arg in app_args:
for pair in arg.split(","):
@ -319,9 +319,6 @@ class ARIConnection:
f"[ARI org={self.organization_id}] StasisEnd: channel={channel_id}"
)
# Check if this is a caller hangup during transfer
# await self._handle_caller_hangup_during_transfer(channel_id) TODO: handle when caller ends call after transfer initiation
workflow_run_id = await self._get_channel_run(channel_id)
if workflow_run_id:
asyncio.create_task(
@ -632,7 +629,8 @@ class ARIConnection:
bridge_id = ctx.get("bridge_id")
transfer_state = ctx.get("transfer_state")
# Check if this is transfer-protected external channel
# Check if this is transfer-protected external channe. Skip full teardown if transfer is in progress and this is the external media channel
# During call transfer, we preserve the caller-destination bridge
if (
transfer_state == "in-progress"
and channel_id == ext_channel_id
@ -649,7 +647,7 @@ class ARIConnection:
run_id=int(workflow_run_id), gathered_context=ctx
)
# Clean up only Redis markers for external channel (partial cleanup)
# Clean up only Redis markers for external channel
await self._delete_channel_run(channel_id)
await self._delete_ext_channel(channel_id)
@ -727,25 +725,15 @@ class ARIConnection:
else:
return f"Transfer failed: {cause_txt}"
def _is_transfer_channel(self, app_args: list) -> bool:
"""Check if appArgs indicate this is a transfer channel."""
if not app_args:
return False
# Check if first arg is "transfer" (args are parsed as separate list items)
is_transfer = len(app_args) > 0 and app_args[0] == "transfer"
if is_transfer:
logger.debug(
f"[ARI org={self.organization_id}] Detected transfer channel with args: {app_args}"
)
return is_transfer
def _get_transfer_id(self, app_args: list) -> Optional[str]:
"""Get transfer_id if this is a transfer channel, None otherwise.
def _extract_transfer_id(self, app_args: list) -> Optional[str]:
"""Extract transfer_id from appArgs: ['transfer', '{transfer_id}', '{conf_name}']."""
# Args are parsed as separate list items, so transfer_id is at index 1
Args format: ['transfer', '{transfer_id}', '{conf_name}']
"""
if len(app_args) > 1 and app_args[0] == "transfer":
transfer_id = app_args[1]
logger.debug(
f"[ARI org={self.organization_id}] Extracted transfer_id: {transfer_id}"
f"[ARI org={self.organization_id}] Detected transfer channel with transfer_id: {transfer_id}"
)
return transfer_id
return None
@ -765,18 +753,6 @@ class ARIConnection:
)
return None
async def _store_transfer_channel_mapping(self, channel_id: str, transfer_id: str):
"""Store channel->transfer mapping in Redis for event correlation."""
try:
r = await self._get_redis()
await r.setex(
f"ari:transfer_channel:{channel_id}", 300, transfer_id
) # 5 minute TTL
except Exception as e:
logger.error(
f"[ARI org={self.organization_id}] Error storing transfer channel mapping: {e}"
)
async def _handle_transfer_answered(
self, transfer_id: str, destination_channel_id: str
):
@ -787,13 +763,11 @@ class ARIConnection:
f"answered for transfer {transfer_id}"
)
# Store channel mapping for potential future events
await self._store_transfer_channel_mapping(
# Store channel mapping for potential future events and get transfer context
transfer_manager = await self._get_transfer_manager()
await transfer_manager.store_transfer_channel_mapping(
destination_channel_id, transfer_id
)
# Get transfer context
transfer_manager = await self._get_transfer_manager()
context = await transfer_manager.get_transfer_context(transfer_id)
if not context:
logger.error(

View file

@ -415,13 +415,10 @@ class ARIProvider(TelephonyProvider):
sip_endpoint = f"PJSIP/{destination}"
# Build transfer appArgs for event correlation
app_args = f"transfer,{transfer_id},{conference_name}"
app_args = f"transfer,{transfer_id}"
try:
# Build endpoint URL following existing pattern
endpoint = f"{self.base_url}/channels"
# Prepare channel creation params following existing pattern
params = {
"endpoint": sip_endpoint,
"app": self.app_name,
@ -429,7 +426,6 @@ class ARIProvider(TelephonyProvider):
"timeout": timeout, # Keep timeout for transfer calls
}
# Originate destination channel using existing pattern
async with aiohttp.ClientSession() as session:
async with session.post(
endpoint,
@ -460,7 +456,7 @@ class ARIProvider(TelephonyProvider):
context, ttl=timeout + 10
)
# Store transfer channel mapping for event correlation (works with any dialplan setup)
# Store transfer channel mapping for event correlation
await call_transfer_manager.store_transfer_channel_mapping(
destination_channel_id, transfer_id
)
@ -486,7 +482,7 @@ class ARIProvider(TelephonyProvider):
async def hangup_channel(self, channel_id: str, reason: str = "normal") -> bool:
"""Hang up an ARI channel."""
endpwoint = f"{self.base_url}/channels/{channel_id}"
endpoint = f"{self.base_url}/channels/{channel_id}"
params = {"reason_code": reason}
try:

View file

@ -74,27 +74,27 @@ services:
networks:
- app-network
# api:
# image: ${REGISTRY:-dograhai}/dograh-api:latest
# volumes:
# - shared-tmp:/tmp
# environment:
# # Core application config
# ENVIRONMENT: "local"
# LOG_LEVEL: "INFO"
api:
image: ${REGISTRY:-dograhai}/dograh-api:latest
volumes:
- shared-tmp:/tmp
environment:
# Core application config
ENVIRONMENT: "local"
LOG_LEVEL: "INFO"
# # Replace this environment variable if you are using a custom
# # domain to host the stack
# BACKEND_API_ENDPOINT: "${BACKEND_API_ENDPOINT:-http://localhost:8000}"
# Replace this environment variable if you are using a custom
# domain to host the stack
BACKEND_API_ENDPOINT: "${BACKEND_API_ENDPOINT:-http://localhost:8000}"
# # Database configuration (using containerized postgres)
# DATABASE_URL: "postgresql+asyncpg://postgres:postgres@postgres:5432/postgres"
# Database configuration (using containerized postgres)
DATABASE_URL: "postgresql+asyncpg://postgres:postgres@postgres:5432/postgres"
# # Redis configuration (using containerized redis)
# REDIS_URL: "redis://:redissecret@redis:6379"
# Redis configuration (using containerized redis)
REDIS_URL: "redis://:redissecret@redis:6379"
# # Storage configuration - using local MinIO
# ENABLE_AWS_S3: "false"
# Storage configuration - using local MinIO
ENABLE_AWS_S3: "false"
# MinIO
MINIO_ENDPOINT: "minio:9000"
@ -112,10 +112,10 @@ services:
# LANGFUSE_PUBLIC_KEY: ""
# LANGFUSE_HOST: ""
# # TURN server configuration (for WebRTC NAT traversal in remote server)
# # Uses time-limited credentials via TURN REST API (HMAC-SHA1)
# TURN_HOST: "${TURN_HOST:-}"
# TURN_SECRET: "${TURN_SECRET:-}"
# TURN server configuration (for WebRTC NAT traversal in remote server)
# Uses time-limited credentials via TURN REST API (HMAC-SHA1)
TURN_HOST: "${TURN_HOST:-}"
TURN_SECRET: "${TURN_SECRET:-}"
OSS_JWT_SECRET: "${OSS_JWT_SECRET:-ChangeMeInProduction}"
@ -182,33 +182,33 @@ services:
networks:
- app-network
# coturn:
# image: coturn/coturn:4.8.0
# container_name: coturn
# restart: unless-stopped
# profiles: ["remote"]
# ports:
# - "3478:3478/udp"
# - "3478:3478/tcp"
# - "5349:5349/udp"
# - "5349:5349/tcp"
# - "49152-49200:49152-49200/udp"
# volumes:
# - ./turnserver.conf:/etc/coturn/turnserver.conf:ro
# command:
# - -c
# - /etc/coturn/turnserver.conf
# networks:
# - app-network
coturn:
image: coturn/coturn:4.8.0
container_name: coturn
restart: unless-stopped
profiles: ["remote"]
ports:
- "3478:3478/udp"
- "3478:3478/tcp"
- "5349:5349/udp"
- "5349:5349/tcp"
- "49152-49200:49152-49200/udp"
volumes:
- ./turnserver.conf:/etc/coturn/turnserver.conf:ro
command:
- -c
- /etc/coturn/turnserver.conf
networks:
- app-network
volumes:
postgres_data:
redis_data:
minio-data:
driver: local
# shared-tmp:
# driver: local
shared-tmp:
driver: local
networks:
app-network:
driver: bridge
driver: bridge