From 050c6451215b39cb4a6aacd43e554b7a34af61dd Mon Sep 17 00:00:00 2001 From: Sabiha Khan Date: Thu, 26 Feb 2026 19:09:37 +0530 Subject: [PATCH] fix: docker compose, add missing files from merge conflicts --- api/services/telephony/ari_manager.py | 7 +- .../providers/ari_call_strategies.py | 253 ++++++++++++++++++ .../telephony/providers/ari_provider.py | 3 +- .../providers/twilio_call_strategies.py | 186 +++++++++++++ .../workflow/pipecat_engine_custom_tools.py | 2 +- docker-compose.yaml | 174 ++++++------ 6 files changed, 531 insertions(+), 94 deletions(-) create mode 100644 api/services/telephony/providers/ari_call_strategies.py create mode 100644 api/services/telephony/providers/twilio_call_strategies.py diff --git a/api/services/telephony/ari_manager.py b/api/services/telephony/ari_manager.py index 19797da..193ed9f 100644 --- a/api/services/telephony/ari_manager.py +++ b/api/services/telephony/ari_manager.py @@ -276,7 +276,7 @@ class ARIConnection: else: # Outbound call (state == "Up") — originated by us # Check if this is a transfer destination channel (app_args starts with "transfer") - # Transfer destinations run externally - we only track status to publish transfer event, not run our pipeline + # Transfer destinations run externally - we only track status to publish transfer event, not run the pipeline transfer_id = self._get_transfer_id(app_args) if transfer_id: logger.info( @@ -318,7 +318,6 @@ class ARIConnection: logger.info( f"[ARI org={self.organization_id}] StasisEnd: channel={channel_id}" ) - workflow_run_id = await self._get_channel_run(channel_id) if workflow_run_id: asyncio.create_task( @@ -629,7 +628,8 @@ class ARIConnection: bridge_id = ctx.get("bridge_id") transfer_state = ctx.get("transfer_state") - # Check if this is transfer-protected external channe. Skip full teardown if transfer is in progress and this is the external media channel + # Check if this is a call transfer scenario external channel. Skip full teardown if + # transfer is in progress and this is the external media channel # During call transfer, we preserve the caller-destination bridge if ( transfer_state == "in-progress" @@ -811,7 +811,6 @@ class ARIConnection: try: logger.info(f"[ARI Transfer] Transfer {transfer_id} failed: {reason}") - # Get transfer context transfer_manager = await self._get_transfer_manager() context = await transfer_manager.get_transfer_context(transfer_id) diff --git a/api/services/telephony/providers/ari_call_strategies.py b/api/services/telephony/providers/ari_call_strategies.py new file mode 100644 index 0000000..5986104 --- /dev/null +++ b/api/services/telephony/providers/ari_call_strategies.py @@ -0,0 +1,253 @@ +"""ARI-specific call operation strategies. + +This module contains the business logic for Asterisk ARI call operations. +""" + +from typing import Any, Dict + +from loguru import logger + +from pipecat.serializers.call_strategies import HangupStrategy, TransferStrategy + + +class ARIBridgeSwapStrategy(TransferStrategy): + """Implements bridge swap transfer for Asterisk ARI. + + This strategy handles transferring calls by swapping channels in existing + bridges, managing transfer contexts, and publishing + transfer completion events. + """ + + async def execute_transfer(self, context: Dict[str, Any]) -> bool: + """Execute bridge swap transfer for Asterisk ARI.""" + try: + import aiohttp + import redis.asyncio as aioredis + from aiohttp import BasicAuth + + channel_id = context["channel_id"] + ari_endpoint = context["ari_endpoint"] + app_name = context["app_name"] + app_password = context["app_password"] + + if not channel_id or not ari_endpoint: + logger.warning( + "Cannot execute transfer: missing channel_id or ari_endpoint" + ) + return False + + logger.info( + f"[ARI Transfer] Executing bridge swap for channel {channel_id}" + ) + + from api.constants import REDIS_URL + from api.db import db_client + + auth = BasicAuth(app_name, app_password) + + # 1. Find active transfer context for this caller channel + transfer_context = await self._find_transfer_context_for_call(channel_id) + if not transfer_context: + logger.error( + f"[ARI Transfer] No active transfer context found for caller {channel_id}" + ) + return False + + logger.info( + f"[ARI Transfer] Found transfer context: {transfer_context.transfer_id}, " + f"destination: {transfer_context.call_sid}" + ) + + # 2. Get workflow run to find current bridge and external media channel + redis = aioredis.from_url(REDIS_URL, decode_responses=True) + workflow_run_id = await redis.get(f"ari:channel:{channel_id}") + if not workflow_run_id: + logger.error( + f"[ARI Transfer] No workflow run found for caller {channel_id}" + ) + return False + + workflow_run = await db_client.get_workflow_run_by_id(int(workflow_run_id)) + if not workflow_run or not workflow_run.gathered_context: + logger.error( + f"[ARI Transfer] No workflow context found for run {workflow_run_id}" + ) + return False + + ctx = workflow_run.gathered_context + bridge_id = ctx.get("bridge_id") + ext_channel_id = ctx.get("ext_channel_id") + + if not bridge_id or not ext_channel_id: + logger.error( + f"[ARI Transfer] Missing bridge/external channel info: {ctx}" + ) + return False + + destination_channel_id = transfer_context.call_sid + if not destination_channel_id: + logger.error( + f"[ARI Transfer] No destination channel in transfer context" + ) + return False + + logger.info( + f"[ARI Transfer] Bridge swap: bridge={bridge_id}, caller={channel_id}, " + f"destination={destination_channel_id}, ext_media={ext_channel_id}" + ) + + # 3. Set transfer state to prevent StasisEnd auto-teardown + workflow_run.gathered_context["transfer_state"] = "in-progress" + await db_client.update_workflow_run( + run_id=int(workflow_run_id), + gathered_context=workflow_run.gathered_context, + ) + logger.debug( + f"[ARI Transfer] Set transfer_state=in-progress for workflow {workflow_run_id}" + ) + + # 4. Execute bridge swap operations via ARI REST API + async with aiohttp.ClientSession() as session: + # Add destination channel to existing bridge + add_url = f"{ari_endpoint}/ari/bridges/{bridge_id}/addChannel" + async with session.post( + add_url, auth=auth, params={"channel": destination_channel_id} + ) as response: + if response.status in (200, 204): + logger.info( + f"[ARI Transfer] Added destination {destination_channel_id} to bridge {bridge_id}" + ) + else: + error_text = await response.text() + logger.error( + f"[ARI Transfer] Failed to add destination to bridge: {response.status} {error_text}" + ) + return False + + # Remove external media channel from bridge + remove_url = f"{ari_endpoint}/ari/bridges/{bridge_id}/removeChannel" + async with session.post( + remove_url, auth=auth, params={"channel": ext_channel_id} + ) as response: + if response.status in (200, 204): + logger.info( + f"[ARI Transfer] Removed external media {ext_channel_id} from bridge {bridge_id}" + ) + else: + error_text = await response.text() + logger.error( + f"[ARI Transfer] Failed to remove external media from bridge: {response.status} {error_text}" + ) + + # Hang up the external media channel + hangup_url = f"{ari_endpoint}/ari/channels/{ext_channel_id}" + async with session.delete(hangup_url, auth=auth) as response: + if response.status in (200, 204): + logger.info( + f"[ARI Transfer] Hung up external media channel {ext_channel_id}" + ) + elif response.status == 404: + logger.debug( + f"[ARI Transfer] External media channel {ext_channel_id} already gone" + ) + else: + error_text = await response.text() + logger.warning( + f"[ARI Transfer] Failed to hang up external media: {response.status} {error_text}" + ) + + logger.info( + f"[ARI Transfer] Bridge swap completed successfully for transfer {transfer_context.transfer_id}, " + f"caller {channel_id} connected to destination {destination_channel_id} via bridge {bridge_id}" + ) + + # 5. Clean up transfer context after successful completion + from api.services.telephony.call_transfer_manager import ( + get_call_transfer_manager, + ) + + call_transfer_manager = await get_call_transfer_manager() + await call_transfer_manager.remove_transfer_context( + transfer_context.transfer_id + ) + return True + + except Exception as e: + logger.exception(f"Failed to execute ARI transfer: {e}") + return False + + async def _find_transfer_context_for_call(self, caller_channel_id: str): + """Find the active transfer context for this caller channel.""" + try: + import redis.asyncio as aioredis + + from api.constants import REDIS_URL + from api.services.telephony.transfer_event_protocol import TransferContext + + # Search Redis for transfer contexts where original_call_sid matches this caller + redis = aioredis.from_url(REDIS_URL, decode_responses=True) + transfer_keys = await redis.keys("transfer:context:*") + + for key in transfer_keys: + try: + context_data = await redis.get(key) + if context_data: + context = TransferContext.from_json(context_data) + if context.original_call_sid == caller_channel_id: + return context + except Exception: + continue + + return None + + except Exception as e: + logger.error(f"[ARI Transfer] Error finding transfer context: {e}") + return None + + +class ARIHangupStrategy(HangupStrategy): + """Implements hangup for Asterisk ARI channels.""" + + async def execute_hangup(self, context: Dict[str, Any]) -> bool: + """Hang up the Asterisk channel via ARI REST API.""" + try: + import aiohttp + from aiohttp import BasicAuth + + channel_id = context["channel_id"] + ari_endpoint = context["ari_endpoint"] + app_name = context["app_name"] + app_password = context["app_password"] + + if not channel_id or not ari_endpoint: + logger.warning( + "Cannot hang up Asterisk channel: missing channel_id or ari_endpoint" + ) + return False + + endpoint = f"{ari_endpoint}/ari/channels/{channel_id}" + auth = BasicAuth(app_name, app_password) + + async with aiohttp.ClientSession() as session: + async with session.delete(endpoint, auth=auth) as response: + if response.status in (200, 204): + logger.info( + f"Successfully terminated Asterisk channel {channel_id}" + ) + return True + elif response.status == 404: + logger.debug( + f"Asterisk channel {channel_id} was already terminated" + ) + return True + else: + error_text = await response.text() + logger.error( + f"Failed to terminate Asterisk channel {channel_id}: " + f"Status {response.status}, Response: {error_text}" + ) + return False + + except Exception as e: + logger.exception(f"Failed to hang up Asterisk channel: {e}") + return False \ No newline at end of file diff --git a/api/services/telephony/providers/ari_provider.py b/api/services/telephony/providers/ari_provider.py index 3b2d9af..7d7fce1 100644 --- a/api/services/telephony/providers/ari_provider.py +++ b/api/services/telephony/providers/ari_provider.py @@ -388,7 +388,6 @@ class ARIProvider(TelephonyProvider): f"(timeout: {timeout}s)" ) - # Import here to avoid circular dependency from api.services.telephony.call_transfer_manager import ( get_call_transfer_manager, ) @@ -456,7 +455,7 @@ class ARIProvider(TelephonyProvider): } except Exception as e: - logger.error(f"[ARI Transfer] Failed to originate transfer channel: {e}") + logger.error(f"[ARI Transfer] Failed to originate call transfer destination channel: {e}") await call_transfer_manager.remove_transfer_context(transfer_id) raise diff --git a/api/services/telephony/providers/twilio_call_strategies.py b/api/services/telephony/providers/twilio_call_strategies.py new file mode 100644 index 0000000..d48cff3 --- /dev/null +++ b/api/services/telephony/providers/twilio_call_strategies.py @@ -0,0 +1,186 @@ +"""Twilio-specific call operation strategies. + +This module contains the business logic for Twilio call operations, +maintaining proper separation of concerns between protocol handling and business logic. +""" + +from typing import Any, Dict + +import aiohttp +from loguru import logger + +from pipecat.serializers.call_strategies import HangupStrategy, TransferStrategy + + +class TwilioConferenceStrategy(TransferStrategy): + """Implements conference-based call transfer for Twilio. + + This strategy transfers calls by placing them into a Twilio conference, + with cleanup of transfer contexts upon successful completion. + """ + + async def execute_transfer(self, context: Dict[str, Any]) -> bool: + """Execute conference transfer for Twilio call.""" + try: + account_sid = context["account_sid"] + auth_token = context["auth_token"] + call_sid = context["call_sid"] + region = context.get("region") + edge = context.get("edge") + + # 1. Find active transfer context for this call + transfer_context = await self._find_transfer_context_for_call(call_sid) + if not transfer_context: + logger.error( + f"[Twilio Transfer] No active transfer context found for call {call_sid}" + ) + return False + + logger.info( + f"[Twilio Transfer] Found transfer context: {transfer_context.transfer_id}, " + f"original: {transfer_context.original_call_sid}" + ) + + region_prefix = f"{region}." if region else "" + edge_prefix = f"{edge}." if edge else "" + + # Twilio API endpoint for updating calls + endpoint = f"https://api.{edge_prefix}{region_prefix}twilio.com/2010-04-01/Accounts/{account_sid}/Calls/{call_sid}.json" + + # Create basic auth from account_sid and auth_token + auth = aiohttp.BasicAuth(account_sid, auth_token) + + conference_name = transfer_context.conference_name + twiml = f""" + + + {conference_name} + +""" + + logger.debug( + f"[Twilio Transfer] Transferring call to conference: {conference_name}" + ) + + # 2. Make the POST request to transfer the call + async with aiohttp.ClientSession() as session: + async with session.post( + endpoint, auth=auth, data={"Twiml": twiml} + ) as response: + response_text = await response.text() + + if response.status == 200: + logger.info( + f"[Twilio Transfer] Conference transfer completed successfully for call {call_sid}, " + f"joined conference {conference_name}" + ) + + # 3. Clean up transfer context after successful transfer + await self._cleanup_transfer_context(transfer_context.transfer_id) + return True + elif response.status == 404: + logger.error( + f"Failed to transfer Twilio call {call_sid}: Call not found (404)" + ) + await self._cleanup_transfer_context(transfer_context.transfer_id) + return False + else: + logger.error( + f"Failed to transfer Twilio call {call_sid} to conference {conference_name}: " + f"Status {response.status}, Response: {response_text}" + ) + await self._cleanup_transfer_context(transfer_context.transfer_id) + return False + + except Exception as e: + logger.error(f"Failed to transfer Twilio call: {e}") + if transfer_context: + await self._cleanup_transfer_context(transfer_context.transfer_id) + return False + + async def _find_transfer_context_for_call(self, call_sid: str): + """Find the active transfer context for this call.""" + try: + import redis.asyncio as aioredis + + from api.constants import REDIS_URL + from api.services.telephony.transfer_event_protocol import TransferContext + + # Search Redis for transfer contexts where original_call_sid matches + redis = aioredis.from_url(REDIS_URL, decode_responses=True) + transfer_keys = await redis.keys("transfer:context:*") + + for key in transfer_keys: + try: + context_data = await redis.get(key) + if context_data: + context = TransferContext.from_json(context_data) + if context.original_call_sid == call_sid: + return context + except Exception: + continue + + return None + + except Exception as e: + logger.error(f"[Twilio Transfer] Error finding transfer context: {e}") + return None + + async def _cleanup_transfer_context(self, transfer_id: str): + """Clean up transfer context after completion or failure.""" + try: + from api.services.telephony.call_transfer_manager import ( + get_call_transfer_manager, + ) + + call_transfer_manager = await get_call_transfer_manager() + await call_transfer_manager.remove_transfer_context(transfer_id) + except Exception as e: + logger.error(f"[Twilio Transfer] Error cleaning up transfer context: {e}") + + +class TwilioHangupStrategy(HangupStrategy): + """Implements hangup for Twilio calls.""" + + async def execute_hangup(self, context: Dict[str, Any]) -> bool: + """Hang up the Twilio call via REST API.""" + try: + account_sid = context["account_sid"] + auth_token = context["auth_token"] + call_sid = context["call_sid"] + region = context.get("region") + edge = context.get("edge") + + if not account_sid or not auth_token or not call_sid: + logger.warning( + "Cannot hang up Twilio call: missing required credentials or call_sid" + ) + return False + + region_prefix = f"{region}." if region else "" + edge_prefix = f"{edge}." if edge else "" + + endpoint = f"https://api.{edge_prefix}{region_prefix}twilio.com/2010-04-01/Accounts/{account_sid}/Calls/{call_sid}.json" + auth = aiohttp.BasicAuth(account_sid, auth_token) + + async with aiohttp.ClientSession() as session: + async with session.post( + endpoint, auth=auth, data={"Status": "completed"} + ) as response: + if response.status == 200: + logger.info(f"Successfully terminated Twilio call {call_sid}") + return True + elif response.status == 404: + logger.debug(f"Twilio call {call_sid} was already terminated") + return True + else: + response_text = await response.text() + logger.error( + f"Failed to terminate Twilio call {call_sid}: " + f"Status {response.status}, Response: {response_text}" + ) + return False + + except Exception as e: + logger.exception(f"Failed to hang up Twilio call: {e}") + return False \ No newline at end of file diff --git a/api/services/workflow/pipecat_engine_custom_tools.py b/api/services/workflow/pipecat_engine_custom_tools.py index db7933f..0c2dfca 100644 --- a/api/services/workflow/pipecat_engine_custom_tools.py +++ b/api/services/workflow/pipecat_engine_custom_tools.py @@ -488,7 +488,7 @@ class CustomToolManager: finally: # Cleanup hold music and pipeline state - # Transfer context cleanup is now handled by respective serializers + # Transfer context cleanup is handled by respective transfer call strategies logger.info( "Transfer wait ended, cleaning up hold music and pipeline state" ) diff --git a/docker-compose.yaml b/docker-compose.yaml index 9a288bd..949b40a 100644 --- a/docker-compose.yaml +++ b/docker-compose.yaml @@ -59,52 +59,52 @@ services: networks: - app-network - # nginx: - # image: nginx:alpine - # container_name: nginx_https - # profiles: ["remote"] - # depends_on: - # - ui - # ports: - # - "80:80" - # - "443:443" - # volumes: - # - ./nginx.conf:/etc/nginx/conf.d/default.conf:ro - # - ./certs:/etc/nginx/certs:ro - # networks: - # - app-network + nginx: + image: nginx:alpine + container_name: nginx_https + profiles: ["remote"] + depends_on: + - ui + ports: + - "80:80" + - "443:443" + volumes: + - ./nginx.conf:/etc/nginx/conf.d/default.conf:ro + - ./certs:/etc/nginx/certs:ro + networks: + - app-network - # api: - # image: ${REGISTRY:-dograhai}/dograh-api:latest - # volumes: - # - shared-tmp:/tmp - # environment: - # # Core application config - # ENVIRONMENT: "local" - # LOG_LEVEL: "INFO" + api: + image: ${REGISTRY:-dograhai}/dograh-api:latest + volumes: + - shared-tmp:/tmp + environment: + # Core application config + ENVIRONMENT: "local" + LOG_LEVEL: "INFO" - # # Replace this environment variable if you are using a custom - # # domain to host the stack - # BACKEND_API_ENDPOINT: "${BACKEND_API_ENDPOINT:-http://localhost:8000}" + # Replace this environment variable if you are using a custom + # domain to host the stack + BACKEND_API_ENDPOINT: "${BACKEND_API_ENDPOINT:-http://localhost:8000}" - # # Database configuration (using containerized postgres) - # DATABASE_URL: "postgresql+asyncpg://postgres:postgres@postgres:5432/postgres" + # Database configuration (using containerized postgres) + DATABASE_URL: "postgresql+asyncpg://postgres:postgres@postgres:5432/postgres" - # # Redis configuration (using containerized redis) - # REDIS_URL: "redis://:redissecret@redis:6379" + # Redis configuration (using containerized redis) + REDIS_URL: "redis://:redissecret@redis:6379" - # # Storage configuration - using local MinIO - # ENABLE_AWS_S3: "false" + # Storage configuration - using local MinIO + ENABLE_AWS_S3: "false" - # # MinIO - # MINIO_ENDPOINT: "minio:9000" - # MINIO_ACCESS_KEY: "minioadmin" - # MINIO_SECRET_KEY: "minioadmin" - # MINIO_BUCKET: "voice-audio" - # MINIO_SECURE: "false" + # MinIO + MINIO_ENDPOINT: "minio:9000" + MINIO_ACCESS_KEY: "minioadmin" + MINIO_SECRET_KEY: "minioadmin" + MINIO_BUCKET: "voice-audio" + MINIO_SECURE: "false" - # # FastAPI workers count - # FASTAPI_WORKERS: 1 + # FastAPI workers count + FASTAPI_WORKERS: 1 # Langfuse ENABLE_TRACING: "false" @@ -112,36 +112,36 @@ services: # LANGFUSE_PUBLIC_KEY: "" # LANGFUSE_HOST: "" - # # TURN server configuration (for WebRTC NAT traversal in remote server) - # # Uses time-limited credentials via TURN REST API (HMAC-SHA1) - # TURN_HOST: "${TURN_HOST:-}" - # TURN_SECRET: "${TURN_SECRET:-}" + # TURN server configuration (for WebRTC NAT traversal in remote server) + # Uses time-limited credentials via TURN REST API (HMAC-SHA1) + TURN_HOST: "${TURN_HOST:-}" + TURN_SECRET: "${TURN_SECRET:-}" - # OSS_JWT_SECRET: "${OSS_JWT_SECRET:-ChangeMeInProduction}" + OSS_JWT_SECRET: "${OSS_JWT_SECRET:-ChangeMeInProduction}" - # ports: - # - "8000:8000" - # depends_on: - # postgres: - # condition: service_healthy - # redis: - # condition: service_healthy - # minio: - # condition: service_healthy - # cloudflared: - # condition: service_started - # healthcheck: - # test: - # [ - # "CMD-SHELL", - # 'python -c "import urllib.request; urllib.request.urlopen(''http://localhost:8000/api/v1/health'').read()"', - # ] - # interval: 30s - # timeout: 10s - # retries: 3 - # start_period: 60s - # networks: - # - app-network + ports: + - "8000:8000" + depends_on: + postgres: + condition: service_healthy + redis: + condition: service_healthy + minio: + condition: service_healthy + cloudflared: + condition: service_started + healthcheck: + test: + [ + "CMD-SHELL", + 'python -c "import urllib.request; urllib.request.urlopen(''http://localhost:8000/api/v1/health'').read()"', + ] + interval: 30s + timeout: 10s + retries: 3 + start_period: 60s + networks: + - app-network ui: image: ${REGISTRY:-dograhai}/dograh-ui:latest @@ -182,32 +182,32 @@ services: networks: - app-network - # coturn: - # image: coturn/coturn:4.8.0 - # container_name: coturn - # restart: unless-stopped - # profiles: ["remote"] - # ports: - # - "3478:3478/udp" - # - "3478:3478/tcp" - # - "5349:5349/udp" - # - "5349:5349/tcp" - # - "49152-49200:49152-49200/udp" - # volumes: - # - ./turnserver.conf:/etc/coturn/turnserver.conf:ro - # command: - # - -c - # - /etc/coturn/turnserver.conf - # networks: - # - app-network + coturn: + image: coturn/coturn:4.8.0 + container_name: coturn + restart: unless-stopped + profiles: ["remote"] + ports: + - "3478:3478/udp" + - "3478:3478/tcp" + - "5349:5349/udp" + - "5349:5349/tcp" + - "49152-49200:49152-49200/udp" + volumes: + - ./turnserver.conf:/etc/coturn/turnserver.conf:ro + command: + - -c + - /etc/coturn/turnserver.conf + networks: + - app-network volumes: postgres_data: redis_data: minio-data: driver: local - # shared-tmp: - # driver: local + shared-tmp: + driver: local networks: app-network: