From 000c648e7e9bcf97e05b04a7b6d4dd2591eb59d7 Mon Sep 17 00:00:00 2001 From: Sabiha Khan Date: Mon, 23 Feb 2026 15:06:25 +0530 Subject: [PATCH] chore: format code with pre-commit script --- api/routes/tool.py | 6 +- api/services/telephony/ari_manager.py | 125 +++++++++++------- .../telephony/call_transfer_manager.py | 16 ++- .../telephony/providers/ari_provider.py | 87 ++++++------ .../workflow/pipecat_engine_custom_tools.py | 18 +-- pipecat | 2 +- .../components/TransferCallToolConfig.tsx | 15 ++- ui/src/app/tools/[toolUuid]/page.tsx | 2 +- 8 files changed, 161 insertions(+), 110 deletions(-) diff --git a/api/routes/tool.py b/api/routes/tool.py index 0146849..ba8e708 100644 --- a/api/routes/tool.py +++ b/api/routes/tool.py @@ -96,13 +96,13 @@ class TransferCallConfig(BaseModel): # E.164 format: +[1-9]\d{1,14} e164_pattern = r"^\+[1-9]\d{1,14}$" - + # SIP endpoint format: PJSIP/extension or SIP/extension sip_pattern = r"^(PJSIP|SIP)/[\w\-\.@]+$" - + is_valid_e164 = re.match(e164_pattern, v) is_valid_sip = re.match(sip_pattern, v, re.IGNORECASE) - + if not (is_valid_e164 or is_valid_sip): raise ValueError( "Destination must be a valid E.164 phone number (e.g., +1234567890) or SIP endpoint (e.g., PJSIP/1234)" diff --git a/api/services/telephony/ari_manager.py b/api/services/telephony/ari_manager.py index f91e7e5..f047021 100644 --- a/api/services/telephony/ari_manager.py +++ b/api/services/telephony/ari_manager.py @@ -67,7 +67,7 @@ class ARIConnection: # Redis client for channel-to-run reverse mapping (lazy init) self._redis_client: Optional[aioredis.Redis] = None - + # Transfer manager for handling call transfers (lazy init) self._call_transfer_manager = None @@ -243,9 +243,11 @@ class ARIConnection: channel = event.get("channel", {}) channel_id = channel.get("id", "unknown") channel_state = channel.get("state", "unknown") - + # Log all events for each channel for debugging - logger.debug(f"[ARI EVENT org={self.organization_id}] {event_type}: channel={channel_id}, state={channel_state}") + logger.debug( + f"[ARI EVENT org={self.organization_id}] {event_type}: channel={channel_id}, state={channel_state}" + ) if event_type == "StasisStart": # Skip external media channels we created — they fire @@ -285,7 +287,7 @@ class ARIConnection: self._handle_transfer_answered(transfer_id, channel_id) ) return - + # Regular outbound call - parse args to extract workflow context args_dict = {} for arg in app_args: @@ -316,10 +318,10 @@ class ARIConnection: logger.info( f"[ARI org={self.organization_id}] StasisEnd: channel={channel_id}" ) - + # Check if this is a caller hangup during transfer # await self._handle_caller_hangup_during_transfer(channel_id) TODO: handle when caller ends call after transfer initiation - + workflow_run_id = await self._get_channel_run(channel_id) if workflow_run_id: asyncio.create_task( @@ -340,13 +342,17 @@ class ARIConnection: f"[ARI org={self.organization_id}] ChannelDestroyed: " f"channel={channel_id}, cause={cause} ({cause_txt}), tech_cause = {tech_cause}" ) - + # Check if this is a transfer destination that failed transfer_id = await self._get_transfer_id_for_channel(channel_id) - if transfer_id: - failure_message = self._map_hangup_cause_to_message(cause, tech_cause, cause_txt) + if transfer_id: + failure_message = self._map_hangup_cause_to_message( + cause, tech_cause, cause_txt + ) asyncio.create_task( - self._handle_transfer_failed(transfer_id, channel_id, failure_message) + self._handle_transfer_failed( + transfer_id, channel_id, failure_message + ) ) elif event_type == "ChannelDtmfReceived": @@ -627,25 +633,26 @@ class ARIConnection: transfer_state = ctx.get("transfer_state") # Check if this is transfer-protected external channel - if (transfer_state == "in-progress" and - channel_id == ext_channel_id and - ext_channel_id is not None): - + if ( + transfer_state == "in-progress" + and channel_id == ext_channel_id + and ext_channel_id is not None + ): logger.info( f"[ARI org={self.organization_id}] Transfer in progress - skipping full teardown " f"for external channel {channel_id}, preserving bridge {bridge_id} and caller {call_id}" ) - + # Update transfer state to complete ctx["transfer_state"] = "complete" await db_client.update_workflow_run( run_id=int(workflow_run_id), gathered_context=ctx ) - + # Clean up only Redis markers for external channel (partial cleanup) await self._delete_channel_run(channel_id) await self._delete_ext_channel(channel_id) - + logger.info( f"[ARI org={self.organization_id}] Transfer cleanup complete - preserved caller {call_id} " f"in bridge {bridge_id}" @@ -706,8 +713,10 @@ class ARIConnection: ) # ======== CALL TRANSFER HELPER METHODS ======== - - def _map_hangup_cause_to_message(self, cause: int, tech_cause: str, cause_txt: str) -> str: + + def _map_hangup_cause_to_message( + self, cause: int, tech_cause: str, cause_txt: str + ) -> str: """Map Asterisk cause codes to user-friendly transfer failure messages.""" if cause == 17 and tech_cause == "486": # User busy/declined return "The person declined the call or their line is busy." @@ -717,7 +726,7 @@ class ARIConnection: return "The transfer call failed to connect. There may be a network issue or the number is unavailable." else: return f"Transfer failed: {cause_txt}" - + def _is_transfer_channel(self, app_args: list) -> bool: """Check if appArgs indicate this is a transfer channel.""" if not app_args: @@ -725,48 +734,64 @@ class ARIConnection: # Check if first arg is "transfer" (args are parsed as separate list items) is_transfer = len(app_args) > 0 and app_args[0] == "transfer" if is_transfer: - logger.debug(f"[ARI org={self.organization_id}] Detected transfer channel with args: {app_args}") + logger.debug( + f"[ARI org={self.organization_id}] Detected transfer channel with args: {app_args}" + ) return is_transfer - + def _extract_transfer_id(self, app_args: list) -> Optional[str]: """Extract transfer_id from appArgs: ['transfer', '{transfer_id}', '{conf_name}'].""" # Args are parsed as separate list items, so transfer_id is at index 1 if len(app_args) > 1 and app_args[0] == "transfer": transfer_id = app_args[1] - logger.debug(f"[ARI org={self.organization_id}] Extracted transfer_id: {transfer_id}") + logger.debug( + f"[ARI org={self.organization_id}] Extracted transfer_id: {transfer_id}" + ) return transfer_id return None - + async def _get_transfer_id_for_channel(self, channel_id: str) -> Optional[str]: """Get transfer_id for a channel by checking Redis mapping.""" try: r = await self._get_redis() transfer_id = await r.get(f"ari:transfer_channel:{channel_id}") - logger.debug(f"[ARI Transfer] Looking up transfer_id for channel {channel_id}: {transfer_id}") + logger.debug( + f"[ARI Transfer] Looking up transfer_id for channel {channel_id}: {transfer_id}" + ) return transfer_id except Exception as e: - logger.error(f"[ARI org={self.organization_id}] Error getting transfer ID for channel {channel_id}: {e}") + logger.error( + f"[ARI org={self.organization_id}] Error getting transfer ID for channel {channel_id}: {e}" + ) return None - + async def _store_transfer_channel_mapping(self, channel_id: str, transfer_id: str): """Store channel->transfer mapping in Redis for event correlation.""" try: r = await self._get_redis() - await r.setex(f"ari:transfer_channel:{channel_id}", 300, transfer_id) # 5 minute TTL + await r.setex( + f"ari:transfer_channel:{channel_id}", 300, transfer_id + ) # 5 minute TTL except Exception as e: - logger.error(f"[ARI org={self.organization_id}] Error storing transfer channel mapping: {e}") - - async def _handle_transfer_answered(self, transfer_id: str, destination_channel_id: str): + logger.error( + f"[ARI org={self.organization_id}] Error storing transfer channel mapping: {e}" + ) + + async def _handle_transfer_answered( + self, transfer_id: str, destination_channel_id: str + ): """Handle transfer destination channel answered - publish success event.""" try: logger.info( f"[ARI Transfer org={self.organization_id}] Destination {destination_channel_id} " f"answered for transfer {transfer_id}" ) - + # Store channel mapping for potential future events - await self._store_transfer_channel_mapping(destination_channel_id, transfer_id) - + await self._store_transfer_channel_mapping( + destination_channel_id, transfer_id + ) + # Get transfer context transfer_manager = await self._get_transfer_manager() context = await transfer_manager.get_transfer_context(transfer_id) @@ -775,12 +800,12 @@ class ARIConnection: f"[ARI Transfer org={self.organization_id}] No transfer context found for {transfer_id}" ) return - + logger.info( f"[ARI Transfer org={self.organization_id}] Transfer {transfer_id} success: " f"caller={context.original_call_sid} -> destination={destination_channel_id}" ) - + # Publish transfer success event - this will trigger the bridge swap in serializer success_event = TransferEvent( type=TransferEventType.TRANSFER_ANSWERED, @@ -792,24 +817,30 @@ class ARIConnection: status="success", action="transfer_success", end_call=True, - timestamp=time.time() + timestamp=time.time(), ) await transfer_manager.publish_transfer_event(success_event) - + except Exception as e: - logger.error(f"[ARI Transfer org={self.organization_id}] Error handling transfer answer: {e}") + logger.error( + f"[ARI Transfer org={self.organization_id}] Error handling transfer answer: {e}" + ) # On error, publish failure event - await self._handle_transfer_failed(transfer_id, destination_channel_id, f"Transfer processing error: {e}") - - async def _handle_transfer_failed(self, transfer_id: str, channel_id: str, reason: str): + await self._handle_transfer_failed( + transfer_id, destination_channel_id, f"Transfer processing error: {e}" + ) + + async def _handle_transfer_failed( + self, transfer_id: str, channel_id: str, reason: str + ): """Handle transfer failure - publish failure event.""" try: logger.info(f"[ARI Transfer] Transfer {transfer_id} failed: {reason}") - + # Get transfer context transfer_manager = await self._get_transfer_manager() context = await transfer_manager.get_transfer_context(transfer_id) - + # Publish failure event failure_event = TransferEvent( type=TransferEventType.TRANSFER_FAILED, @@ -817,14 +848,14 @@ class ARIConnection: original_call_sid=context.original_call_sid if context else "", transfer_call_sid=channel_id, message=f"Transfer failed: {reason}", - status="failed", + status="failed", action="transfer_failed", reason=reason, end_call=False, - timestamp=time.time() + timestamp=time.time(), ) await transfer_manager.publish_transfer_event(failure_event) - + except Exception as e: logger.error(f"[ARI Transfer] Error handling transfer failure: {e}") diff --git a/api/services/telephony/call_transfer_manager.py b/api/services/telephony/call_transfer_manager.py index 2f35bc3..19b3ca8 100644 --- a/api/services/telephony/call_transfer_manager.py +++ b/api/services/telephony/call_transfer_manager.py @@ -85,7 +85,9 @@ class CallTransferManager: except Exception as e: logger.error(f"Failed to remove transfer context: {e}") - async def store_transfer_channel_mapping(self, channel_id: str, transfer_id: str) -> None: + async def store_transfer_channel_mapping( + self, channel_id: str, transfer_id: str + ) -> None: """Store channel->transfer mapping in Redis for event correlation. Args: @@ -94,10 +96,16 @@ class CallTransferManager: """ try: redis = await self._get_redis() - await redis.setex(f"ari:transfer_channel:{channel_id}", 300, transfer_id) # 5 minute TTL - logger.debug(f"[Transfer Manager] Stored channel mapping: channel={channel_id}, transfer_id={transfer_id}") + await redis.setex( + f"ari:transfer_channel:{channel_id}", 300, transfer_id + ) # 5 minute TTL + logger.debug( + f"[Transfer Manager] Stored channel mapping: channel={channel_id}, transfer_id={transfer_id}" + ) except Exception as e: - logger.error(f"[Transfer Manager] Error storing transfer channel mapping: {e}") + logger.error( + f"[Transfer Manager] Error storing transfer channel mapping: {e}" + ) async def publish_transfer_event(self, event: TransferEvent) -> None: """Publish transfer event to Redis channel. diff --git a/api/services/telephony/providers/ari_provider.py b/api/services/telephony/providers/ari_provider.py index 028fab7..e039644 100644 --- a/api/services/telephony/providers/ari_provider.py +++ b/api/services/telephony/providers/ari_provider.py @@ -361,37 +361,40 @@ class ARIProvider(TelephonyProvider): timeout: int = 30, **kwargs: Any, ) -> Dict[str, Any]: - """Initiate ARI call transfer by originating destination channel. - - This method returns immediately after originating the channel. - The actual transfer completion is handled asynchronously via ARI events. - - Args: - destination: Destination phone number (SIP endpoint) - transfer_id: Unique identifier for this transfer attempt - conference_name: Conference name (unused in ARI, kept for interface compatibility) - timeout: Transfer timeout in seconds - **kwargs: Additional arguments - - Returns: - Dict containing: - - call_sid: Destination channel ID - - status: "initiated" - - provider: "ari" - - raw_response: Full ARI channel creation response + """Initiate ARI call transfer by creating an outbound channel to the destination. + + This method creates the destination channel and returns immediately. The transfer + process completes asynchronously - success/failure is determined by ARI events + and communicated through the transfer event system. + + Args: + destination: Destination phone number (SIP endpoint) + transfer_id: Unique identifier for this transfer attempt + conference_name: Conference name (unused in ARI, kept for interface compatibility) + timeout: Transfer timeout in seconds + **kwargs: Additional arguments + + Returns: + Dict containing: + - call_sid: Destination channel ID + - status: "initiated" + - provider: "ari" + - raw_response: Full ARI channel creation response """ if not self.validate_config(): raise ValueError("ARI provider not properly configured") - + logger.info( f"[ARI Transfer] Initiating transfer {transfer_id} to {destination} " f"(timeout: {timeout}s)" ) - + # Import here to avoid circular dependency - from api.services.telephony.call_transfer_manager import get_call_transfer_manager + from api.services.telephony.call_transfer_manager import ( + get_call_transfer_manager, + ) from api.services.telephony.transfer_event_protocol import TransferContext - + # Store transfer context for event correlation call_transfer_manager = await get_call_transfer_manager() context = TransferContext( @@ -401,23 +404,23 @@ class ARIProvider(TelephonyProvider): tool_uuid=kwargs.get("tool_uuid", ""), original_call_sid=kwargs.get("original_call_sid", ""), conference_name=conference_name, - initiated_at=time.time() + initiated_at=time.time(), ) await call_transfer_manager.store_transfer_context(context, ttl=timeout + 10) - + # Build SIP endpoint if destination.startswith("SIP/") or destination.startswith("PJSIP/"): sip_endpoint = destination else: sip_endpoint = f"PJSIP/{destination}" - + # Build transfer appArgs for event correlation app_args = f"transfer,{transfer_id},{conference_name}" - + try: # Build endpoint URL following existing pattern endpoint = f"{self.base_url}/channels" - + # Prepare channel creation params following existing pattern params = { "endpoint": sip_endpoint, @@ -425,7 +428,7 @@ class ARIProvider(TelephonyProvider): "appArgs": app_args, "timeout": timeout, # Keep timeout for transfer calls } - + # Originate destination channel using existing pattern async with aiohttp.ClientSession() as session: async with session.post( @@ -434,40 +437,46 @@ class ARIProvider(TelephonyProvider): auth=self._get_auth(), ) as response: response_text = await response.text() - + if response.status != 200: error_msg = f"ARI channel creation failed: {response.status} {response_text}" logger.error(f"[ARI Transfer] {error_msg}") await call_transfer_manager.remove_transfer_context(transfer_id) raise Exception(error_msg) - + result = json.loads(response_text) - + destination_channel_id = result.get("id", "") if not destination_channel_id: - logger.error(f"[ARI Transfer] Failed to get channel ID from response: {result}") + logger.error( + f"[ARI Transfer] Failed to get channel ID from response: {result}" + ) await call_transfer_manager.remove_transfer_context(transfer_id) raise Exception("Failed to create destination channel") - + # Update transfer context with destination channel ID context.call_sid = destination_channel_id - await call_transfer_manager.store_transfer_context(context, ttl=timeout + 10) - + await call_transfer_manager.store_transfer_context( + context, ttl=timeout + 10 + ) + # Store transfer channel mapping for event correlation (works with any dialplan setup) - await call_transfer_manager.store_transfer_channel_mapping(destination_channel_id, transfer_id) - + await call_transfer_manager.store_transfer_channel_mapping( + destination_channel_id, transfer_id + ) + logger.info( f"[ARI Transfer] Originated destination channel {destination_channel_id} " f"for transfer {transfer_id}" ) - + return { "call_sid": destination_channel_id, "status": "initiated", "provider": self.PROVIDER_NAME, "raw_response": result, } - + except Exception as e: logger.error(f"[ARI Transfer] Failed to originate transfer channel: {e}") await call_transfer_manager.remove_transfer_context(transfer_id) diff --git a/api/services/workflow/pipecat_engine_custom_tools.py b/api/services/workflow/pipecat_engine_custom_tools.py index 0bb337d..4efd3d3 100644 --- a/api/services/workflow/pipecat_engine_custom_tools.py +++ b/api/services/workflow/pipecat_engine_custom_tools.py @@ -341,16 +341,16 @@ class CustomToolManager: # Validate destination format based on workflow run mode if workflow_run.mode == WorkflowRunMode.ARI.value: - # For ARI provider, also accept SIP endpoints + # For ARI provider, also accept SIP endpoints SIP_ENDPOINT_REGEX = r"^(PJSIP|SIP)\/[\w\-\.@]+$" E164_PHONE_REGEX = r"^\+[1-9]\d{1,14}$" - + is_valid_sip = re.match(SIP_ENDPOINT_REGEX, destination) is_valid_e164 = re.match(E164_PHONE_REGEX, destination) - + if not (is_valid_sip or is_valid_e164): validation_error_result = { - "status": "failed", + "status": "failed", "message": "I'm sorry, but the transfer destination appears to be invalid. Please contact support to verify the transfer settings.", "action": "transfer_failed", "reason": "invalid_destination", @@ -367,7 +367,7 @@ class CustomToolManager: validation_error_result = { "status": "failed", "message": "I'm sorry, but the transfer phone number appears to be invalid. Please contact support to verify the transfer settings.", - "action": "transfer_failed", + "action": "transfer_failed", "reason": "invalid_destination", "end_call": True, } @@ -540,7 +540,7 @@ class CustomToolManager: finally: # Schedule background cleanup of transfer context after pipeline processing delay - if 'transfer_id' in locals(): + if "transfer_id" in locals(): asyncio.create_task( self._cleanup_transfer_context_delayed(transfer_id) ) @@ -552,12 +552,14 @@ class CustomToolManager: try: # Wait for pipeline to process EndFrame(reason="transfer_call") in serializers await asyncio.sleep(1.0) # 1 second delay for async pipeline processing - + call_transfer_manager = await get_call_transfer_manager() await call_transfer_manager.remove_transfer_context(transfer_id) logger.info(f"Background cleanup: removed transfer context {transfer_id}") except Exception as e: - logger.error(f"Background cleanup error for transfer context {transfer_id}: {e}") + logger.error( + f"Background cleanup error for transfer context {transfer_id}: {e}" + ) async def _handle_transfer_result( self, result: dict, function_call_params, properties diff --git a/pipecat b/pipecat index 0ccb4f2..d356f77 160000 --- a/pipecat +++ b/pipecat @@ -1 +1 @@ -Subproject commit 0ccb4f242c48b59ad34a586986bbc4a3dcec1d36 +Subproject commit d356f777f5055e6be66edba54400d214ed8174b5 diff --git a/ui/src/app/tools/[toolUuid]/components/TransferCallToolConfig.tsx b/ui/src/app/tools/[toolUuid]/components/TransferCallToolConfig.tsx index 15ed298..a37902d 100644 --- a/ui/src/app/tools/[toolUuid]/components/TransferCallToolConfig.tsx +++ b/ui/src/app/tools/[toolUuid]/components/TransferCallToolConfig.tsx @@ -1,11 +1,12 @@ "use client"; +import {useState } from "react"; + import { Card, CardContent, CardDescription, CardHeader, CardTitle } from "@/components/ui/card"; import { Input } from "@/components/ui/input"; import { Label } from "@/components/ui/label"; import { RadioGroup, RadioGroupItem } from "@/components/ui/radio-group"; import { Textarea } from "@/components/ui/textarea"; -import { useState, useEffect } from "react"; import { type EndCallMessageType } from "../../config"; @@ -53,14 +54,14 @@ export function TransferCallToolConfig({ const getValidationError = (): string | null => { if (!destination) return null; - + if (sipMode) { - return isValidSipEndpoint(destination) - ? null + return isValidSipEndpoint(destination) + ? null : "Please enter a valid SIP endpoint (e.g., PJSIP/1234 or SIP/extension@domain.com)"; } else { - return isValidPhoneNumber(destination) - ? null + return isValidPhoneNumber(destination) + ? null : "Please enter a valid phone number in E.164 format (e.g., +1234567890)"; } }; @@ -109,7 +110,7 @@ export function TransferCallToolConfig({