From e1d8b52b4297c9738b11dc3957027ea1c277e965 Mon Sep 17 00:00:00 2001 From: Sabiha Khan Date: Mon, 23 Feb 2026 16:42:00 +0530 Subject: [PATCH] chore: refactor code --- api/services/telephony/ari_manager.py | 80 ++++++------------ .../telephony/providers/ari_provider.py | 10 +-- docker-compose.yaml | 84 +++++++++---------- 3 files changed, 72 insertions(+), 102 deletions(-) diff --git a/api/services/telephony/ari_manager.py b/api/services/telephony/ari_manager.py index f047021..4da16a6 100644 --- a/api/services/telephony/ari_manager.py +++ b/api/services/telephony/ari_manager.py @@ -68,7 +68,7 @@ class ARIConnection: # Redis client for channel-to-run reverse mapping (lazy init) self._redis_client: Optional[aioredis.Redis] = None - # Transfer manager for handling call transfers (lazy init) + # Transfer manager for handling call transfers self._call_transfer_manager = None async def _get_redis(self) -> aioredis.Redis: @@ -80,7 +80,7 @@ class ARIConnection: return self._redis_client async def _get_transfer_manager(self): - """Get transfer manager instance (lazy init).""" + """Get transfer manager instance.""" if not self._call_transfer_manager: self._call_transfer_manager = await get_call_transfer_manager() return self._call_transfer_manager @@ -244,7 +244,7 @@ class ARIConnection: channel_id = channel.get("id", "unknown") channel_state = channel.get("state", "unknown") - # Log all events for each channel for debugging + # Log all events for each channel logger.debug( f"[ARI EVENT org={self.organization_id}] {event_type}: channel={channel_id}, state={channel_state}" ) @@ -275,20 +275,20 @@ class ARIConnection: ) else: # Outbound call (state == "Up") — originated by us - # Check if this is a transfer channel first - if self._is_transfer_channel(app_args): - transfer_id = self._extract_transfer_id(app_args) - if transfer_id: - logger.info( - f"[ARI org={self.organization_id}] Transfer destination answered: " - f"channel={channel_id}, transfer_id={transfer_id}" - ) - asyncio.create_task( - self._handle_transfer_answered(transfer_id, channel_id) - ) - return + # Check if this is a transfer destination channel (app_args starts with "transfer") + # Transfer destinations run externally - we only track status to publish transfer event, not run our pipeline + transfer_id = self._get_transfer_id(app_args) + if transfer_id: + logger.info( + f"[ARI org={self.organization_id}] Transfer destination answered: " + f"channel={channel_id}, transfer_id={transfer_id}" + ) + asyncio.create_task( + self._handle_transfer_answered(transfer_id, channel_id) + ) + return - # Regular outbound call - parse args to extract workflow context + # Parse args to extract workflow context args_dict = {} for arg in app_args: for pair in arg.split(","): @@ -319,9 +319,6 @@ class ARIConnection: f"[ARI org={self.organization_id}] StasisEnd: channel={channel_id}" ) - # Check if this is a caller hangup during transfer - # await self._handle_caller_hangup_during_transfer(channel_id) TODO: handle when caller ends call after transfer initiation - workflow_run_id = await self._get_channel_run(channel_id) if workflow_run_id: asyncio.create_task( @@ -632,7 +629,8 @@ class ARIConnection: bridge_id = ctx.get("bridge_id") transfer_state = ctx.get("transfer_state") - # Check if this is transfer-protected external channel + # Check if this is transfer-protected external channe. Skip full teardown if transfer is in progress and this is the external media channel + # During call transfer, we preserve the caller-destination bridge if ( transfer_state == "in-progress" and channel_id == ext_channel_id @@ -649,7 +647,7 @@ class ARIConnection: run_id=int(workflow_run_id), gathered_context=ctx ) - # Clean up only Redis markers for external channel (partial cleanup) + # Clean up only Redis markers for external channel await self._delete_channel_run(channel_id) await self._delete_ext_channel(channel_id) @@ -727,25 +725,15 @@ class ARIConnection: else: return f"Transfer failed: {cause_txt}" - def _is_transfer_channel(self, app_args: list) -> bool: - """Check if appArgs indicate this is a transfer channel.""" - if not app_args: - return False - # Check if first arg is "transfer" (args are parsed as separate list items) - is_transfer = len(app_args) > 0 and app_args[0] == "transfer" - if is_transfer: - logger.debug( - f"[ARI org={self.organization_id}] Detected transfer channel with args: {app_args}" - ) - return is_transfer + def _get_transfer_id(self, app_args: list) -> Optional[str]: + """Get transfer_id if this is a transfer channel, None otherwise. - def _extract_transfer_id(self, app_args: list) -> Optional[str]: - """Extract transfer_id from appArgs: ['transfer', '{transfer_id}', '{conf_name}'].""" - # Args are parsed as separate list items, so transfer_id is at index 1 + Args format: ['transfer', '{transfer_id}', '{conf_name}'] + """ if len(app_args) > 1 and app_args[0] == "transfer": transfer_id = app_args[1] logger.debug( - f"[ARI org={self.organization_id}] Extracted transfer_id: {transfer_id}" + f"[ARI org={self.organization_id}] Detected transfer channel with transfer_id: {transfer_id}" ) return transfer_id return None @@ -765,18 +753,6 @@ class ARIConnection: ) return None - async def _store_transfer_channel_mapping(self, channel_id: str, transfer_id: str): - """Store channel->transfer mapping in Redis for event correlation.""" - try: - r = await self._get_redis() - await r.setex( - f"ari:transfer_channel:{channel_id}", 300, transfer_id - ) # 5 minute TTL - except Exception as e: - logger.error( - f"[ARI org={self.organization_id}] Error storing transfer channel mapping: {e}" - ) - async def _handle_transfer_answered( self, transfer_id: str, destination_channel_id: str ): @@ -787,13 +763,11 @@ class ARIConnection: f"answered for transfer {transfer_id}" ) - # Store channel mapping for potential future events - await self._store_transfer_channel_mapping( + # Store channel mapping for potential future events and get transfer context + transfer_manager = await self._get_transfer_manager() + await transfer_manager.store_transfer_channel_mapping( destination_channel_id, transfer_id ) - - # Get transfer context - transfer_manager = await self._get_transfer_manager() context = await transfer_manager.get_transfer_context(transfer_id) if not context: logger.error( diff --git a/api/services/telephony/providers/ari_provider.py b/api/services/telephony/providers/ari_provider.py index e039644..c692fdc 100644 --- a/api/services/telephony/providers/ari_provider.py +++ b/api/services/telephony/providers/ari_provider.py @@ -415,13 +415,10 @@ class ARIProvider(TelephonyProvider): sip_endpoint = f"PJSIP/{destination}" # Build transfer appArgs for event correlation - app_args = f"transfer,{transfer_id},{conference_name}" + app_args = f"transfer,{transfer_id}" try: - # Build endpoint URL following existing pattern endpoint = f"{self.base_url}/channels" - - # Prepare channel creation params following existing pattern params = { "endpoint": sip_endpoint, "app": self.app_name, @@ -429,7 +426,6 @@ class ARIProvider(TelephonyProvider): "timeout": timeout, # Keep timeout for transfer calls } - # Originate destination channel using existing pattern async with aiohttp.ClientSession() as session: async with session.post( endpoint, @@ -460,7 +456,7 @@ class ARIProvider(TelephonyProvider): context, ttl=timeout + 10 ) - # Store transfer channel mapping for event correlation (works with any dialplan setup) + # Store transfer channel mapping for event correlation await call_transfer_manager.store_transfer_channel_mapping( destination_channel_id, transfer_id ) @@ -486,7 +482,7 @@ class ARIProvider(TelephonyProvider): async def hangup_channel(self, channel_id: str, reason: str = "normal") -> bool: """Hang up an ARI channel.""" - endpwoint = f"{self.base_url}/channels/{channel_id}" + endpoint = f"{self.base_url}/channels/{channel_id}" params = {"reason_code": reason} try: diff --git a/docker-compose.yaml b/docker-compose.yaml index c7a59ef..949b40a 100644 --- a/docker-compose.yaml +++ b/docker-compose.yaml @@ -74,27 +74,27 @@ services: networks: - app-network - # api: - # image: ${REGISTRY:-dograhai}/dograh-api:latest - # volumes: - # - shared-tmp:/tmp - # environment: - # # Core application config - # ENVIRONMENT: "local" - # LOG_LEVEL: "INFO" + api: + image: ${REGISTRY:-dograhai}/dograh-api:latest + volumes: + - shared-tmp:/tmp + environment: + # Core application config + ENVIRONMENT: "local" + LOG_LEVEL: "INFO" - # # Replace this environment variable if you are using a custom - # # domain to host the stack - # BACKEND_API_ENDPOINT: "${BACKEND_API_ENDPOINT:-http://localhost:8000}" + # Replace this environment variable if you are using a custom + # domain to host the stack + BACKEND_API_ENDPOINT: "${BACKEND_API_ENDPOINT:-http://localhost:8000}" - # # Database configuration (using containerized postgres) - # DATABASE_URL: "postgresql+asyncpg://postgres:postgres@postgres:5432/postgres" + # Database configuration (using containerized postgres) + DATABASE_URL: "postgresql+asyncpg://postgres:postgres@postgres:5432/postgres" - # # Redis configuration (using containerized redis) - # REDIS_URL: "redis://:redissecret@redis:6379" + # Redis configuration (using containerized redis) + REDIS_URL: "redis://:redissecret@redis:6379" - # # Storage configuration - using local MinIO - # ENABLE_AWS_S3: "false" + # Storage configuration - using local MinIO + ENABLE_AWS_S3: "false" # MinIO MINIO_ENDPOINT: "minio:9000" @@ -112,10 +112,10 @@ services: # LANGFUSE_PUBLIC_KEY: "" # LANGFUSE_HOST: "" - # # TURN server configuration (for WebRTC NAT traversal in remote server) - # # Uses time-limited credentials via TURN REST API (HMAC-SHA1) - # TURN_HOST: "${TURN_HOST:-}" - # TURN_SECRET: "${TURN_SECRET:-}" + # TURN server configuration (for WebRTC NAT traversal in remote server) + # Uses time-limited credentials via TURN REST API (HMAC-SHA1) + TURN_HOST: "${TURN_HOST:-}" + TURN_SECRET: "${TURN_SECRET:-}" OSS_JWT_SECRET: "${OSS_JWT_SECRET:-ChangeMeInProduction}" @@ -182,33 +182,33 @@ services: networks: - app-network - # coturn: - # image: coturn/coturn:4.8.0 - # container_name: coturn - # restart: unless-stopped - # profiles: ["remote"] - # ports: - # - "3478:3478/udp" - # - "3478:3478/tcp" - # - "5349:5349/udp" - # - "5349:5349/tcp" - # - "49152-49200:49152-49200/udp" - # volumes: - # - ./turnserver.conf:/etc/coturn/turnserver.conf:ro - # command: - # - -c - # - /etc/coturn/turnserver.conf - # networks: - # - app-network + coturn: + image: coturn/coturn:4.8.0 + container_name: coturn + restart: unless-stopped + profiles: ["remote"] + ports: + - "3478:3478/udp" + - "3478:3478/tcp" + - "5349:5349/udp" + - "5349:5349/tcp" + - "49152-49200:49152-49200/udp" + volumes: + - ./turnserver.conf:/etc/coturn/turnserver.conf:ro + command: + - -c + - /etc/coturn/turnserver.conf + networks: + - app-network volumes: postgres_data: redis_data: minio-data: driver: local - # shared-tmp: - # driver: local + shared-tmp: + driver: local networks: app-network: - driver: bridge + driver: bridge \ No newline at end of file