From 30eebfe811c60659b95b346fd4116c2c4d570efd Mon Sep 17 00:00:00 2001 From: Sabiha Khan Date: Mon, 23 Feb 2026 12:09:45 +0530 Subject: [PATCH] feat: tansfer calls with aasterisk --- api/routes/tool.py | 15 +- api/services/telephony/ari_manager.py | 203 +++++++++++++++++- .../telephony/call_transfer_manager.py | 14 ++ .../telephony/providers/ari_provider.py | 120 ++++++++++- .../workflow/pipecat_engine_custom_tools.py | 75 +++++-- docker-compose.yaml | 82 +++---- pipecat | 2 +- ui/package-lock.json | 4 +- .../components/TransferCallToolConfig.tsx | 55 ++++- ui/src/app/tools/[toolUuid]/page.tsx | 10 +- 10 files changed, 494 insertions(+), 86 deletions(-) diff --git a/api/routes/tool.py b/api/routes/tool.py index d54589e..0146849 100644 --- a/api/routes/tool.py +++ b/api/routes/tool.py @@ -71,7 +71,7 @@ class TransferCallConfig(BaseModel): """Configuration for Transfer Call tools.""" destination: str = Field( - description="Phone number to transfer the call to (E.164 format, e.g., +1234567890)" + description="Phone number or SIP endpoint to transfer the call to (E.164 format e.g., +1234567890, or SIP endpoint e.g., PJSIP/1234)" ) messageType: Literal["none", "custom"] = Field( default="none", description="Type of message to play before transfer" @@ -89,16 +89,23 @@ class TransferCallConfig(BaseModel): @field_validator("destination") @classmethod def validate_destination(cls, v: str) -> str: - """Validate that destination is a valid E.164 phone number.""" + """Validate that destination is a valid E.164 phone number or SIP endpoint.""" # Allow empty string for initial creation (like HTTP API tools with empty URL) if not v.strip(): return v # E.164 format: +[1-9]\d{1,14} e164_pattern = r"^\+[1-9]\d{1,14}$" - if not re.match(e164_pattern, v): + + # SIP endpoint format: PJSIP/extension or SIP/extension + sip_pattern = r"^(PJSIP|SIP)/[\w\-\.@]+$" + + is_valid_e164 = re.match(e164_pattern, v) + is_valid_sip = re.match(sip_pattern, v, re.IGNORECASE) + + if not (is_valid_e164 or is_valid_sip): raise ValueError( - "Destination must be a valid E.164 phone number (e.g., +1234567890)" + "Destination must be a valid E.164 phone number (e.g., +1234567890) or SIP endpoint (e.g., PJSIP/1234)" ) return v diff --git a/api/services/telephony/ari_manager.py b/api/services/telephony/ari_manager.py index ea8e414..f91e7e5 100644 --- a/api/services/telephony/ari_manager.py +++ b/api/services/telephony/ari_manager.py @@ -14,6 +14,7 @@ setup_logging() import asyncio import json import signal +import time from typing import Dict, Optional, Set from urllib.parse import urlparse @@ -26,6 +27,11 @@ from api.constants import REDIS_URL from api.db import db_client from api.enums import CallType, OrganizationConfigurationKey, WorkflowRunMode from api.services.quota_service import check_dograh_quota_by_user_id +from api.services.telephony.call_transfer_manager import get_call_transfer_manager +from api.services.telephony.transfer_event_protocol import ( + TransferEvent, + TransferEventType, +) # Redis key pattern and TTL for channel-to-run mapping _CHANNEL_KEY_PREFIX = "ari:channel:" @@ -61,6 +67,9 @@ class ARIConnection: # Redis client for channel-to-run reverse mapping (lazy init) self._redis_client: Optional[aioredis.Redis] = None + + # Transfer manager for handling call transfers (lazy init) + self._call_transfer_manager = None async def _get_redis(self) -> aioredis.Redis: """Get Redis client instance (lazy init).""" @@ -70,6 +79,12 @@ class ARIConnection: ) return self._redis_client + async def _get_transfer_manager(self): + """Get transfer manager instance (lazy init).""" + if not self._call_transfer_manager: + self._call_transfer_manager = await get_call_transfer_manager() + return self._call_transfer_manager + async def _set_channel_run(self, channel_id: str, workflow_run_id: str): """Store channel_id -> workflow_run_id mapping in Redis.""" r = await self._get_redis() @@ -228,6 +243,9 @@ class ARIConnection: channel = event.get("channel", {}) channel_id = channel.get("id", "unknown") channel_state = channel.get("state", "unknown") + + # Log all events for each channel for debugging + logger.debug(f"[ARI EVENT org={self.organization_id}] {event_type}: channel={channel_id}, state={channel_state}") if event_type == "StasisStart": # Skip external media channels we created — they fire @@ -255,7 +273,20 @@ class ARIConnection: ) else: # Outbound call (state == "Up") — originated by us - # Parse args to extract workflow context + # Check if this is a transfer channel first + if self._is_transfer_channel(app_args): + transfer_id = self._extract_transfer_id(app_args) + if transfer_id: + logger.info( + f"[ARI org={self.organization_id}] Transfer destination answered: " + f"channel={channel_id}, transfer_id={transfer_id}" + ) + asyncio.create_task( + self._handle_transfer_answered(transfer_id, channel_id) + ) + return + + # Regular outbound call - parse args to extract workflow context args_dict = {} for arg in app_args: for pair in arg.split(","): @@ -285,6 +316,10 @@ class ARIConnection: logger.info( f"[ARI org={self.organization_id}] StasisEnd: channel={channel_id}" ) + + # Check if this is a caller hangup during transfer + # await self._handle_caller_hangup_during_transfer(channel_id) TODO: handle when caller ends call after transfer initiation + workflow_run_id = await self._get_channel_run(channel_id) if workflow_run_id: asyncio.create_task( @@ -298,12 +333,21 @@ class ARIConnection: ) elif event_type == "ChannelDestroyed": - cause = channel.get("cause", 0) - cause_txt = channel.get("cause_txt", "unknown") + cause = event.get("cause", 0) + cause_txt = event.get("cause_txt", "unknown") + tech_cause = event.get("tech_cause", "unknown") logger.info( f"[ARI org={self.organization_id}] ChannelDestroyed: " - f"channel={channel_id}, cause={cause} ({cause_txt})" + f"channel={channel_id}, cause={cause} ({cause_txt}), tech_cause = {tech_cause}" ) + + # Check if this is a transfer destination that failed + transfer_id = await self._get_transfer_id_for_channel(channel_id) + if transfer_id: + failure_message = self._map_hangup_cause_to_message(cause, tech_cause, cause_txt) + asyncio.create_task( + self._handle_transfer_failed(transfer_id, channel_id, failure_message) + ) elif event_type == "ChannelDtmfReceived": digit = event.get("digit", "") @@ -580,7 +624,35 @@ class ARIConnection: call_id = ctx.get("call_id") ext_channel_id = ctx.get("ext_channel_id") bridge_id = ctx.get("bridge_id") + transfer_state = ctx.get("transfer_state") + # Check if this is transfer-protected external channel + if (transfer_state == "in-progress" and + channel_id == ext_channel_id and + ext_channel_id is not None): + + logger.info( + f"[ARI org={self.organization_id}] Transfer in progress - skipping full teardown " + f"for external channel {channel_id}, preserving bridge {bridge_id} and caller {call_id}" + ) + + # Update transfer state to complete + ctx["transfer_state"] = "complete" + await db_client.update_workflow_run( + run_id=int(workflow_run_id), gathered_context=ctx + ) + + # Clean up only Redis markers for external channel (partial cleanup) + await self._delete_channel_run(channel_id) + await self._delete_ext_channel(channel_id) + + logger.info( + f"[ARI org={self.organization_id}] Transfer cleanup complete - preserved caller {call_id} " + f"in bridge {bridge_id}" + ) + return + + # Normal full teardown for non-transfer scenarios (transfer_state is None or not in-progress) # Delete the bridge first (removes channels from it) if bridge_id: await self._delete_bridge(bridge_id) @@ -633,6 +705,129 @@ class ARIConnection: f"{response.status} {text}" ) + # ======== CALL TRANSFER HELPER METHODS ======== + + def _map_hangup_cause_to_message(self, cause: int, tech_cause: str, cause_txt: str) -> str: + """Map Asterisk cause codes to user-friendly transfer failure messages.""" + if cause == 17 and tech_cause == "486": # User busy/declined + return "The person declined the call or their line is busy." + elif cause == 19 and tech_cause == "480": # No answer + return "The transfer call was not answered. The person may be busy or unavailable right now." + elif cause == 21: # Call rejected + return "The transfer call failed to connect. There may be a network issue or the number is unavailable." + else: + return f"Transfer failed: {cause_txt}" + + def _is_transfer_channel(self, app_args: list) -> bool: + """Check if appArgs indicate this is a transfer channel.""" + if not app_args: + return False + # Check if first arg is "transfer" (args are parsed as separate list items) + is_transfer = len(app_args) > 0 and app_args[0] == "transfer" + if is_transfer: + logger.debug(f"[ARI org={self.organization_id}] Detected transfer channel with args: {app_args}") + return is_transfer + + def _extract_transfer_id(self, app_args: list) -> Optional[str]: + """Extract transfer_id from appArgs: ['transfer', '{transfer_id}', '{conf_name}'].""" + # Args are parsed as separate list items, so transfer_id is at index 1 + if len(app_args) > 1 and app_args[0] == "transfer": + transfer_id = app_args[1] + logger.debug(f"[ARI org={self.organization_id}] Extracted transfer_id: {transfer_id}") + return transfer_id + return None + + async def _get_transfer_id_for_channel(self, channel_id: str) -> Optional[str]: + """Get transfer_id for a channel by checking Redis mapping.""" + try: + r = await self._get_redis() + transfer_id = await r.get(f"ari:transfer_channel:{channel_id}") + logger.debug(f"[ARI Transfer] Looking up transfer_id for channel {channel_id}: {transfer_id}") + return transfer_id + except Exception as e: + logger.error(f"[ARI org={self.organization_id}] Error getting transfer ID for channel {channel_id}: {e}") + return None + + async def _store_transfer_channel_mapping(self, channel_id: str, transfer_id: str): + """Store channel->transfer mapping in Redis for event correlation.""" + try: + r = await self._get_redis() + await r.setex(f"ari:transfer_channel:{channel_id}", 300, transfer_id) # 5 minute TTL + except Exception as e: + logger.error(f"[ARI org={self.organization_id}] Error storing transfer channel mapping: {e}") + + async def _handle_transfer_answered(self, transfer_id: str, destination_channel_id: str): + """Handle transfer destination channel answered - publish success event.""" + try: + logger.info( + f"[ARI Transfer org={self.organization_id}] Destination {destination_channel_id} " + f"answered for transfer {transfer_id}" + ) + + # Store channel mapping for potential future events + await self._store_transfer_channel_mapping(destination_channel_id, transfer_id) + + # Get transfer context + transfer_manager = await self._get_transfer_manager() + context = await transfer_manager.get_transfer_context(transfer_id) + if not context: + logger.error( + f"[ARI Transfer org={self.organization_id}] No transfer context found for {transfer_id}" + ) + return + + logger.info( + f"[ARI Transfer org={self.organization_id}] Transfer {transfer_id} success: " + f"caller={context.original_call_sid} -> destination={destination_channel_id}" + ) + + # Publish transfer success event - this will trigger the bridge swap in serializer + success_event = TransferEvent( + type=TransferEventType.TRANSFER_ANSWERED, + transfer_id=transfer_id, + original_call_sid=context.original_call_sid, + transfer_call_sid=destination_channel_id, + conference_name=context.conference_name, + message="Transfer destination answered", + status="success", + action="transfer_success", + end_call=True, + timestamp=time.time() + ) + await transfer_manager.publish_transfer_event(success_event) + + except Exception as e: + logger.error(f"[ARI Transfer org={self.organization_id}] Error handling transfer answer: {e}") + # On error, publish failure event + await self._handle_transfer_failed(transfer_id, destination_channel_id, f"Transfer processing error: {e}") + + async def _handle_transfer_failed(self, transfer_id: str, channel_id: str, reason: str): + """Handle transfer failure - publish failure event.""" + try: + logger.info(f"[ARI Transfer] Transfer {transfer_id} failed: {reason}") + + # Get transfer context + transfer_manager = await self._get_transfer_manager() + context = await transfer_manager.get_transfer_context(transfer_id) + + # Publish failure event + failure_event = TransferEvent( + type=TransferEventType.TRANSFER_FAILED, + transfer_id=transfer_id, + original_call_sid=context.original_call_sid if context else "", + transfer_call_sid=channel_id, + message=f"Transfer failed: {reason}", + status="failed", + action="transfer_failed", + reason=reason, + end_call=False, + timestamp=time.time() + ) + await transfer_manager.publish_transfer_event(failure_event) + + except Exception as e: + logger.error(f"[ARI Transfer] Error handling transfer failure: {e}") + async def _delete_channel(self, channel_id: str): """Delete (hang up) an ARI channel. Ignores 404 (already gone).""" diff --git a/api/services/telephony/call_transfer_manager.py b/api/services/telephony/call_transfer_manager.py index f843850..2f35bc3 100644 --- a/api/services/telephony/call_transfer_manager.py +++ b/api/services/telephony/call_transfer_manager.py @@ -85,6 +85,20 @@ class CallTransferManager: except Exception as e: logger.error(f"Failed to remove transfer context: {e}") + async def store_transfer_channel_mapping(self, channel_id: str, transfer_id: str) -> None: + """Store channel->transfer mapping in Redis for event correlation. + + Args: + channel_id: ARI channel ID + transfer_id: Transfer identifier + """ + try: + redis = await self._get_redis() + await redis.setex(f"ari:transfer_channel:{channel_id}", 300, transfer_id) # 5 minute TTL + logger.debug(f"[Transfer Manager] Stored channel mapping: channel={channel_id}, transfer_id={transfer_id}") + except Exception as e: + logger.error(f"[Transfer Manager] Error storing transfer channel mapping: {e}") + async def publish_transfer_event(self, event: TransferEvent) -> None: """Publish transfer event to Redis channel. diff --git a/api/services/telephony/providers/ari_provider.py b/api/services/telephony/providers/ari_provider.py index 139065a..028fab7 100644 --- a/api/services/telephony/providers/ari_provider.py +++ b/api/services/telephony/providers/ari_provider.py @@ -6,6 +6,7 @@ The ARI WebSocket event listener runs as a separate process (ari_manager.py). """ import json +import time from typing import TYPE_CHECKING, Any, Dict, List, Optional from urllib.parse import urlparse @@ -349,8 +350,8 @@ class ARIProvider(TelephonyProvider): # ======== CALL TRANSFER METHODS ======== def supports_transfers(self) -> bool: - """ARI does not currently support call transfers.""" - return False + """ARI supports call transfers via bridge manipulation.""" + return True async def transfer_call( self, @@ -360,14 +361,123 @@ class ARIProvider(TelephonyProvider): timeout: int = 30, **kwargs: Any, ) -> Dict[str, Any]: - """ARI call transfers are not yet implemented.""" - raise NotImplementedError("ARI provider does not support call transfers") + """Initiate ARI call transfer by originating destination channel. + + This method returns immediately after originating the channel. + The actual transfer completion is handled asynchronously via ARI events. + + Args: + destination: Destination phone number (SIP endpoint) + transfer_id: Unique identifier for this transfer attempt + conference_name: Conference name (unused in ARI, kept for interface compatibility) + timeout: Transfer timeout in seconds + **kwargs: Additional arguments + + Returns: + Dict containing: + - call_sid: Destination channel ID + - status: "initiated" + - provider: "ari" + - raw_response: Full ARI channel creation response + """ + if not self.validate_config(): + raise ValueError("ARI provider not properly configured") + + logger.info( + f"[ARI Transfer] Initiating transfer {transfer_id} to {destination} " + f"(timeout: {timeout}s)" + ) + + # Import here to avoid circular dependency + from api.services.telephony.call_transfer_manager import get_call_transfer_manager + from api.services.telephony.transfer_event_protocol import TransferContext + + # Store transfer context for event correlation + call_transfer_manager = await get_call_transfer_manager() + context = TransferContext( + transfer_id=transfer_id, + call_sid=None, # Will be updated after channel creation + target_number=destination, + tool_uuid=kwargs.get("tool_uuid", ""), + original_call_sid=kwargs.get("original_call_sid", ""), + conference_name=conference_name, + initiated_at=time.time() + ) + await call_transfer_manager.store_transfer_context(context, ttl=timeout + 10) + + # Build SIP endpoint + if destination.startswith("SIP/") or destination.startswith("PJSIP/"): + sip_endpoint = destination + else: + sip_endpoint = f"PJSIP/{destination}" + + # Build transfer appArgs for event correlation + app_args = f"transfer,{transfer_id},{conference_name}" + + try: + # Build endpoint URL following existing pattern + endpoint = f"{self.base_url}/channels" + + # Prepare channel creation params following existing pattern + params = { + "endpoint": sip_endpoint, + "app": self.app_name, + "appArgs": app_args, + "timeout": timeout, # Keep timeout for transfer calls + } + + # Originate destination channel using existing pattern + async with aiohttp.ClientSession() as session: + async with session.post( + endpoint, + params=params, + auth=self._get_auth(), + ) as response: + response_text = await response.text() + + if response.status != 200: + error_msg = f"ARI channel creation failed: {response.status} {response_text}" + logger.error(f"[ARI Transfer] {error_msg}") + await call_transfer_manager.remove_transfer_context(transfer_id) + raise Exception(error_msg) + + result = json.loads(response_text) + + destination_channel_id = result.get("id", "") + if not destination_channel_id: + logger.error(f"[ARI Transfer] Failed to get channel ID from response: {result}") + await call_transfer_manager.remove_transfer_context(transfer_id) + raise Exception("Failed to create destination channel") + + # Update transfer context with destination channel ID + context.call_sid = destination_channel_id + await call_transfer_manager.store_transfer_context(context, ttl=timeout + 10) + + # Store transfer channel mapping for event correlation (works with any dialplan setup) + await call_transfer_manager.store_transfer_channel_mapping(destination_channel_id, transfer_id) + + logger.info( + f"[ARI Transfer] Originated destination channel {destination_channel_id} " + f"for transfer {transfer_id}" + ) + + return { + "call_sid": destination_channel_id, + "status": "initiated", + "provider": self.PROVIDER_NAME, + "raw_response": result, + } + + except Exception as e: + logger.error(f"[ARI Transfer] Failed to originate transfer channel: {e}") + await call_transfer_manager.remove_transfer_context(transfer_id) + raise # ======== ARI-SPECIFIC METHODS ======== async def hangup_channel(self, channel_id: str, reason: str = "normal") -> bool: """Hang up an ARI channel.""" - endpoint = f"{self.base_url}/channels/{channel_id}" + endpwoint = f"{self.base_url}/channels/{channel_id}" params = {"reason_code": reason} try: diff --git a/api/services/workflow/pipecat_engine_custom_tools.py b/api/services/workflow/pipecat_engine_custom_tools.py index f226791..0bb337d 100644 --- a/api/services/workflow/pipecat_engine_custom_tools.py +++ b/api/services/workflow/pipecat_engine_custom_tools.py @@ -339,20 +339,42 @@ class CustomToolManager: ) return - # Validate E.164 format - E164_PHONE_REGEX = r"^\+[1-9]\d{1,14}$" - if not re.match(E164_PHONE_REGEX, destination): - validation_error_result = { - "status": "failed", - "message": "I'm sorry, but the transfer phone number appears to be invalid. Please contact support to verify the transfer settings.", - "action": "transfer_failed", - "reason": "invalid_destination", - "end_call": True, - } - await self._handle_transfer_result( - validation_error_result, function_call_params, properties - ) - return + # Validate destination format based on workflow run mode + if workflow_run.mode == WorkflowRunMode.ARI.value: + # For ARI provider, also accept SIP endpoints + SIP_ENDPOINT_REGEX = r"^(PJSIP|SIP)\/[\w\-\.@]+$" + E164_PHONE_REGEX = r"^\+[1-9]\d{1,14}$" + + is_valid_sip = re.match(SIP_ENDPOINT_REGEX, destination) + is_valid_e164 = re.match(E164_PHONE_REGEX, destination) + + if not (is_valid_sip or is_valid_e164): + validation_error_result = { + "status": "failed", + "message": "I'm sorry, but the transfer destination appears to be invalid. Please contact support to verify the transfer settings.", + "action": "transfer_failed", + "reason": "invalid_destination", + "end_call": True, + } + await self._handle_transfer_result( + validation_error_result, function_call_params, properties + ) + return + else: + # For non-ARI providers (Twilio, etc), use E.164 validation + E164_PHONE_REGEX = r"^\+[1-9]\d{1,14}$" + if not re.match(E164_PHONE_REGEX, destination): + validation_error_result = { + "status": "failed", + "message": "I'm sorry, but the transfer phone number appears to be invalid. Please contact support to verify the transfer settings.", + "action": "transfer_failed", + "reason": "invalid_destination", + "end_call": True, + } + await self._handle_transfer_result( + validation_error_result, function_call_params, properties + ) + return if message_type == "custom" and custom_message: logger.info(f"Playing pre-transfer message: {custom_message}") @@ -466,15 +488,15 @@ class CustomToolManager: transfer_event = None finally: - # Single cleanup point: stop hold music, unmute pipeline, remove context + # Cleanup hold music and pipeline state + # Transfer context cleanup is now handled by respective serializers logger.info( - "Transfer wait ended, cleaning up hold music, pipeline state, and transfer context" + "Transfer wait ended, cleaning up hold music and pipeline state" ) hold_music_stop_event.set() if hold_music_task: await hold_music_task self._engine.set_mute_pipeline(False) - await call_transfer_manager.remove_transfer_context(transfer_id) # Handle result (after cleanup) if transfer_event: @@ -516,8 +538,27 @@ class CustomToolManager: exception_result, function_call_params, properties ) + finally: + # Schedule background cleanup of transfer context after pipeline processing delay + if 'transfer_id' in locals(): + asyncio.create_task( + self._cleanup_transfer_context_delayed(transfer_id) + ) + return transfer_call_handler + async def _cleanup_transfer_context_delayed(self, transfer_id: str): + """Background task to clean up transfer context after pipeline processing delay.""" + try: + # Wait for pipeline to process EndFrame(reason="transfer_call") in serializers + await asyncio.sleep(1.0) # 1 second delay for async pipeline processing + + call_transfer_manager = await get_call_transfer_manager() + await call_transfer_manager.remove_transfer_context(transfer_id) + logger.info(f"Background cleanup: removed transfer context {transfer_id}") + except Exception as e: + logger.error(f"Background cleanup error for transfer context {transfer_id}: {e}") + async def _handle_transfer_result( self, result: dict, function_call_params, properties ): diff --git a/docker-compose.yaml b/docker-compose.yaml index 5e7ce32..c7a59ef 100644 --- a/docker-compose.yaml +++ b/docker-compose.yaml @@ -74,27 +74,27 @@ services: networks: - app-network - api: - image: ${REGISTRY:-dograhai}/dograh-api:latest - volumes: - - shared-tmp:/tmp - environment: - # Core application config - ENVIRONMENT: "local" - LOG_LEVEL: "INFO" + # api: + # image: ${REGISTRY:-dograhai}/dograh-api:latest + # volumes: + # - shared-tmp:/tmp + # environment: + # # Core application config + # ENVIRONMENT: "local" + # LOG_LEVEL: "INFO" - # Replace this environment variable if you are using a custom - # domain to host the stack - BACKEND_API_ENDPOINT: "${BACKEND_API_ENDPOINT:-http://localhost:8000}" + # # Replace this environment variable if you are using a custom + # # domain to host the stack + # BACKEND_API_ENDPOINT: "${BACKEND_API_ENDPOINT:-http://localhost:8000}" - # Database configuration (using containerized postgres) - DATABASE_URL: "postgresql+asyncpg://postgres:postgres@postgres:5432/postgres" + # # Database configuration (using containerized postgres) + # DATABASE_URL: "postgresql+asyncpg://postgres:postgres@postgres:5432/postgres" - # Redis configuration (using containerized redis) - REDIS_URL: "redis://:redissecret@redis:6379" + # # Redis configuration (using containerized redis) + # REDIS_URL: "redis://:redissecret@redis:6379" - # Storage configuration - using local MinIO - ENABLE_AWS_S3: "false" + # # Storage configuration - using local MinIO + # ENABLE_AWS_S3: "false" # MinIO MINIO_ENDPOINT: "minio:9000" @@ -112,10 +112,10 @@ services: # LANGFUSE_PUBLIC_KEY: "" # LANGFUSE_HOST: "" - # TURN server configuration (for WebRTC NAT traversal in remote server) - # Uses time-limited credentials via TURN REST API (HMAC-SHA1) - TURN_HOST: "${TURN_HOST:-}" - TURN_SECRET: "${TURN_SECRET:-}" + # # TURN server configuration (for WebRTC NAT traversal in remote server) + # # Uses time-limited credentials via TURN REST API (HMAC-SHA1) + # TURN_HOST: "${TURN_HOST:-}" + # TURN_SECRET: "${TURN_SECRET:-}" OSS_JWT_SECRET: "${OSS_JWT_SECRET:-ChangeMeInProduction}" @@ -182,32 +182,32 @@ services: networks: - app-network - coturn: - image: coturn/coturn:4.8.0 - container_name: coturn - restart: unless-stopped - profiles: ["remote"] - ports: - - "3478:3478/udp" - - "3478:3478/tcp" - - "5349:5349/udp" - - "5349:5349/tcp" - - "49152-49200:49152-49200/udp" - volumes: - - ./turnserver.conf:/etc/coturn/turnserver.conf:ro - command: - - -c - - /etc/coturn/turnserver.conf - networks: - - app-network + # coturn: + # image: coturn/coturn:4.8.0 + # container_name: coturn + # restart: unless-stopped + # profiles: ["remote"] + # ports: + # - "3478:3478/udp" + # - "3478:3478/tcp" + # - "5349:5349/udp" + # - "5349:5349/tcp" + # - "49152-49200:49152-49200/udp" + # volumes: + # - ./turnserver.conf:/etc/coturn/turnserver.conf:ro + # command: + # - -c + # - /etc/coturn/turnserver.conf + # networks: + # - app-network volumes: postgres_data: redis_data: minio-data: driver: local - shared-tmp: - driver: local + # shared-tmp: + # driver: local networks: app-network: diff --git a/pipecat b/pipecat index 6aa0502..0ccb4f2 160000 --- a/pipecat +++ b/pipecat @@ -1 +1 @@ -Subproject commit 6aa0502a9834d536aba9589cec87d827e66f2fad +Subproject commit 0ccb4f242c48b59ad34a586986bbc4a3dcec1d36 diff --git a/ui/package-lock.json b/ui/package-lock.json index 2862dc7..e7307d9 100644 --- a/ui/package-lock.json +++ b/ui/package-lock.json @@ -1,12 +1,12 @@ { "name": "ui", - "version": "1.13.0", + "version": "1.14.0", "lockfileVersion": 3, "requires": true, "packages": { "": { "name": "ui", - "version": "1.13.0", + "version": "1.14.0", "dependencies": { "@dagrejs/dagre": "^1.1.4", "@hey-api/client-fetch": "^0.10.0", diff --git a/ui/src/app/tools/[toolUuid]/components/TransferCallToolConfig.tsx b/ui/src/app/tools/[toolUuid]/components/TransferCallToolConfig.tsx index 067fad7..15ed298 100644 --- a/ui/src/app/tools/[toolUuid]/components/TransferCallToolConfig.tsx +++ b/ui/src/app/tools/[toolUuid]/components/TransferCallToolConfig.tsx @@ -5,6 +5,7 @@ import { Input } from "@/components/ui/input"; import { Label } from "@/components/ui/label"; import { RadioGroup, RadioGroupItem } from "@/components/ui/radio-group"; import { Textarea } from "@/components/ui/textarea"; +import { useState, useEffect } from "react"; import { type EndCallMessageType } from "../../config"; @@ -37,20 +38,46 @@ export function TransferCallToolConfig({ timeout, onTimeoutChange, }: TransferCallToolConfigProps) { - // Basic E.164 validation pattern + const [sipMode, setSipMode] = useState(() => /^(PJSIP|SIP)\//i.test(destination)); + + // Validation patterns const isValidPhoneNumber = (phone: string): boolean => { const e164Pattern = /^\+[1-9]\d{1,14}$/; return e164Pattern.test(phone); }; - const phoneNumberError = destination && !isValidPhoneNumber(destination); + const isValidSipEndpoint = (endpoint: string): boolean => { + const sipPattern = /^(PJSIP|SIP)\/[\w\-\.@]+$/i; + return sipPattern.test(endpoint); + }; + + const getValidationError = (): string | null => { + if (!destination) return null; + + if (sipMode) { + return isValidSipEndpoint(destination) + ? null + : "Please enter a valid SIP endpoint (e.g., PJSIP/1234 or SIP/extension@domain.com)"; + } else { + return isValidPhoneNumber(destination) + ? null + : "Please enter a valid phone number in E.164 format (e.g., +1234567890)"; + } + }; + + const destinationError = getValidationError(); + + const handleSipModeToggle = () => { + setSipMode(!sipMode); + onDestinationChange(""); // Clear destination when switching modes + }; return ( Transfer Call Configuration - Configure call transfer settings (Twilio only) + Configure call transfer settings. Supports phone numbers (Twilio) and SIP endpoints (Asterisk ARI). @@ -80,21 +107,31 @@ export function TransferCallToolConfig({
- + onDestinationChange(e.target.value)} - placeholder="+1234567890" - className={phoneNumberError ? "border-red-500 focus:border-red-500" : ""} + placeholder={sipMode ? "PJSIP/1234 or SIP/extension@domain.com" : "+1234567890"} + className={destinationError ? "border-red-500 focus:border-red-500" : ""} /> - {phoneNumberError && ( + {destinationError && ( )} +
diff --git a/ui/src/app/tools/[toolUuid]/page.tsx b/ui/src/app/tools/[toolUuid]/page.tsx index b7a83c9..e97362f 100644 --- a/ui/src/app/tools/[toolUuid]/page.tsx +++ b/ui/src/app/tools/[toolUuid]/page.tsx @@ -198,10 +198,14 @@ export default function ToolDetailPage() { // Validation based on tool type if (tool.category === "transfer_call") { - // Validate destination phone number for Transfer Call tools + // Validate destination for Transfer Call tools (supports both E.164 and SIP endpoints) const e164Pattern = /^\+[1-9]\d{1,14}$/; - if (!transferDestination || !e164Pattern.test(transferDestination)) { - setError("Please enter a valid phone number in E.164 format (e.g., +1234567890)"); + const sipPattern = /^(PJSIP|SIP)\/[\w\-\.@]+$/i; + const isValidE164 = e164Pattern.test(transferDestination); + const isValidSip = sipPattern.test(transferDestination); + + if (!transferDestination || (!isValidE164 && !isValidSip)) { + setError("Please enter a valid phone number (E.164 format) or SIP endpoint (e.g., PJSIP/1234)"); return; } } else if (tool.category !== "end_call") {