diff --git a/api/routes/telephony.py b/api/routes/telephony.py index cc7ce55..0e423cb 100644 --- a/api/routes/telephony.py +++ b/api/routes/telephony.py @@ -1670,7 +1670,7 @@ async def complete_transfer_function_call(transfer_id: str, request: Request): result = { "status": "success", "message": "Great! The destination number answered. Let me transfer you now.", - "action": "transfer_success", + "action": "destination_answered", "conference_id": conference_name, "transfer_call_sid": call_sid, # The outbound transfer call SID "original_call_sid": original_call_sid, # The original caller's SID @@ -1714,9 +1714,7 @@ async def complete_transfer_function_call(transfer_id: str, request: Request): try: # Determine event type based on result status if result["status"] == "success": - event_type = TransferEventType.TRANSFER_COMPLETED - elif result.get("reason") == "timeout": - event_type = TransferEventType.TRANSFER_TIMEOUT + event_type = TransferEventType.DESTINATION_ANSWERED else: event_type = TransferEventType.TRANSFER_FAILED diff --git a/api/services/pipecat/transport_setup.py b/api/services/pipecat/transport_setup.py index db18380..c7b56ff 100644 --- a/api/services/pipecat/transport_setup.py +++ b/api/services/pipecat/transport_setup.py @@ -6,6 +6,14 @@ from api.constants import APP_ROOT_DIR from api.db import db_client from api.enums import OrganizationConfigurationKey from api.services.pipecat.audio_config import AudioConfig +from api.services.telephony.providers.ari_call_strategies import ( + ARIBridgeSwapStrategy, + ARIHangupStrategy, +) +from api.services.telephony.providers.twilio_call_strategies import ( + TwilioConferenceStrategy, + TwilioHangupStrategy, +) from pipecat.audio.mixers.silence_mixer import SilenceAudioMixer from pipecat.audio.mixers.soundfile_mixer import SoundfileMixer from pipecat.serializers.asterisk import AsteriskFrameSerializer @@ -54,12 +62,17 @@ async def create_twilio_transport( raise ValueError( f"Incomplete Twilio configuration for organization {organization_id}" ) + # Create strategy instances + transfer_strategy = TwilioConferenceStrategy() + hangup_strategy = TwilioHangupStrategy() serializer = TwilioFrameSerializer( stream_sid=stream_sid, call_sid=call_sid, account_sid=account_sid, auth_token=auth_token, + transfer_strategy=transfer_strategy, + hangup_strategy=hangup_strategy, ) return FastAPIWebsocketTransport( @@ -178,12 +191,17 @@ async def create_ari_transport( f"Incomplete ARI configuration for organization {organization_id}. " f"Required: ari_endpoint, app_name, app_password" ) + # Create strategy instances + transfer_strategy = ARIBridgeSwapStrategy() + hangup_strategy = ARIHangupStrategy() serializer = AsteriskFrameSerializer( channel_id=channel_id, ari_endpoint=ari_endpoint, app_name=app_name, app_password=app_password, + transfer_strategy=transfer_strategy, + hangup_strategy=hangup_strategy, params=AsteriskFrameSerializer.InputParams( asterisk_sample_rate=audio_config.transport_in_sample_rate, sample_rate=audio_config.pipeline_sample_rate, diff --git a/api/services/telephony/ari_manager.py b/api/services/telephony/ari_manager.py index 4da16a6..19797da 100644 --- a/api/services/telephony/ari_manager.py +++ b/api/services/telephony/ari_manager.py @@ -284,7 +284,7 @@ class ARIConnection: f"channel={channel_id}, transfer_id={transfer_id}" ) asyncio.create_task( - self._handle_transfer_answered(transfer_id, channel_id) + self._handle_destination_answered(transfer_id, channel_id) ) return @@ -753,7 +753,7 @@ class ARIConnection: ) return None - async def _handle_transfer_answered( + async def _handle_destination_answered( self, transfer_id: str, destination_channel_id: str ): """Handle transfer destination channel answered - publish success event.""" @@ -780,16 +780,16 @@ class ARIConnection: f"caller={context.original_call_sid} -> destination={destination_channel_id}" ) - # Publish transfer success event - this will trigger the bridge swap in serializer + # Publish destination answered event - this will trigger the bridge swap in serializer success_event = TransferEvent( - type=TransferEventType.TRANSFER_ANSWERED, + type=TransferEventType.DESTINATION_ANSWERED, transfer_id=transfer_id, original_call_sid=context.original_call_sid, transfer_call_sid=destination_channel_id, conference_name=context.conference_name, message="Transfer destination answered", status="success", - action="transfer_success", + action="destination_answered", end_call=True, timestamp=time.time(), ) diff --git a/api/services/telephony/call_transfer_manager.py b/api/services/telephony/call_transfer_manager.py index 19b3ca8..6c1e0f6 100644 --- a/api/services/telephony/call_transfer_manager.py +++ b/api/services/telephony/call_transfer_manager.py @@ -158,16 +158,10 @@ class CallTransferManager: ) # Check if this is a completion event - if ( - event.type - in [ - TransferEventType.TRANSFER_ANSWERED, # Call answered = transfer successful - TransferEventType.TRANSFER_COMPLETED, - TransferEventType.TRANSFER_FAILED, - TransferEventType.TRANSFER_CANCELLED, - TransferEventType.TRANSFER_TIMEOUT, - ] - ): + if event.type in [ + TransferEventType.DESTINATION_ANSWERED, + TransferEventType.TRANSFER_FAILED, + ]: return event except Exception as e: logger.error(f"Failed to parse transfer event: {e}") diff --git a/api/services/telephony/providers/ari_provider.py b/api/services/telephony/providers/ari_provider.py index c692fdc..3b2d9af 100644 --- a/api/services/telephony/providers/ari_provider.py +++ b/api/services/telephony/providers/ari_provider.py @@ -6,7 +6,6 @@ The ARI WebSocket event listener runs as a separate process (ari_manager.py). """ import json -import time from typing import TYPE_CHECKING, Any, Dict, List, Optional from urllib.parse import urlparse @@ -393,20 +392,9 @@ class ARIProvider(TelephonyProvider): from api.services.telephony.call_transfer_manager import ( get_call_transfer_manager, ) - from api.services.telephony.transfer_event_protocol import TransferContext - # Store transfer context for event correlation + # Get call transfer manager for event correlation mapping call_transfer_manager = await get_call_transfer_manager() - context = TransferContext( - transfer_id=transfer_id, - call_sid=None, # Will be updated after channel creation - target_number=destination, - tool_uuid=kwargs.get("tool_uuid", ""), - original_call_sid=kwargs.get("original_call_sid", ""), - conference_name=conference_name, - initiated_at=time.time(), - ) - await call_transfer_manager.store_transfer_context(context, ttl=timeout + 10) # Build SIP endpoint if destination.startswith("SIP/") or destination.startswith("PJSIP/"): @@ -450,12 +438,6 @@ class ARIProvider(TelephonyProvider): await call_transfer_manager.remove_transfer_context(transfer_id) raise Exception("Failed to create destination channel") - # Update transfer context with destination channel ID - context.call_sid = destination_channel_id - await call_transfer_manager.store_transfer_context( - context, ttl=timeout + 10 - ) - # Store transfer channel mapping for event correlation await call_transfer_manager.store_transfer_channel_mapping( destination_channel_id, transfer_id diff --git a/api/services/telephony/transfer_event_protocol.py b/api/services/telephony/transfer_event_protocol.py index 6260676..761d3f6 100644 --- a/api/services/telephony/transfer_event_protocol.py +++ b/api/services/telephony/transfer_event_protocol.py @@ -13,12 +13,8 @@ from typing import Any, Dict, Optional class TransferEventType(str, Enum): """Types of transfer events sent between instances.""" - TRANSFER_INITIATED = "transfer_initiated" - TRANSFER_ANSWERED = "transfer_answered" - TRANSFER_COMPLETED = "transfer_completed" + DESTINATION_ANSWERED = "destination_answered" TRANSFER_FAILED = "transfer_failed" - TRANSFER_CANCELLED = "transfer_cancelled" - TRANSFER_TIMEOUT = "transfer_timeout" @dataclass diff --git a/api/services/workflow/pipecat_engine_custom_tools.py b/api/services/workflow/pipecat_engine_custom_tools.py index 4efd3d3..db7933f 100644 --- a/api/services/workflow/pipecat_engine_custom_tools.py +++ b/api/services/workflow/pipecat_engine_custom_tools.py @@ -395,7 +395,6 @@ class CustomToolManager: ) return - # Get telephony provider directly (no HTTP round-trip) provider = await get_telephony_provider(organization_id) if not provider.supports_transfers() or not provider.validate_config(): validation_error_result = { @@ -418,6 +417,19 @@ class CustomToolManager: # Compute conference name from original call SID conference_name = f"transfer-{original_call_sid}" + # Store initial transfer context in Redis before provider call to avoid race condition + call_transfer_manager = await get_call_transfer_manager() + transfer_context = TransferContext( + transfer_id=transfer_id, + call_sid=None, # Will be updated after provider response + target_number=destination, + tool_uuid=tool.tool_uuid, + original_call_sid=original_call_sid, + conference_name=conference_name, + initiated_at=time.time(), + ) + await call_transfer_manager.store_transfer_context(transfer_context) + # Mute the pipeline self._engine.set_mute_pipeline(True) @@ -432,21 +444,8 @@ class CustomToolManager: call_sid = transfer_result.get("call_sid") logger.info(f"Transfer call initiated successfully: {call_sid}") - # TODO: Possible race here between saving the transfer context - # and getting a callback response from Twilio? Should we store_transfer_context - # before sending request to Twilio and update the transfer context afterwards? - - # Store transfer context in Redis - call_transfer_manager = await get_call_transfer_manager() - transfer_context = TransferContext( - transfer_id=transfer_id, - call_sid=call_sid, - target_number=destination, - tool_uuid=tool.tool_uuid, - original_call_sid=original_call_sid, - conference_name=conference_name, - initiated_at=time.time(), - ) + # Update transfer context with actual call_sid from provider response + transfer_context.call_sid = call_sid await call_transfer_manager.store_transfer_context(transfer_context) # Wait for status callback completion using Redis pub/sub @@ -538,67 +537,57 @@ class CustomToolManager: exception_result, function_call_params, properties ) - finally: - # Schedule background cleanup of transfer context after pipeline processing delay - if "transfer_id" in locals(): - asyncio.create_task( - self._cleanup_transfer_context_delayed(transfer_id) - ) - return transfer_call_handler - async def _cleanup_transfer_context_delayed(self, transfer_id: str): - """Background task to clean up transfer context after pipeline processing delay.""" - try: - # Wait for pipeline to process EndFrame(reason="transfer_call") in serializers - await asyncio.sleep(1.0) # 1 second delay for async pipeline processing - - call_transfer_manager = await get_call_transfer_manager() - await call_transfer_manager.remove_transfer_context(transfer_id) - logger.info(f"Background cleanup: removed transfer context {transfer_id}") - except Exception as e: - logger.error( - f"Background cleanup error for transfer context {transfer_id}: {e}" - ) - async def _handle_transfer_result( self, result: dict, function_call_params, properties ): - """Handle different transfer call outcomes and take appropriate action.""" + """Handle transfer call outcomes from any telephony provider (Twilio, ARI, etc). + + This method is provider-agnostic and processes standardized result dictionaries + from transfer completion events, validation failures, timeouts, and errors. + + Args: + result: Standardized result dict with keys: action, status, reason, message + function_call_params: LLM function call parameters for response callback + properties: Function call result properties (e.g., run_llm setting) + """ action = result.get("action", "") status = result.get("status", "") logger.info(f"Handling transfer result: action={action}, status={status}") - if action == "transfer_success": - # Successful transfer - add original caller to conference and end pipeline + if action == "destination_answered": + # Transfer destination answered - proceeding with bridge swap/conference join conference_id = result.get("conference_id") original_call_sid = result.get("original_call_sid") transfer_call_sid = result.get("transfer_call_sid") logger.info( - f"Transfer successful! Conference: {conference_id}, Original: {original_call_sid}, Transfer: {transfer_call_sid}" + f"Transfer destination answered! Conference/Bridge: {conference_id}, " + f"Original: {original_call_sid}, Transfer: {transfer_call_sid}" ) - # Inform LLM of success and end the call with Transfer call reason + # Inform LLM of success and end the call (no further LLM processing needed) response_properties = FunctionCallResultProperties(run_llm=False) await function_call_params.result_callback( { "status": "transfer_success", - "message": "Transfer successful - connecting to conference", + "message": "Transfer destination answered - connecting calls", "conference_id": conference_id, }, properties=response_properties, ) + # End pipeline - providers complete bridge swap/conference join as final transfer leg await self._engine.end_call_with_reason( EndTaskReason.TRANSFER_CALL.value, abort_immediately=False ) elif action == "transfer_failed": - # Transfer failed - inform user via LLM and then end the call + # Transfer failed - let LLM inform user with error details reason = result.get("reason", "unknown") - logger.info(f"Transfer failed ({reason}), informing user") + logger.info(f"Transfer failed ({reason}), informing user via LLM") await function_call_params.result_callback( { diff --git a/docker-compose.yaml b/docker-compose.yaml index 949b40a..9a288bd 100644 --- a/docker-compose.yaml +++ b/docker-compose.yaml @@ -59,52 +59,52 @@ services: networks: - app-network - nginx: - image: nginx:alpine - container_name: nginx_https - profiles: ["remote"] - depends_on: - - ui - ports: - - "80:80" - - "443:443" - volumes: - - ./nginx.conf:/etc/nginx/conf.d/default.conf:ro - - ./certs:/etc/nginx/certs:ro - networks: - - app-network + # nginx: + # image: nginx:alpine + # container_name: nginx_https + # profiles: ["remote"] + # depends_on: + # - ui + # ports: + # - "80:80" + # - "443:443" + # volumes: + # - ./nginx.conf:/etc/nginx/conf.d/default.conf:ro + # - ./certs:/etc/nginx/certs:ro + # networks: + # - app-network - api: - image: ${REGISTRY:-dograhai}/dograh-api:latest - volumes: - - shared-tmp:/tmp - environment: - # Core application config - ENVIRONMENT: "local" - LOG_LEVEL: "INFO" + # api: + # image: ${REGISTRY:-dograhai}/dograh-api:latest + # volumes: + # - shared-tmp:/tmp + # environment: + # # Core application config + # ENVIRONMENT: "local" + # LOG_LEVEL: "INFO" - # Replace this environment variable if you are using a custom - # domain to host the stack - BACKEND_API_ENDPOINT: "${BACKEND_API_ENDPOINT:-http://localhost:8000}" + # # Replace this environment variable if you are using a custom + # # domain to host the stack + # BACKEND_API_ENDPOINT: "${BACKEND_API_ENDPOINT:-http://localhost:8000}" - # Database configuration (using containerized postgres) - DATABASE_URL: "postgresql+asyncpg://postgres:postgres@postgres:5432/postgres" + # # Database configuration (using containerized postgres) + # DATABASE_URL: "postgresql+asyncpg://postgres:postgres@postgres:5432/postgres" - # Redis configuration (using containerized redis) - REDIS_URL: "redis://:redissecret@redis:6379" + # # Redis configuration (using containerized redis) + # REDIS_URL: "redis://:redissecret@redis:6379" - # Storage configuration - using local MinIO - ENABLE_AWS_S3: "false" + # # Storage configuration - using local MinIO + # ENABLE_AWS_S3: "false" - # MinIO - MINIO_ENDPOINT: "minio:9000" - MINIO_ACCESS_KEY: "minioadmin" - MINIO_SECRET_KEY: "minioadmin" - MINIO_BUCKET: "voice-audio" - MINIO_SECURE: "false" + # # MinIO + # MINIO_ENDPOINT: "minio:9000" + # MINIO_ACCESS_KEY: "minioadmin" + # MINIO_SECRET_KEY: "minioadmin" + # MINIO_BUCKET: "voice-audio" + # MINIO_SECURE: "false" - # FastAPI workers count - FASTAPI_WORKERS: 1 + # # FastAPI workers count + # FASTAPI_WORKERS: 1 # Langfuse ENABLE_TRACING: "false" @@ -112,36 +112,36 @@ services: # LANGFUSE_PUBLIC_KEY: "" # LANGFUSE_HOST: "" - # TURN server configuration (for WebRTC NAT traversal in remote server) - # Uses time-limited credentials via TURN REST API (HMAC-SHA1) - TURN_HOST: "${TURN_HOST:-}" - TURN_SECRET: "${TURN_SECRET:-}" + # # TURN server configuration (for WebRTC NAT traversal in remote server) + # # Uses time-limited credentials via TURN REST API (HMAC-SHA1) + # TURN_HOST: "${TURN_HOST:-}" + # TURN_SECRET: "${TURN_SECRET:-}" - OSS_JWT_SECRET: "${OSS_JWT_SECRET:-ChangeMeInProduction}" + # OSS_JWT_SECRET: "${OSS_JWT_SECRET:-ChangeMeInProduction}" - ports: - - "8000:8000" - depends_on: - postgres: - condition: service_healthy - redis: - condition: service_healthy - minio: - condition: service_healthy - cloudflared: - condition: service_started - healthcheck: - test: - [ - "CMD-SHELL", - 'python -c "import urllib.request; urllib.request.urlopen(''http://localhost:8000/api/v1/health'').read()"', - ] - interval: 30s - timeout: 10s - retries: 3 - start_period: 60s - networks: - - app-network + # ports: + # - "8000:8000" + # depends_on: + # postgres: + # condition: service_healthy + # redis: + # condition: service_healthy + # minio: + # condition: service_healthy + # cloudflared: + # condition: service_started + # healthcheck: + # test: + # [ + # "CMD-SHELL", + # 'python -c "import urllib.request; urllib.request.urlopen(''http://localhost:8000/api/v1/health'').read()"', + # ] + # interval: 30s + # timeout: 10s + # retries: 3 + # start_period: 60s + # networks: + # - app-network ui: image: ${REGISTRY:-dograhai}/dograh-ui:latest @@ -182,32 +182,32 @@ services: networks: - app-network - coturn: - image: coturn/coturn:4.8.0 - container_name: coturn - restart: unless-stopped - profiles: ["remote"] - ports: - - "3478:3478/udp" - - "3478:3478/tcp" - - "5349:5349/udp" - - "5349:5349/tcp" - - "49152-49200:49152-49200/udp" - volumes: - - ./turnserver.conf:/etc/coturn/turnserver.conf:ro - command: - - -c - - /etc/coturn/turnserver.conf - networks: - - app-network + # coturn: + # image: coturn/coturn:4.8.0 + # container_name: coturn + # restart: unless-stopped + # profiles: ["remote"] + # ports: + # - "3478:3478/udp" + # - "3478:3478/tcp" + # - "5349:5349/udp" + # - "5349:5349/tcp" + # - "49152-49200:49152-49200/udp" + # volumes: + # - ./turnserver.conf:/etc/coturn/turnserver.conf:ro + # command: + # - -c + # - /etc/coturn/turnserver.conf + # networks: + # - app-network volumes: postgres_data: redis_data: minio-data: driver: local - shared-tmp: - driver: local + # shared-tmp: + # driver: local networks: app-network: diff --git a/ui/package-lock.json b/ui/package-lock.json index e7307d9..cc11b0f 100644 --- a/ui/package-lock.json +++ b/ui/package-lock.json @@ -1,12 +1,12 @@ { "name": "ui", - "version": "1.14.0", + "version": "1.15.0", "lockfileVersion": 3, "requires": true, "packages": { "": { "name": "ui", - "version": "1.14.0", + "version": "1.15.0", "dependencies": { "@dagrejs/dagre": "^1.1.4", "@hey-api/client-fetch": "^0.10.0",