mirror of
https://github.com/dograh-hq/dograh.git
synced 2026-06-10 08:05:22 +02:00
feat: tansfer calls with aasterisk
This commit is contained in:
parent
7aef9c6db5
commit
30eebfe811
10 changed files with 494 additions and 86 deletions
|
|
@ -71,7 +71,7 @@ class TransferCallConfig(BaseModel):
|
|||
"""Configuration for Transfer Call tools."""
|
||||
|
||||
destination: str = Field(
|
||||
description="Phone number to transfer the call to (E.164 format, e.g., +1234567890)"
|
||||
description="Phone number or SIP endpoint to transfer the call to (E.164 format e.g., +1234567890, or SIP endpoint e.g., PJSIP/1234)"
|
||||
)
|
||||
messageType: Literal["none", "custom"] = Field(
|
||||
default="none", description="Type of message to play before transfer"
|
||||
|
|
@ -89,16 +89,23 @@ class TransferCallConfig(BaseModel):
|
|||
@field_validator("destination")
|
||||
@classmethod
|
||||
def validate_destination(cls, v: str) -> str:
|
||||
"""Validate that destination is a valid E.164 phone number."""
|
||||
"""Validate that destination is a valid E.164 phone number or SIP endpoint."""
|
||||
# Allow empty string for initial creation (like HTTP API tools with empty URL)
|
||||
if not v.strip():
|
||||
return v
|
||||
|
||||
# E.164 format: +[1-9]\d{1,14}
|
||||
e164_pattern = r"^\+[1-9]\d{1,14}$"
|
||||
if not re.match(e164_pattern, v):
|
||||
|
||||
# SIP endpoint format: PJSIP/extension or SIP/extension
|
||||
sip_pattern = r"^(PJSIP|SIP)/[\w\-\.@]+$"
|
||||
|
||||
is_valid_e164 = re.match(e164_pattern, v)
|
||||
is_valid_sip = re.match(sip_pattern, v, re.IGNORECASE)
|
||||
|
||||
if not (is_valid_e164 or is_valid_sip):
|
||||
raise ValueError(
|
||||
"Destination must be a valid E.164 phone number (e.g., +1234567890)"
|
||||
"Destination must be a valid E.164 phone number (e.g., +1234567890) or SIP endpoint (e.g., PJSIP/1234)"
|
||||
)
|
||||
return v
|
||||
|
||||
|
|
|
|||
|
|
@ -14,6 +14,7 @@ setup_logging()
|
|||
import asyncio
|
||||
import json
|
||||
import signal
|
||||
import time
|
||||
from typing import Dict, Optional, Set
|
||||
from urllib.parse import urlparse
|
||||
|
||||
|
|
@ -26,6 +27,11 @@ from api.constants import REDIS_URL
|
|||
from api.db import db_client
|
||||
from api.enums import CallType, OrganizationConfigurationKey, WorkflowRunMode
|
||||
from api.services.quota_service import check_dograh_quota_by_user_id
|
||||
from api.services.telephony.call_transfer_manager import get_call_transfer_manager
|
||||
from api.services.telephony.transfer_event_protocol import (
|
||||
TransferEvent,
|
||||
TransferEventType,
|
||||
)
|
||||
|
||||
# Redis key pattern and TTL for channel-to-run mapping
|
||||
_CHANNEL_KEY_PREFIX = "ari:channel:"
|
||||
|
|
@ -61,6 +67,9 @@ class ARIConnection:
|
|||
|
||||
# Redis client for channel-to-run reverse mapping (lazy init)
|
||||
self._redis_client: Optional[aioredis.Redis] = None
|
||||
|
||||
# Transfer manager for handling call transfers (lazy init)
|
||||
self._call_transfer_manager = None
|
||||
|
||||
async def _get_redis(self) -> aioredis.Redis:
|
||||
"""Get Redis client instance (lazy init)."""
|
||||
|
|
@ -70,6 +79,12 @@ class ARIConnection:
|
|||
)
|
||||
return self._redis_client
|
||||
|
||||
async def _get_transfer_manager(self):
|
||||
"""Get transfer manager instance (lazy init)."""
|
||||
if not self._call_transfer_manager:
|
||||
self._call_transfer_manager = await get_call_transfer_manager()
|
||||
return self._call_transfer_manager
|
||||
|
||||
async def _set_channel_run(self, channel_id: str, workflow_run_id: str):
|
||||
"""Store channel_id -> workflow_run_id mapping in Redis."""
|
||||
r = await self._get_redis()
|
||||
|
|
@ -228,6 +243,9 @@ class ARIConnection:
|
|||
channel = event.get("channel", {})
|
||||
channel_id = channel.get("id", "unknown")
|
||||
channel_state = channel.get("state", "unknown")
|
||||
|
||||
# Log all events for each channel for debugging
|
||||
logger.debug(f"[ARI EVENT org={self.organization_id}] {event_type}: channel={channel_id}, state={channel_state}")
|
||||
|
||||
if event_type == "StasisStart":
|
||||
# Skip external media channels we created — they fire
|
||||
|
|
@ -255,7 +273,20 @@ class ARIConnection:
|
|||
)
|
||||
else:
|
||||
# Outbound call (state == "Up") — originated by us
|
||||
# Parse args to extract workflow context
|
||||
# Check if this is a transfer channel first
|
||||
if self._is_transfer_channel(app_args):
|
||||
transfer_id = self._extract_transfer_id(app_args)
|
||||
if transfer_id:
|
||||
logger.info(
|
||||
f"[ARI org={self.organization_id}] Transfer destination answered: "
|
||||
f"channel={channel_id}, transfer_id={transfer_id}"
|
||||
)
|
||||
asyncio.create_task(
|
||||
self._handle_transfer_answered(transfer_id, channel_id)
|
||||
)
|
||||
return
|
||||
|
||||
# Regular outbound call - parse args to extract workflow context
|
||||
args_dict = {}
|
||||
for arg in app_args:
|
||||
for pair in arg.split(","):
|
||||
|
|
@ -285,6 +316,10 @@ class ARIConnection:
|
|||
logger.info(
|
||||
f"[ARI org={self.organization_id}] StasisEnd: channel={channel_id}"
|
||||
)
|
||||
|
||||
# Check if this is a caller hangup during transfer
|
||||
# await self._handle_caller_hangup_during_transfer(channel_id) TODO: handle when caller ends call after transfer initiation
|
||||
|
||||
workflow_run_id = await self._get_channel_run(channel_id)
|
||||
if workflow_run_id:
|
||||
asyncio.create_task(
|
||||
|
|
@ -298,12 +333,21 @@ class ARIConnection:
|
|||
)
|
||||
|
||||
elif event_type == "ChannelDestroyed":
|
||||
cause = channel.get("cause", 0)
|
||||
cause_txt = channel.get("cause_txt", "unknown")
|
||||
cause = event.get("cause", 0)
|
||||
cause_txt = event.get("cause_txt", "unknown")
|
||||
tech_cause = event.get("tech_cause", "unknown")
|
||||
logger.info(
|
||||
f"[ARI org={self.organization_id}] ChannelDestroyed: "
|
||||
f"channel={channel_id}, cause={cause} ({cause_txt})"
|
||||
f"channel={channel_id}, cause={cause} ({cause_txt}), tech_cause = {tech_cause}"
|
||||
)
|
||||
|
||||
# Check if this is a transfer destination that failed
|
||||
transfer_id = await self._get_transfer_id_for_channel(channel_id)
|
||||
if transfer_id:
|
||||
failure_message = self._map_hangup_cause_to_message(cause, tech_cause, cause_txt)
|
||||
asyncio.create_task(
|
||||
self._handle_transfer_failed(transfer_id, channel_id, failure_message)
|
||||
)
|
||||
|
||||
elif event_type == "ChannelDtmfReceived":
|
||||
digit = event.get("digit", "")
|
||||
|
|
@ -580,7 +624,35 @@ class ARIConnection:
|
|||
call_id = ctx.get("call_id")
|
||||
ext_channel_id = ctx.get("ext_channel_id")
|
||||
bridge_id = ctx.get("bridge_id")
|
||||
transfer_state = ctx.get("transfer_state")
|
||||
|
||||
# Check if this is transfer-protected external channel
|
||||
if (transfer_state == "in-progress" and
|
||||
channel_id == ext_channel_id and
|
||||
ext_channel_id is not None):
|
||||
|
||||
logger.info(
|
||||
f"[ARI org={self.organization_id}] Transfer in progress - skipping full teardown "
|
||||
f"for external channel {channel_id}, preserving bridge {bridge_id} and caller {call_id}"
|
||||
)
|
||||
|
||||
# Update transfer state to complete
|
||||
ctx["transfer_state"] = "complete"
|
||||
await db_client.update_workflow_run(
|
||||
run_id=int(workflow_run_id), gathered_context=ctx
|
||||
)
|
||||
|
||||
# Clean up only Redis markers for external channel (partial cleanup)
|
||||
await self._delete_channel_run(channel_id)
|
||||
await self._delete_ext_channel(channel_id)
|
||||
|
||||
logger.info(
|
||||
f"[ARI org={self.organization_id}] Transfer cleanup complete - preserved caller {call_id} "
|
||||
f"in bridge {bridge_id}"
|
||||
)
|
||||
return
|
||||
|
||||
# Normal full teardown for non-transfer scenarios (transfer_state is None or not in-progress)
|
||||
# Delete the bridge first (removes channels from it)
|
||||
if bridge_id:
|
||||
await self._delete_bridge(bridge_id)
|
||||
|
|
@ -633,6 +705,129 @@ class ARIConnection:
|
|||
f"{response.status} {text}"
|
||||
)
|
||||
|
||||
# ======== CALL TRANSFER HELPER METHODS ========
|
||||
|
||||
def _map_hangup_cause_to_message(self, cause: int, tech_cause: str, cause_txt: str) -> str:
|
||||
"""Map Asterisk cause codes to user-friendly transfer failure messages."""
|
||||
if cause == 17 and tech_cause == "486": # User busy/declined
|
||||
return "The person declined the call or their line is busy."
|
||||
elif cause == 19 and tech_cause == "480": # No answer
|
||||
return "The transfer call was not answered. The person may be busy or unavailable right now."
|
||||
elif cause == 21: # Call rejected
|
||||
return "The transfer call failed to connect. There may be a network issue or the number is unavailable."
|
||||
else:
|
||||
return f"Transfer failed: {cause_txt}"
|
||||
|
||||
def _is_transfer_channel(self, app_args: list) -> bool:
|
||||
"""Check if appArgs indicate this is a transfer channel."""
|
||||
if not app_args:
|
||||
return False
|
||||
# Check if first arg is "transfer" (args are parsed as separate list items)
|
||||
is_transfer = len(app_args) > 0 and app_args[0] == "transfer"
|
||||
if is_transfer:
|
||||
logger.debug(f"[ARI org={self.organization_id}] Detected transfer channel with args: {app_args}")
|
||||
return is_transfer
|
||||
|
||||
def _extract_transfer_id(self, app_args: list) -> Optional[str]:
|
||||
"""Extract transfer_id from appArgs: ['transfer', '{transfer_id}', '{conf_name}']."""
|
||||
# Args are parsed as separate list items, so transfer_id is at index 1
|
||||
if len(app_args) > 1 and app_args[0] == "transfer":
|
||||
transfer_id = app_args[1]
|
||||
logger.debug(f"[ARI org={self.organization_id}] Extracted transfer_id: {transfer_id}")
|
||||
return transfer_id
|
||||
return None
|
||||
|
||||
async def _get_transfer_id_for_channel(self, channel_id: str) -> Optional[str]:
|
||||
"""Get transfer_id for a channel by checking Redis mapping."""
|
||||
try:
|
||||
r = await self._get_redis()
|
||||
transfer_id = await r.get(f"ari:transfer_channel:{channel_id}")
|
||||
logger.debug(f"[ARI Transfer] Looking up transfer_id for channel {channel_id}: {transfer_id}")
|
||||
return transfer_id
|
||||
except Exception as e:
|
||||
logger.error(f"[ARI org={self.organization_id}] Error getting transfer ID for channel {channel_id}: {e}")
|
||||
return None
|
||||
|
||||
async def _store_transfer_channel_mapping(self, channel_id: str, transfer_id: str):
|
||||
"""Store channel->transfer mapping in Redis for event correlation."""
|
||||
try:
|
||||
r = await self._get_redis()
|
||||
await r.setex(f"ari:transfer_channel:{channel_id}", 300, transfer_id) # 5 minute TTL
|
||||
except Exception as e:
|
||||
logger.error(f"[ARI org={self.organization_id}] Error storing transfer channel mapping: {e}")
|
||||
|
||||
async def _handle_transfer_answered(self, transfer_id: str, destination_channel_id: str):
|
||||
"""Handle transfer destination channel answered - publish success event."""
|
||||
try:
|
||||
logger.info(
|
||||
f"[ARI Transfer org={self.organization_id}] Destination {destination_channel_id} "
|
||||
f"answered for transfer {transfer_id}"
|
||||
)
|
||||
|
||||
# Store channel mapping for potential future events
|
||||
await self._store_transfer_channel_mapping(destination_channel_id, transfer_id)
|
||||
|
||||
# Get transfer context
|
||||
transfer_manager = await self._get_transfer_manager()
|
||||
context = await transfer_manager.get_transfer_context(transfer_id)
|
||||
if not context:
|
||||
logger.error(
|
||||
f"[ARI Transfer org={self.organization_id}] No transfer context found for {transfer_id}"
|
||||
)
|
||||
return
|
||||
|
||||
logger.info(
|
||||
f"[ARI Transfer org={self.organization_id}] Transfer {transfer_id} success: "
|
||||
f"caller={context.original_call_sid} -> destination={destination_channel_id}"
|
||||
)
|
||||
|
||||
# Publish transfer success event - this will trigger the bridge swap in serializer
|
||||
success_event = TransferEvent(
|
||||
type=TransferEventType.TRANSFER_ANSWERED,
|
||||
transfer_id=transfer_id,
|
||||
original_call_sid=context.original_call_sid,
|
||||
transfer_call_sid=destination_channel_id,
|
||||
conference_name=context.conference_name,
|
||||
message="Transfer destination answered",
|
||||
status="success",
|
||||
action="transfer_success",
|
||||
end_call=True,
|
||||
timestamp=time.time()
|
||||
)
|
||||
await transfer_manager.publish_transfer_event(success_event)
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"[ARI Transfer org={self.organization_id}] Error handling transfer answer: {e}")
|
||||
# On error, publish failure event
|
||||
await self._handle_transfer_failed(transfer_id, destination_channel_id, f"Transfer processing error: {e}")
|
||||
|
||||
async def _handle_transfer_failed(self, transfer_id: str, channel_id: str, reason: str):
|
||||
"""Handle transfer failure - publish failure event."""
|
||||
try:
|
||||
logger.info(f"[ARI Transfer] Transfer {transfer_id} failed: {reason}")
|
||||
|
||||
# Get transfer context
|
||||
transfer_manager = await self._get_transfer_manager()
|
||||
context = await transfer_manager.get_transfer_context(transfer_id)
|
||||
|
||||
# Publish failure event
|
||||
failure_event = TransferEvent(
|
||||
type=TransferEventType.TRANSFER_FAILED,
|
||||
transfer_id=transfer_id,
|
||||
original_call_sid=context.original_call_sid if context else "",
|
||||
transfer_call_sid=channel_id,
|
||||
message=f"Transfer failed: {reason}",
|
||||
status="failed",
|
||||
action="transfer_failed",
|
||||
reason=reason,
|
||||
end_call=False,
|
||||
timestamp=time.time()
|
||||
)
|
||||
await transfer_manager.publish_transfer_event(failure_event)
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"[ARI Transfer] Error handling transfer failure: {e}")
|
||||
|
||||
async def _delete_channel(self, channel_id: str):
|
||||
"""Delete (hang up) an ARI channel. Ignores 404 (already gone)."""
|
||||
|
||||
|
|
|
|||
|
|
@ -85,6 +85,20 @@ class CallTransferManager:
|
|||
except Exception as e:
|
||||
logger.error(f"Failed to remove transfer context: {e}")
|
||||
|
||||
async def store_transfer_channel_mapping(self, channel_id: str, transfer_id: str) -> None:
|
||||
"""Store channel->transfer mapping in Redis for event correlation.
|
||||
|
||||
Args:
|
||||
channel_id: ARI channel ID
|
||||
transfer_id: Transfer identifier
|
||||
"""
|
||||
try:
|
||||
redis = await self._get_redis()
|
||||
await redis.setex(f"ari:transfer_channel:{channel_id}", 300, transfer_id) # 5 minute TTL
|
||||
logger.debug(f"[Transfer Manager] Stored channel mapping: channel={channel_id}, transfer_id={transfer_id}")
|
||||
except Exception as e:
|
||||
logger.error(f"[Transfer Manager] Error storing transfer channel mapping: {e}")
|
||||
|
||||
async def publish_transfer_event(self, event: TransferEvent) -> None:
|
||||
"""Publish transfer event to Redis channel.
|
||||
|
||||
|
|
|
|||
|
|
@ -6,6 +6,7 @@ The ARI WebSocket event listener runs as a separate process (ari_manager.py).
|
|||
"""
|
||||
|
||||
import json
|
||||
import time
|
||||
from typing import TYPE_CHECKING, Any, Dict, List, Optional
|
||||
from urllib.parse import urlparse
|
||||
|
||||
|
|
@ -349,8 +350,8 @@ class ARIProvider(TelephonyProvider):
|
|||
# ======== CALL TRANSFER METHODS ========
|
||||
|
||||
def supports_transfers(self) -> bool:
|
||||
"""ARI does not currently support call transfers."""
|
||||
return False
|
||||
"""ARI supports call transfers via bridge manipulation."""
|
||||
return True
|
||||
|
||||
async def transfer_call(
|
||||
self,
|
||||
|
|
@ -360,14 +361,123 @@ class ARIProvider(TelephonyProvider):
|
|||
timeout: int = 30,
|
||||
**kwargs: Any,
|
||||
) -> Dict[str, Any]:
|
||||
"""ARI call transfers are not yet implemented."""
|
||||
raise NotImplementedError("ARI provider does not support call transfers")
|
||||
"""Initiate ARI call transfer by originating destination channel.
|
||||
|
||||
This method returns immediately after originating the channel.
|
||||
The actual transfer completion is handled asynchronously via ARI events.
|
||||
|
||||
Args:
|
||||
destination: Destination phone number (SIP endpoint)
|
||||
transfer_id: Unique identifier for this transfer attempt
|
||||
conference_name: Conference name (unused in ARI, kept for interface compatibility)
|
||||
timeout: Transfer timeout in seconds
|
||||
**kwargs: Additional arguments
|
||||
|
||||
Returns:
|
||||
Dict containing:
|
||||
- call_sid: Destination channel ID
|
||||
- status: "initiated"
|
||||
- provider: "ari"
|
||||
- raw_response: Full ARI channel creation response
|
||||
"""
|
||||
if not self.validate_config():
|
||||
raise ValueError("ARI provider not properly configured")
|
||||
|
||||
logger.info(
|
||||
f"[ARI Transfer] Initiating transfer {transfer_id} to {destination} "
|
||||
f"(timeout: {timeout}s)"
|
||||
)
|
||||
|
||||
# Import here to avoid circular dependency
|
||||
from api.services.telephony.call_transfer_manager import get_call_transfer_manager
|
||||
from api.services.telephony.transfer_event_protocol import TransferContext
|
||||
|
||||
# Store transfer context for event correlation
|
||||
call_transfer_manager = await get_call_transfer_manager()
|
||||
context = TransferContext(
|
||||
transfer_id=transfer_id,
|
||||
call_sid=None, # Will be updated after channel creation
|
||||
target_number=destination,
|
||||
tool_uuid=kwargs.get("tool_uuid", ""),
|
||||
original_call_sid=kwargs.get("original_call_sid", ""),
|
||||
conference_name=conference_name,
|
||||
initiated_at=time.time()
|
||||
)
|
||||
await call_transfer_manager.store_transfer_context(context, ttl=timeout + 10)
|
||||
|
||||
# Build SIP endpoint
|
||||
if destination.startswith("SIP/") or destination.startswith("PJSIP/"):
|
||||
sip_endpoint = destination
|
||||
else:
|
||||
sip_endpoint = f"PJSIP/{destination}"
|
||||
|
||||
# Build transfer appArgs for event correlation
|
||||
app_args = f"transfer,{transfer_id},{conference_name}"
|
||||
|
||||
try:
|
||||
# Build endpoint URL following existing pattern
|
||||
endpoint = f"{self.base_url}/channels"
|
||||
|
||||
# Prepare channel creation params following existing pattern
|
||||
params = {
|
||||
"endpoint": sip_endpoint,
|
||||
"app": self.app_name,
|
||||
"appArgs": app_args,
|
||||
"timeout": timeout, # Keep timeout for transfer calls
|
||||
}
|
||||
|
||||
# Originate destination channel using existing pattern
|
||||
async with aiohttp.ClientSession() as session:
|
||||
async with session.post(
|
||||
endpoint,
|
||||
params=params,
|
||||
auth=self._get_auth(),
|
||||
) as response:
|
||||
response_text = await response.text()
|
||||
|
||||
if response.status != 200:
|
||||
error_msg = f"ARI channel creation failed: {response.status} {response_text}"
|
||||
logger.error(f"[ARI Transfer] {error_msg}")
|
||||
await call_transfer_manager.remove_transfer_context(transfer_id)
|
||||
raise Exception(error_msg)
|
||||
|
||||
result = json.loads(response_text)
|
||||
|
||||
destination_channel_id = result.get("id", "")
|
||||
if not destination_channel_id:
|
||||
logger.error(f"[ARI Transfer] Failed to get channel ID from response: {result}")
|
||||
await call_transfer_manager.remove_transfer_context(transfer_id)
|
||||
raise Exception("Failed to create destination channel")
|
||||
|
||||
# Update transfer context with destination channel ID
|
||||
context.call_sid = destination_channel_id
|
||||
await call_transfer_manager.store_transfer_context(context, ttl=timeout + 10)
|
||||
|
||||
# Store transfer channel mapping for event correlation (works with any dialplan setup)
|
||||
await call_transfer_manager.store_transfer_channel_mapping(destination_channel_id, transfer_id)
|
||||
|
||||
logger.info(
|
||||
f"[ARI Transfer] Originated destination channel {destination_channel_id} "
|
||||
f"for transfer {transfer_id}"
|
||||
)
|
||||
|
||||
return {
|
||||
"call_sid": destination_channel_id,
|
||||
"status": "initiated",
|
||||
"provider": self.PROVIDER_NAME,
|
||||
"raw_response": result,
|
||||
}
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"[ARI Transfer] Failed to originate transfer channel: {e}")
|
||||
await call_transfer_manager.remove_transfer_context(transfer_id)
|
||||
raise
|
||||
|
||||
# ======== ARI-SPECIFIC METHODS ========
|
||||
|
||||
async def hangup_channel(self, channel_id: str, reason: str = "normal") -> bool:
|
||||
"""Hang up an ARI channel."""
|
||||
endpoint = f"{self.base_url}/channels/{channel_id}"
|
||||
endpwoint = f"{self.base_url}/channels/{channel_id}"
|
||||
params = {"reason_code": reason}
|
||||
|
||||
try:
|
||||
|
|
|
|||
|
|
@ -339,20 +339,42 @@ class CustomToolManager:
|
|||
)
|
||||
return
|
||||
|
||||
# Validate E.164 format
|
||||
E164_PHONE_REGEX = r"^\+[1-9]\d{1,14}$"
|
||||
if not re.match(E164_PHONE_REGEX, destination):
|
||||
validation_error_result = {
|
||||
"status": "failed",
|
||||
"message": "I'm sorry, but the transfer phone number appears to be invalid. Please contact support to verify the transfer settings.",
|
||||
"action": "transfer_failed",
|
||||
"reason": "invalid_destination",
|
||||
"end_call": True,
|
||||
}
|
||||
await self._handle_transfer_result(
|
||||
validation_error_result, function_call_params, properties
|
||||
)
|
||||
return
|
||||
# Validate destination format based on workflow run mode
|
||||
if workflow_run.mode == WorkflowRunMode.ARI.value:
|
||||
# For ARI provider, also accept SIP endpoints
|
||||
SIP_ENDPOINT_REGEX = r"^(PJSIP|SIP)\/[\w\-\.@]+$"
|
||||
E164_PHONE_REGEX = r"^\+[1-9]\d{1,14}$"
|
||||
|
||||
is_valid_sip = re.match(SIP_ENDPOINT_REGEX, destination)
|
||||
is_valid_e164 = re.match(E164_PHONE_REGEX, destination)
|
||||
|
||||
if not (is_valid_sip or is_valid_e164):
|
||||
validation_error_result = {
|
||||
"status": "failed",
|
||||
"message": "I'm sorry, but the transfer destination appears to be invalid. Please contact support to verify the transfer settings.",
|
||||
"action": "transfer_failed",
|
||||
"reason": "invalid_destination",
|
||||
"end_call": True,
|
||||
}
|
||||
await self._handle_transfer_result(
|
||||
validation_error_result, function_call_params, properties
|
||||
)
|
||||
return
|
||||
else:
|
||||
# For non-ARI providers (Twilio, etc), use E.164 validation
|
||||
E164_PHONE_REGEX = r"^\+[1-9]\d{1,14}$"
|
||||
if not re.match(E164_PHONE_REGEX, destination):
|
||||
validation_error_result = {
|
||||
"status": "failed",
|
||||
"message": "I'm sorry, but the transfer phone number appears to be invalid. Please contact support to verify the transfer settings.",
|
||||
"action": "transfer_failed",
|
||||
"reason": "invalid_destination",
|
||||
"end_call": True,
|
||||
}
|
||||
await self._handle_transfer_result(
|
||||
validation_error_result, function_call_params, properties
|
||||
)
|
||||
return
|
||||
|
||||
if message_type == "custom" and custom_message:
|
||||
logger.info(f"Playing pre-transfer message: {custom_message}")
|
||||
|
|
@ -466,15 +488,15 @@ class CustomToolManager:
|
|||
transfer_event = None
|
||||
|
||||
finally:
|
||||
# Single cleanup point: stop hold music, unmute pipeline, remove context
|
||||
# Cleanup hold music and pipeline state
|
||||
# Transfer context cleanup is now handled by respective serializers
|
||||
logger.info(
|
||||
"Transfer wait ended, cleaning up hold music, pipeline state, and transfer context"
|
||||
"Transfer wait ended, cleaning up hold music and pipeline state"
|
||||
)
|
||||
hold_music_stop_event.set()
|
||||
if hold_music_task:
|
||||
await hold_music_task
|
||||
self._engine.set_mute_pipeline(False)
|
||||
await call_transfer_manager.remove_transfer_context(transfer_id)
|
||||
|
||||
# Handle result (after cleanup)
|
||||
if transfer_event:
|
||||
|
|
@ -516,8 +538,27 @@ class CustomToolManager:
|
|||
exception_result, function_call_params, properties
|
||||
)
|
||||
|
||||
finally:
|
||||
# Schedule background cleanup of transfer context after pipeline processing delay
|
||||
if 'transfer_id' in locals():
|
||||
asyncio.create_task(
|
||||
self._cleanup_transfer_context_delayed(transfer_id)
|
||||
)
|
||||
|
||||
return transfer_call_handler
|
||||
|
||||
async def _cleanup_transfer_context_delayed(self, transfer_id: str):
|
||||
"""Background task to clean up transfer context after pipeline processing delay."""
|
||||
try:
|
||||
# Wait for pipeline to process EndFrame(reason="transfer_call") in serializers
|
||||
await asyncio.sleep(1.0) # 1 second delay for async pipeline processing
|
||||
|
||||
call_transfer_manager = await get_call_transfer_manager()
|
||||
await call_transfer_manager.remove_transfer_context(transfer_id)
|
||||
logger.info(f"Background cleanup: removed transfer context {transfer_id}")
|
||||
except Exception as e:
|
||||
logger.error(f"Background cleanup error for transfer context {transfer_id}: {e}")
|
||||
|
||||
async def _handle_transfer_result(
|
||||
self, result: dict, function_call_params, properties
|
||||
):
|
||||
|
|
|
|||
|
|
@ -74,27 +74,27 @@ services:
|
|||
networks:
|
||||
- app-network
|
||||
|
||||
api:
|
||||
image: ${REGISTRY:-dograhai}/dograh-api:latest
|
||||
volumes:
|
||||
- shared-tmp:/tmp
|
||||
environment:
|
||||
# Core application config
|
||||
ENVIRONMENT: "local"
|
||||
LOG_LEVEL: "INFO"
|
||||
# api:
|
||||
# image: ${REGISTRY:-dograhai}/dograh-api:latest
|
||||
# volumes:
|
||||
# - shared-tmp:/tmp
|
||||
# environment:
|
||||
# # Core application config
|
||||
# ENVIRONMENT: "local"
|
||||
# LOG_LEVEL: "INFO"
|
||||
|
||||
# Replace this environment variable if you are using a custom
|
||||
# domain to host the stack
|
||||
BACKEND_API_ENDPOINT: "${BACKEND_API_ENDPOINT:-http://localhost:8000}"
|
||||
# # Replace this environment variable if you are using a custom
|
||||
# # domain to host the stack
|
||||
# BACKEND_API_ENDPOINT: "${BACKEND_API_ENDPOINT:-http://localhost:8000}"
|
||||
|
||||
# Database configuration (using containerized postgres)
|
||||
DATABASE_URL: "postgresql+asyncpg://postgres:postgres@postgres:5432/postgres"
|
||||
# # Database configuration (using containerized postgres)
|
||||
# DATABASE_URL: "postgresql+asyncpg://postgres:postgres@postgres:5432/postgres"
|
||||
|
||||
# Redis configuration (using containerized redis)
|
||||
REDIS_URL: "redis://:redissecret@redis:6379"
|
||||
# # Redis configuration (using containerized redis)
|
||||
# REDIS_URL: "redis://:redissecret@redis:6379"
|
||||
|
||||
# Storage configuration - using local MinIO
|
||||
ENABLE_AWS_S3: "false"
|
||||
# # Storage configuration - using local MinIO
|
||||
# ENABLE_AWS_S3: "false"
|
||||
|
||||
# MinIO
|
||||
MINIO_ENDPOINT: "minio:9000"
|
||||
|
|
@ -112,10 +112,10 @@ services:
|
|||
# LANGFUSE_PUBLIC_KEY: ""
|
||||
# LANGFUSE_HOST: ""
|
||||
|
||||
# TURN server configuration (for WebRTC NAT traversal in remote server)
|
||||
# Uses time-limited credentials via TURN REST API (HMAC-SHA1)
|
||||
TURN_HOST: "${TURN_HOST:-}"
|
||||
TURN_SECRET: "${TURN_SECRET:-}"
|
||||
# # TURN server configuration (for WebRTC NAT traversal in remote server)
|
||||
# # Uses time-limited credentials via TURN REST API (HMAC-SHA1)
|
||||
# TURN_HOST: "${TURN_HOST:-}"
|
||||
# TURN_SECRET: "${TURN_SECRET:-}"
|
||||
|
||||
OSS_JWT_SECRET: "${OSS_JWT_SECRET:-ChangeMeInProduction}"
|
||||
|
||||
|
|
@ -182,32 +182,32 @@ services:
|
|||
networks:
|
||||
- app-network
|
||||
|
||||
coturn:
|
||||
image: coturn/coturn:4.8.0
|
||||
container_name: coturn
|
||||
restart: unless-stopped
|
||||
profiles: ["remote"]
|
||||
ports:
|
||||
- "3478:3478/udp"
|
||||
- "3478:3478/tcp"
|
||||
- "5349:5349/udp"
|
||||
- "5349:5349/tcp"
|
||||
- "49152-49200:49152-49200/udp"
|
||||
volumes:
|
||||
- ./turnserver.conf:/etc/coturn/turnserver.conf:ro
|
||||
command:
|
||||
- -c
|
||||
- /etc/coturn/turnserver.conf
|
||||
networks:
|
||||
- app-network
|
||||
# coturn:
|
||||
# image: coturn/coturn:4.8.0
|
||||
# container_name: coturn
|
||||
# restart: unless-stopped
|
||||
# profiles: ["remote"]
|
||||
# ports:
|
||||
# - "3478:3478/udp"
|
||||
# - "3478:3478/tcp"
|
||||
# - "5349:5349/udp"
|
||||
# - "5349:5349/tcp"
|
||||
# - "49152-49200:49152-49200/udp"
|
||||
# volumes:
|
||||
# - ./turnserver.conf:/etc/coturn/turnserver.conf:ro
|
||||
# command:
|
||||
# - -c
|
||||
# - /etc/coturn/turnserver.conf
|
||||
# networks:
|
||||
# - app-network
|
||||
|
||||
volumes:
|
||||
postgres_data:
|
||||
redis_data:
|
||||
minio-data:
|
||||
driver: local
|
||||
shared-tmp:
|
||||
driver: local
|
||||
# shared-tmp:
|
||||
# driver: local
|
||||
|
||||
networks:
|
||||
app-network:
|
||||
|
|
|
|||
2
pipecat
2
pipecat
|
|
@ -1 +1 @@
|
|||
Subproject commit 6aa0502a9834d536aba9589cec87d827e66f2fad
|
||||
Subproject commit 0ccb4f242c48b59ad34a586986bbc4a3dcec1d36
|
||||
4
ui/package-lock.json
generated
4
ui/package-lock.json
generated
|
|
@ -1,12 +1,12 @@
|
|||
{
|
||||
"name": "ui",
|
||||
"version": "1.13.0",
|
||||
"version": "1.14.0",
|
||||
"lockfileVersion": 3,
|
||||
"requires": true,
|
||||
"packages": {
|
||||
"": {
|
||||
"name": "ui",
|
||||
"version": "1.13.0",
|
||||
"version": "1.14.0",
|
||||
"dependencies": {
|
||||
"@dagrejs/dagre": "^1.1.4",
|
||||
"@hey-api/client-fetch": "^0.10.0",
|
||||
|
|
|
|||
|
|
@ -5,6 +5,7 @@ import { Input } from "@/components/ui/input";
|
|||
import { Label } from "@/components/ui/label";
|
||||
import { RadioGroup, RadioGroupItem } from "@/components/ui/radio-group";
|
||||
import { Textarea } from "@/components/ui/textarea";
|
||||
import { useState, useEffect } from "react";
|
||||
|
||||
import { type EndCallMessageType } from "../../config";
|
||||
|
||||
|
|
@ -37,20 +38,46 @@ export function TransferCallToolConfig({
|
|||
timeout,
|
||||
onTimeoutChange,
|
||||
}: TransferCallToolConfigProps) {
|
||||
// Basic E.164 validation pattern
|
||||
const [sipMode, setSipMode] = useState(() => /^(PJSIP|SIP)\//i.test(destination));
|
||||
|
||||
// Validation patterns
|
||||
const isValidPhoneNumber = (phone: string): boolean => {
|
||||
const e164Pattern = /^\+[1-9]\d{1,14}$/;
|
||||
return e164Pattern.test(phone);
|
||||
};
|
||||
|
||||
const phoneNumberError = destination && !isValidPhoneNumber(destination);
|
||||
const isValidSipEndpoint = (endpoint: string): boolean => {
|
||||
const sipPattern = /^(PJSIP|SIP)\/[\w\-\.@]+$/i;
|
||||
return sipPattern.test(endpoint);
|
||||
};
|
||||
|
||||
const getValidationError = (): string | null => {
|
||||
if (!destination) return null;
|
||||
|
||||
if (sipMode) {
|
||||
return isValidSipEndpoint(destination)
|
||||
? null
|
||||
: "Please enter a valid SIP endpoint (e.g., PJSIP/1234 or SIP/extension@domain.com)";
|
||||
} else {
|
||||
return isValidPhoneNumber(destination)
|
||||
? null
|
||||
: "Please enter a valid phone number in E.164 format (e.g., +1234567890)";
|
||||
}
|
||||
};
|
||||
|
||||
const destinationError = getValidationError();
|
||||
|
||||
const handleSipModeToggle = () => {
|
||||
setSipMode(!sipMode);
|
||||
onDestinationChange(""); // Clear destination when switching modes
|
||||
};
|
||||
|
||||
return (
|
||||
<Card>
|
||||
<CardHeader>
|
||||
<CardTitle>Transfer Call Configuration</CardTitle>
|
||||
<CardDescription>
|
||||
Configure call transfer settings (Twilio only)
|
||||
Configure call transfer settings. Supports phone numbers (Twilio) and SIP endpoints (Asterisk ARI).
|
||||
</CardDescription>
|
||||
</CardHeader>
|
||||
<CardContent className="space-y-6">
|
||||
|
|
@ -80,21 +107,31 @@ export function TransferCallToolConfig({
|
|||
</div>
|
||||
|
||||
<div className="grid gap-2 pt-4 border-t">
|
||||
<Label>Destination Phone Number</Label>
|
||||
<Label>Transfer Destination</Label>
|
||||
<Label className="text-xs text-muted-foreground">
|
||||
Phone number to transfer the call to (E.164 format with country code)
|
||||
{sipMode
|
||||
? "SIP endpoint to transfer the call to (e.g., PJSIP/1234 or SIP/extension@domain.com)"
|
||||
: "Phone number to transfer the call to (E.164 format with country code)"
|
||||
}
|
||||
</Label>
|
||||
<Input
|
||||
value={destination}
|
||||
onChange={(e) => onDestinationChange(e.target.value)}
|
||||
placeholder="+1234567890"
|
||||
className={phoneNumberError ? "border-red-500 focus:border-red-500" : ""}
|
||||
placeholder={sipMode ? "PJSIP/1234 or SIP/extension@domain.com" : "+1234567890"}
|
||||
className={destinationError ? "border-red-500 focus:border-red-500" : ""}
|
||||
/>
|
||||
{phoneNumberError && (
|
||||
{destinationError && (
|
||||
<Label className="text-xs text-red-500">
|
||||
Please enter a valid phone number in E.164 format (e.g., +1234567890)
|
||||
{destinationError}
|
||||
</Label>
|
||||
)}
|
||||
<button
|
||||
type="button"
|
||||
className="text-xs text-muted-foreground hover:text-foreground underline w-fit"
|
||||
onClick={handleSipModeToggle}
|
||||
>
|
||||
{sipMode ? "Use phone number instead" : "Use SIP endpoint instead"}
|
||||
</button>
|
||||
</div>
|
||||
|
||||
<div className="grid gap-4 pt-4 border-t">
|
||||
|
|
|
|||
|
|
@ -198,10 +198,14 @@ export default function ToolDetailPage() {
|
|||
|
||||
// Validation based on tool type
|
||||
if (tool.category === "transfer_call") {
|
||||
// Validate destination phone number for Transfer Call tools
|
||||
// Validate destination for Transfer Call tools (supports both E.164 and SIP endpoints)
|
||||
const e164Pattern = /^\+[1-9]\d{1,14}$/;
|
||||
if (!transferDestination || !e164Pattern.test(transferDestination)) {
|
||||
setError("Please enter a valid phone number in E.164 format (e.g., +1234567890)");
|
||||
const sipPattern = /^(PJSIP|SIP)\/[\w\-\.@]+$/i;
|
||||
const isValidE164 = e164Pattern.test(transferDestination);
|
||||
const isValidSip = sipPattern.test(transferDestination);
|
||||
|
||||
if (!transferDestination || (!isValidE164 && !isValidSip)) {
|
||||
setError("Please enter a valid phone number (E.164 format) or SIP endpoint (e.g., PJSIP/1234)");
|
||||
return;
|
||||
}
|
||||
} else if (tool.category !== "end_call") {
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue