chore: format code with pre-commit script

This commit is contained in:
Sabiha Khan 2026-02-23 15:06:25 +05:30
parent 30eebfe811
commit 000c648e7e
8 changed files with 161 additions and 110 deletions

View file

@ -96,13 +96,13 @@ class TransferCallConfig(BaseModel):
# E.164 format: +[1-9]\d{1,14}
e164_pattern = r"^\+[1-9]\d{1,14}$"
# SIP endpoint format: PJSIP/extension or SIP/extension
sip_pattern = r"^(PJSIP|SIP)/[\w\-\.@]+$"
is_valid_e164 = re.match(e164_pattern, v)
is_valid_sip = re.match(sip_pattern, v, re.IGNORECASE)
if not (is_valid_e164 or is_valid_sip):
raise ValueError(
"Destination must be a valid E.164 phone number (e.g., +1234567890) or SIP endpoint (e.g., PJSIP/1234)"

View file

@ -67,7 +67,7 @@ class ARIConnection:
# Redis client for channel-to-run reverse mapping (lazy init)
self._redis_client: Optional[aioredis.Redis] = None
# Transfer manager for handling call transfers (lazy init)
self._call_transfer_manager = None
@ -243,9 +243,11 @@ class ARIConnection:
channel = event.get("channel", {})
channel_id = channel.get("id", "unknown")
channel_state = channel.get("state", "unknown")
# Log all events for each channel for debugging
logger.debug(f"[ARI EVENT org={self.organization_id}] {event_type}: channel={channel_id}, state={channel_state}")
logger.debug(
f"[ARI EVENT org={self.organization_id}] {event_type}: channel={channel_id}, state={channel_state}"
)
if event_type == "StasisStart":
# Skip external media channels we created — they fire
@ -285,7 +287,7 @@ class ARIConnection:
self._handle_transfer_answered(transfer_id, channel_id)
)
return
# Regular outbound call - parse args to extract workflow context
args_dict = {}
for arg in app_args:
@ -316,10 +318,10 @@ class ARIConnection:
logger.info(
f"[ARI org={self.organization_id}] StasisEnd: channel={channel_id}"
)
# Check if this is a caller hangup during transfer
# await self._handle_caller_hangup_during_transfer(channel_id) TODO: handle when caller ends call after transfer initiation
workflow_run_id = await self._get_channel_run(channel_id)
if workflow_run_id:
asyncio.create_task(
@ -340,13 +342,17 @@ class ARIConnection:
f"[ARI org={self.organization_id}] ChannelDestroyed: "
f"channel={channel_id}, cause={cause} ({cause_txt}), tech_cause = {tech_cause}"
)
# Check if this is a transfer destination that failed
transfer_id = await self._get_transfer_id_for_channel(channel_id)
if transfer_id:
failure_message = self._map_hangup_cause_to_message(cause, tech_cause, cause_txt)
if transfer_id:
failure_message = self._map_hangup_cause_to_message(
cause, tech_cause, cause_txt
)
asyncio.create_task(
self._handle_transfer_failed(transfer_id, channel_id, failure_message)
self._handle_transfer_failed(
transfer_id, channel_id, failure_message
)
)
elif event_type == "ChannelDtmfReceived":
@ -627,25 +633,26 @@ class ARIConnection:
transfer_state = ctx.get("transfer_state")
# Check if this is transfer-protected external channel
if (transfer_state == "in-progress" and
channel_id == ext_channel_id and
ext_channel_id is not None):
if (
transfer_state == "in-progress"
and channel_id == ext_channel_id
and ext_channel_id is not None
):
logger.info(
f"[ARI org={self.organization_id}] Transfer in progress - skipping full teardown "
f"for external channel {channel_id}, preserving bridge {bridge_id} and caller {call_id}"
)
# Update transfer state to complete
ctx["transfer_state"] = "complete"
await db_client.update_workflow_run(
run_id=int(workflow_run_id), gathered_context=ctx
)
# Clean up only Redis markers for external channel (partial cleanup)
await self._delete_channel_run(channel_id)
await self._delete_ext_channel(channel_id)
logger.info(
f"[ARI org={self.organization_id}] Transfer cleanup complete - preserved caller {call_id} "
f"in bridge {bridge_id}"
@ -706,8 +713,10 @@ class ARIConnection:
)
# ======== CALL TRANSFER HELPER METHODS ========
def _map_hangup_cause_to_message(self, cause: int, tech_cause: str, cause_txt: str) -> str:
def _map_hangup_cause_to_message(
self, cause: int, tech_cause: str, cause_txt: str
) -> str:
"""Map Asterisk cause codes to user-friendly transfer failure messages."""
if cause == 17 and tech_cause == "486": # User busy/declined
return "The person declined the call or their line is busy."
@ -717,7 +726,7 @@ class ARIConnection:
return "The transfer call failed to connect. There may be a network issue or the number is unavailable."
else:
return f"Transfer failed: {cause_txt}"
def _is_transfer_channel(self, app_args: list) -> bool:
"""Check if appArgs indicate this is a transfer channel."""
if not app_args:
@ -725,48 +734,64 @@ class ARIConnection:
# Check if first arg is "transfer" (args are parsed as separate list items)
is_transfer = len(app_args) > 0 and app_args[0] == "transfer"
if is_transfer:
logger.debug(f"[ARI org={self.organization_id}] Detected transfer channel with args: {app_args}")
logger.debug(
f"[ARI org={self.organization_id}] Detected transfer channel with args: {app_args}"
)
return is_transfer
def _extract_transfer_id(self, app_args: list) -> Optional[str]:
"""Extract transfer_id from appArgs: ['transfer', '{transfer_id}', '{conf_name}']."""
# Args are parsed as separate list items, so transfer_id is at index 1
if len(app_args) > 1 and app_args[0] == "transfer":
transfer_id = app_args[1]
logger.debug(f"[ARI org={self.organization_id}] Extracted transfer_id: {transfer_id}")
logger.debug(
f"[ARI org={self.organization_id}] Extracted transfer_id: {transfer_id}"
)
return transfer_id
return None
async def _get_transfer_id_for_channel(self, channel_id: str) -> Optional[str]:
"""Get transfer_id for a channel by checking Redis mapping."""
try:
r = await self._get_redis()
transfer_id = await r.get(f"ari:transfer_channel:{channel_id}")
logger.debug(f"[ARI Transfer] Looking up transfer_id for channel {channel_id}: {transfer_id}")
logger.debug(
f"[ARI Transfer] Looking up transfer_id for channel {channel_id}: {transfer_id}"
)
return transfer_id
except Exception as e:
logger.error(f"[ARI org={self.organization_id}] Error getting transfer ID for channel {channel_id}: {e}")
logger.error(
f"[ARI org={self.organization_id}] Error getting transfer ID for channel {channel_id}: {e}"
)
return None
async def _store_transfer_channel_mapping(self, channel_id: str, transfer_id: str):
"""Store channel->transfer mapping in Redis for event correlation."""
try:
r = await self._get_redis()
await r.setex(f"ari:transfer_channel:{channel_id}", 300, transfer_id) # 5 minute TTL
await r.setex(
f"ari:transfer_channel:{channel_id}", 300, transfer_id
) # 5 minute TTL
except Exception as e:
logger.error(f"[ARI org={self.organization_id}] Error storing transfer channel mapping: {e}")
async def _handle_transfer_answered(self, transfer_id: str, destination_channel_id: str):
logger.error(
f"[ARI org={self.organization_id}] Error storing transfer channel mapping: {e}"
)
async def _handle_transfer_answered(
self, transfer_id: str, destination_channel_id: str
):
"""Handle transfer destination channel answered - publish success event."""
try:
logger.info(
f"[ARI Transfer org={self.organization_id}] Destination {destination_channel_id} "
f"answered for transfer {transfer_id}"
)
# Store channel mapping for potential future events
await self._store_transfer_channel_mapping(destination_channel_id, transfer_id)
await self._store_transfer_channel_mapping(
destination_channel_id, transfer_id
)
# Get transfer context
transfer_manager = await self._get_transfer_manager()
context = await transfer_manager.get_transfer_context(transfer_id)
@ -775,12 +800,12 @@ class ARIConnection:
f"[ARI Transfer org={self.organization_id}] No transfer context found for {transfer_id}"
)
return
logger.info(
f"[ARI Transfer org={self.organization_id}] Transfer {transfer_id} success: "
f"caller={context.original_call_sid} -> destination={destination_channel_id}"
)
# Publish transfer success event - this will trigger the bridge swap in serializer
success_event = TransferEvent(
type=TransferEventType.TRANSFER_ANSWERED,
@ -792,24 +817,30 @@ class ARIConnection:
status="success",
action="transfer_success",
end_call=True,
timestamp=time.time()
timestamp=time.time(),
)
await transfer_manager.publish_transfer_event(success_event)
except Exception as e:
logger.error(f"[ARI Transfer org={self.organization_id}] Error handling transfer answer: {e}")
logger.error(
f"[ARI Transfer org={self.organization_id}] Error handling transfer answer: {e}"
)
# On error, publish failure event
await self._handle_transfer_failed(transfer_id, destination_channel_id, f"Transfer processing error: {e}")
async def _handle_transfer_failed(self, transfer_id: str, channel_id: str, reason: str):
await self._handle_transfer_failed(
transfer_id, destination_channel_id, f"Transfer processing error: {e}"
)
async def _handle_transfer_failed(
self, transfer_id: str, channel_id: str, reason: str
):
"""Handle transfer failure - publish failure event."""
try:
logger.info(f"[ARI Transfer] Transfer {transfer_id} failed: {reason}")
# Get transfer context
transfer_manager = await self._get_transfer_manager()
context = await transfer_manager.get_transfer_context(transfer_id)
# Publish failure event
failure_event = TransferEvent(
type=TransferEventType.TRANSFER_FAILED,
@ -817,14 +848,14 @@ class ARIConnection:
original_call_sid=context.original_call_sid if context else "",
transfer_call_sid=channel_id,
message=f"Transfer failed: {reason}",
status="failed",
status="failed",
action="transfer_failed",
reason=reason,
end_call=False,
timestamp=time.time()
timestamp=time.time(),
)
await transfer_manager.publish_transfer_event(failure_event)
except Exception as e:
logger.error(f"[ARI Transfer] Error handling transfer failure: {e}")

View file

@ -85,7 +85,9 @@ class CallTransferManager:
except Exception as e:
logger.error(f"Failed to remove transfer context: {e}")
async def store_transfer_channel_mapping(self, channel_id: str, transfer_id: str) -> None:
async def store_transfer_channel_mapping(
self, channel_id: str, transfer_id: str
) -> None:
"""Store channel->transfer mapping in Redis for event correlation.
Args:
@ -94,10 +96,16 @@ class CallTransferManager:
"""
try:
redis = await self._get_redis()
await redis.setex(f"ari:transfer_channel:{channel_id}", 300, transfer_id) # 5 minute TTL
logger.debug(f"[Transfer Manager] Stored channel mapping: channel={channel_id}, transfer_id={transfer_id}")
await redis.setex(
f"ari:transfer_channel:{channel_id}", 300, transfer_id
) # 5 minute TTL
logger.debug(
f"[Transfer Manager] Stored channel mapping: channel={channel_id}, transfer_id={transfer_id}"
)
except Exception as e:
logger.error(f"[Transfer Manager] Error storing transfer channel mapping: {e}")
logger.error(
f"[Transfer Manager] Error storing transfer channel mapping: {e}"
)
async def publish_transfer_event(self, event: TransferEvent) -> None:
"""Publish transfer event to Redis channel.

View file

@ -361,37 +361,40 @@ class ARIProvider(TelephonyProvider):
timeout: int = 30,
**kwargs: Any,
) -> Dict[str, Any]:
"""Initiate ARI call transfer by originating destination channel.
This method returns immediately after originating the channel.
The actual transfer completion is handled asynchronously via ARI events.
Args:
destination: Destination phone number (SIP endpoint)
transfer_id: Unique identifier for this transfer attempt
conference_name: Conference name (unused in ARI, kept for interface compatibility)
timeout: Transfer timeout in seconds
**kwargs: Additional arguments
Returns:
Dict containing:
- call_sid: Destination channel ID
- status: "initiated"
- provider: "ari"
- raw_response: Full ARI channel creation response
"""Initiate ARI call transfer by creating an outbound channel to the destination.
This method creates the destination channel and returns immediately. The transfer
process completes asynchronously - success/failure is determined by ARI events
and communicated through the transfer event system.
Args:
destination: Destination phone number (SIP endpoint)
transfer_id: Unique identifier for this transfer attempt
conference_name: Conference name (unused in ARI, kept for interface compatibility)
timeout: Transfer timeout in seconds
**kwargs: Additional arguments
Returns:
Dict containing:
- call_sid: Destination channel ID
- status: "initiated"
- provider: "ari"
- raw_response: Full ARI channel creation response
"""
if not self.validate_config():
raise ValueError("ARI provider not properly configured")
logger.info(
f"[ARI Transfer] Initiating transfer {transfer_id} to {destination} "
f"(timeout: {timeout}s)"
)
# Import here to avoid circular dependency
from api.services.telephony.call_transfer_manager import get_call_transfer_manager
from api.services.telephony.call_transfer_manager import (
get_call_transfer_manager,
)
from api.services.telephony.transfer_event_protocol import TransferContext
# Store transfer context for event correlation
call_transfer_manager = await get_call_transfer_manager()
context = TransferContext(
@ -401,23 +404,23 @@ class ARIProvider(TelephonyProvider):
tool_uuid=kwargs.get("tool_uuid", ""),
original_call_sid=kwargs.get("original_call_sid", ""),
conference_name=conference_name,
initiated_at=time.time()
initiated_at=time.time(),
)
await call_transfer_manager.store_transfer_context(context, ttl=timeout + 10)
# Build SIP endpoint
if destination.startswith("SIP/") or destination.startswith("PJSIP/"):
sip_endpoint = destination
else:
sip_endpoint = f"PJSIP/{destination}"
# Build transfer appArgs for event correlation
app_args = f"transfer,{transfer_id},{conference_name}"
try:
# Build endpoint URL following existing pattern
endpoint = f"{self.base_url}/channels"
# Prepare channel creation params following existing pattern
params = {
"endpoint": sip_endpoint,
@ -425,7 +428,7 @@ class ARIProvider(TelephonyProvider):
"appArgs": app_args,
"timeout": timeout, # Keep timeout for transfer calls
}
# Originate destination channel using existing pattern
async with aiohttp.ClientSession() as session:
async with session.post(
@ -434,40 +437,46 @@ class ARIProvider(TelephonyProvider):
auth=self._get_auth(),
) as response:
response_text = await response.text()
if response.status != 200:
error_msg = f"ARI channel creation failed: {response.status} {response_text}"
logger.error(f"[ARI Transfer] {error_msg}")
await call_transfer_manager.remove_transfer_context(transfer_id)
raise Exception(error_msg)
result = json.loads(response_text)
destination_channel_id = result.get("id", "")
if not destination_channel_id:
logger.error(f"[ARI Transfer] Failed to get channel ID from response: {result}")
logger.error(
f"[ARI Transfer] Failed to get channel ID from response: {result}"
)
await call_transfer_manager.remove_transfer_context(transfer_id)
raise Exception("Failed to create destination channel")
# Update transfer context with destination channel ID
context.call_sid = destination_channel_id
await call_transfer_manager.store_transfer_context(context, ttl=timeout + 10)
await call_transfer_manager.store_transfer_context(
context, ttl=timeout + 10
)
# Store transfer channel mapping for event correlation (works with any dialplan setup)
await call_transfer_manager.store_transfer_channel_mapping(destination_channel_id, transfer_id)
await call_transfer_manager.store_transfer_channel_mapping(
destination_channel_id, transfer_id
)
logger.info(
f"[ARI Transfer] Originated destination channel {destination_channel_id} "
f"for transfer {transfer_id}"
)
return {
"call_sid": destination_channel_id,
"status": "initiated",
"provider": self.PROVIDER_NAME,
"raw_response": result,
}
except Exception as e:
logger.error(f"[ARI Transfer] Failed to originate transfer channel: {e}")
await call_transfer_manager.remove_transfer_context(transfer_id)

View file

@ -341,16 +341,16 @@ class CustomToolManager:
# Validate destination format based on workflow run mode
if workflow_run.mode == WorkflowRunMode.ARI.value:
# For ARI provider, also accept SIP endpoints
# For ARI provider, also accept SIP endpoints
SIP_ENDPOINT_REGEX = r"^(PJSIP|SIP)\/[\w\-\.@]+$"
E164_PHONE_REGEX = r"^\+[1-9]\d{1,14}$"
is_valid_sip = re.match(SIP_ENDPOINT_REGEX, destination)
is_valid_e164 = re.match(E164_PHONE_REGEX, destination)
if not (is_valid_sip or is_valid_e164):
validation_error_result = {
"status": "failed",
"status": "failed",
"message": "I'm sorry, but the transfer destination appears to be invalid. Please contact support to verify the transfer settings.",
"action": "transfer_failed",
"reason": "invalid_destination",
@ -367,7 +367,7 @@ class CustomToolManager:
validation_error_result = {
"status": "failed",
"message": "I'm sorry, but the transfer phone number appears to be invalid. Please contact support to verify the transfer settings.",
"action": "transfer_failed",
"action": "transfer_failed",
"reason": "invalid_destination",
"end_call": True,
}
@ -540,7 +540,7 @@ class CustomToolManager:
finally:
# Schedule background cleanup of transfer context after pipeline processing delay
if 'transfer_id' in locals():
if "transfer_id" in locals():
asyncio.create_task(
self._cleanup_transfer_context_delayed(transfer_id)
)
@ -552,12 +552,14 @@ class CustomToolManager:
try:
# Wait for pipeline to process EndFrame(reason="transfer_call") in serializers
await asyncio.sleep(1.0) # 1 second delay for async pipeline processing
call_transfer_manager = await get_call_transfer_manager()
await call_transfer_manager.remove_transfer_context(transfer_id)
logger.info(f"Background cleanup: removed transfer context {transfer_id}")
except Exception as e:
logger.error(f"Background cleanup error for transfer context {transfer_id}: {e}")
logger.error(
f"Background cleanup error for transfer context {transfer_id}: {e}"
)
async def _handle_transfer_result(
self, result: dict, function_call_params, properties

@ -1 +1 @@
Subproject commit 0ccb4f242c48b59ad34a586986bbc4a3dcec1d36
Subproject commit d356f777f5055e6be66edba54400d214ed8174b5

View file

@ -1,11 +1,12 @@
"use client";
import {useState } from "react";
import { Card, CardContent, CardDescription, CardHeader, CardTitle } from "@/components/ui/card";
import { Input } from "@/components/ui/input";
import { Label } from "@/components/ui/label";
import { RadioGroup, RadioGroupItem } from "@/components/ui/radio-group";
import { Textarea } from "@/components/ui/textarea";
import { useState, useEffect } from "react";
import { type EndCallMessageType } from "../../config";
@ -53,14 +54,14 @@ export function TransferCallToolConfig({
const getValidationError = (): string | null => {
if (!destination) return null;
if (sipMode) {
return isValidSipEndpoint(destination)
? null
return isValidSipEndpoint(destination)
? null
: "Please enter a valid SIP endpoint (e.g., PJSIP/1234 or SIP/extension@domain.com)";
} else {
return isValidPhoneNumber(destination)
? null
return isValidPhoneNumber(destination)
? null
: "Please enter a valid phone number in E.164 format (e.g., +1234567890)";
}
};
@ -109,7 +110,7 @@ export function TransferCallToolConfig({
<div className="grid gap-2 pt-4 border-t">
<Label>Transfer Destination</Label>
<Label className="text-xs text-muted-foreground">
{sipMode
{sipMode
? "SIP endpoint to transfer the call to (e.g., PJSIP/1234 or SIP/extension@domain.com)"
: "Phone number to transfer the call to (E.164 format with country code)"
}

View file

@ -203,7 +203,7 @@ export default function ToolDetailPage() {
const sipPattern = /^(PJSIP|SIP)\/[\w\-\.@]+$/i;
const isValidE164 = e164Pattern.test(transferDestination);
const isValidSip = sipPattern.test(transferDestination);
if (!transferDestination || (!isValidE164 && !isValidSip)) {
setError("Please enter a valid phone number (E.164 format) or SIP endpoint (e.g., PJSIP/1234)");
return;