transfer call

This commit is contained in:
Sabiha Khan 2026-01-22 12:19:34 +05:30
parent 5d14d17ceb
commit 51adfdda66
41 changed files with 2633 additions and 167 deletions

View file

@ -112,6 +112,9 @@ class PipecatEngine:
self._embeddings_api_key: Optional[str] = embeddings_api_key
self._embeddings_model: Optional[str] = embeddings_model
self._embeddings_base_url: Optional[str] = embeddings_base_url
# Transfer state tracking - prevents auto hang-up during call transfers
self._transfer_in_progress: bool = False
async def _get_organization_id(self) -> Optional[int]:
"""Get and cache the organization ID from workflow run."""
@ -207,14 +210,14 @@ class PipecatEngine:
)
logger.info(f"Arguments: {function_call_params.arguments}")
# Perform variable extraction before transitioning to new node
await self._perform_variable_extraction_if_needed(self._current_node)
# Set context for the new node, so that when the function call result
# frame is received by LLMContextAggregator and an LLM generation
# is done, we have updated context and functions
await self.set_node(transition_to_node)
try:
# Perform variable extraction before transitioning to new node
await self._perform_variable_extraction_if_needed(self._current_node)
# Set context for the new node, so that when the function call result
# frame is received by LLMContextAggregator and an LLM generation
# is done, we have updated context and functions
await self.set_node(transition_to_node)
async def on_context_updated() -> None:
"""
@ -245,6 +248,7 @@ class PipecatEngine:
await function_call_params.result_callback(
result, properties=properties
)
except Exception as e:
logger.error(f"Error in transition function {name}: {str(e)}")
error_result = {"status": "error", "error": str(e)}
@ -277,6 +281,7 @@ class PipecatEngine:
async def calculate_func(function_call_params: FunctionCallParams) -> None:
logger.info(f"LLM Function Call EXECUTED: safe_calculator")
logger.info(f"Arguments: {function_call_params.arguments}")
try:
expr = function_call_params.arguments.get("expression", "")
result = safe_calculator(expr)
@ -292,6 +297,7 @@ class PipecatEngine:
) -> None:
logger.info(f"LLM Function Call EXECUTED: get_current_time")
logger.info(f"Arguments: {function_call_params.arguments}")
try:
timezone = function_call_params.arguments.get("timezone", "UTC")
result = get_current_time(timezone)
@ -302,6 +308,7 @@ class PipecatEngine:
async def convert_time_func(function_call_params: FunctionCallParams) -> None:
logger.info(f"LLM Function Call EXECUTED: convert_time")
logger.info(f"Arguments: {function_call_params.arguments}")
try:
result = convert_time(
function_call_params.arguments.get("source_timezone"),
@ -332,6 +339,7 @@ class PipecatEngine:
async def retrieve_kb_func(function_call_params: FunctionCallParams) -> None:
logger.info("LLM Function Call EXECUTED: retrieve_from_knowledge_base")
logger.info(f"Arguments: {function_call_params.arguments}")
try:
query = function_call_params.arguments.get("query", "")
organization_id = await self._get_organization_id()
@ -532,7 +540,7 @@ class PipecatEngine:
self._current_node, run_in_background=False
)
frame_to_push = CancelFrame() if abort_immediately else EndFrame()
frame_to_push = CancelFrame(reason=reason) if abort_immediately else EndFrame(reason=reason)
# Apply disposition mapping - first try call_disposition if it is,
# extracted from the call conversation then fall back to reason
@ -705,6 +713,18 @@ class PipecatEngine:
f"Stasis connection set for immediate transfers: {connection.channel_id}"
)
def set_mute_pipeline(self, mute: bool) -> None:
"""Set the pipeline mute state.
This controls whether user input should be muted via the CallbackUserMuteStrategy.
When muted, the user's audio input will be blocked.
Args:
mute: True to mute user input, False to allow input
"""
logger.debug(f"Setting pipeline mute state to: {mute}")
self._mute_pipeline = mute
async def handle_llm_text_frame(self, text: str):
"""Accumulate LLM text frames to build reference text."""
self._current_llm_generation_reference_text += text

View file

@ -6,8 +6,11 @@ during workflow execution.
from __future__ import annotations
import asyncio
import re
from typing import TYPE_CHECKING, Any, Optional
import aiohttp
from loguru import logger
from api.db import db_client
@ -21,9 +24,24 @@ from api.services.workflow.tools.custom_tool import (
tool_to_function_schema,
)
from pipecat.adapters.schemas.function_schema import FunctionSchema
from pipecat.frames.frames import FunctionCallResultProperties, TTSSpeakFrame
from pipecat.frames.frames import (
FunctionCallResultProperties,
TTSSpeakFrame,
OutputAudioRawFrame,
)
from pipecat.services.llm_service import FunctionCallParams
from pipecat.utils.enums import EndTaskReason
from pipecat.transports.websocket.fastapi import FastAPIWebsocketClient
from api.utils.hold_audio import load_hold_audio
from api.services.telephony.transfer_coordination import get_transfer_coordinator
from api.services.telephony.transfer_event_protocol import (
TransferEvent,
TransferContext,
TransferEventType,
)
from dograh.api.utils.common import get_backend_endpoints
if TYPE_CHECKING:
from api.services.workflow.pipecat_engine import PipecatEngine
@ -138,6 +156,8 @@ class CustomToolManager:
"""
if tool.category == ToolCategory.END_CALL.value:
return self._create_end_call_handler(tool, function_name)
elif tool.category == ToolCategory.TRANSFER_CALL.value:
return self._create_transfer_call_handler(tool, function_name)
return self._create_http_tool_handler(tool, function_name)
@ -230,3 +250,505 @@ class CustomToolManager:
)
return end_call_handler
def _create_transfer_call_handler(self, tool: Any, function_name: str):
"""Create a handler function for a transfer call tool.
Args:
tool: The ToolModel instance
function_name: The function name used by the LLM
Returns:
Async handler function for the transfer call tool
"""
# Don't run LLM after starting transfer - we're handling async response
properties = FunctionCallResultProperties(run_llm=False)
async def transfer_call_handler(
function_call_params: FunctionCallParams,
) -> None:
logger.info(f"Transfer Call Tool EXECUTED: {function_name}")
logger.info(f"Arguments: {function_call_params.arguments}")
try:
# Get the transfer call configuration
config = tool.definition.get("config", {})
destination = config.get("destination", "")
message_type = config.get("messageType", "none")
custom_message = config.get("customMessage", "")
timeout_seconds = config.get(
"timeout", 30
) # Default 30 seconds if not configured
# Validate destination phone number
if not destination or not destination.strip():
validation_error_result = {
"status": "failed",
"message": "I'm sorry, but I don't have a phone number configured for the transfer. Please contact support to set up call transfer.",
"action": "transfer_failed",
"reason": "no_destination",
"end_call": True,
}
await self._handle_transfer_result(
validation_error_result, function_call_params, properties
)
return
# Validate E.164 format
e164_pattern = r"^\+[1-9]\d{1,14}$"
if not re.match(e164_pattern, destination):
validation_error_result = {
"status": "failed",
"message": "I'm sorry, but the transfer phone number appears to be invalid. Please contact support to verify the transfer settings.",
"action": "transfer_failed",
"reason": "invalid_destination",
"end_call": True,
}
await self._handle_transfer_result(
validation_error_result, function_call_params, properties
)
return
# Provider validation handled by telephony endpoint
# Note: User muting and hold music are handled automatically by
# Play pre-transfer message if configured
if message_type == "custom" and custom_message:
logger.info(f"Playing pre-transfer message: {custom_message}")
await self._engine.task.queue_frame(TTSSpeakFrame(custom_message))
# Get original call information from Pipecat context
from pipecat.utils.context import get_current_call_sid
original_call_sid = get_current_call_sid()
caller_number = None # Skip caller number for now as requested
logger.info(f"Found original call context: call_id={original_call_sid}")
# Get organization ID for provider configuration
organization_id = await self.get_organization_id()
if not organization_id:
validation_error_result = {
"status": "failed",
"message": "I'm sorry, but I can't determine which organization this call belongs to. Please contact support.",
"action": "transfer_failed",
"reason": "no_organization_id",
"end_call": False,
}
await self._handle_transfer_result(
validation_error_result, function_call_params, properties
)
return
# Prepare transfer request data
transfer_data = {
"destination": destination,
"organization_id": organization_id, # Required for provider configuration
"tool_call_id": function_call_params.tool_call_id, # Use LLM's tool call ID for pipeline coordination
"tool_uuid": tool.tool_uuid, # Add tool UUID for tracing and validation
"original_call_sid": original_call_sid, # Original caller's call SID
"caller_number": caller_number, # Original caller's phone number
}
# Initialize Redis-based transfer coordination
import httpx
import time
# Get backend endpoint URL
backend_url, _ = await get_backend_endpoints()
# Get transfer coordinator for Redis-based coordination
transfer_coordinator = await get_transfer_coordinator()
# Now initiate the transfer call
transfer_url = f"{backend_url}/api/v1/telephony/call-transfer"
async with httpx.AsyncClient(timeout=30.0) as client:
response = await client.post(
transfer_url,
json=transfer_data,
headers={"Content-Type": "application/json"},
# Authentication headers added by provider if needed
)
if response.status_code == 200:
result_data = response.json()
logger.info(f"Transfer initiated successfully: {result_data}")
# Wait for webhook completion using standard Pipecat async pattern
logger.info(
f"Transfer call initiated for {destination}, waiting for webhook completion..."
)
# Start hold music during transfer waiting period
hold_music_stop_event = asyncio.Event()
hold_music_task = None
try:
# Mute the pipeline to prevent further LLM generations during transfer
logger.info("Muting pipeline during transfer call")
self._engine.set_mute_pipeline(True)
# Determine sample rate from transport (default to 8000Hz for Twilio)
sample_rate = 8000
if hasattr(self._engine.transport, "output") and hasattr(
self._engine.transport.output(), "sample_rate"
):
sample_rate = getattr(
self._engine.transport.output(), "sample_rate", 8000
)
logger.info(
f"Starting hold music at {sample_rate}Hz while waiting for transfer"
)
# Start hold music as background task
hold_music_task = asyncio.create_task(
self.play_hold_music_loop(
hold_music_stop_event, sample_rate
)
)
# Wait for transfer completion using Redis pub/sub
logger.info(
"Waiting for transfer completion via Redis pub/sub..."
)
transfer_event = (
await transfer_coordinator.wait_for_transfer_completion(
transfer_data["tool_call_id"], timeout_seconds
)
)
# Stop hold music and unmute pipeline
logger.info(
"Transfer completed, stopping hold music and unmuting pipeline"
)
hold_music_stop_event.set()
if hold_music_task:
await hold_music_task
self._engine.set_mute_pipeline(False)
if transfer_event:
# Get result from transfer event
final_result = transfer_event.to_result_dict()
# Get transfer context for caller number
transfer_context = (
await transfer_coordinator.get_transfer_context(
transfer_data["tool_call_id"]
)
)
if transfer_context and transfer_context.caller_number:
final_result["caller_number"] = (
transfer_context.caller_number
)
# Handle the transfer result and inform user appropriately
await self._handle_transfer_result(
final_result, function_call_params, properties
)
else:
# Handle timeout case
logger.error(
f"Transfer call timed out after {timeout_seconds} seconds"
)
# Create timeout result and handle it through the same flow
timeout_result = {
"status": "failed",
"message": "I'm sorry, but the call is taking longer than expected to connect. The person might not be available right now. Please try calling back later.",
"action": "transfer_failed",
"reason": "timeout",
"end_call": True,
}
await self._handle_transfer_result(
timeout_result, function_call_params, properties
)
except Exception as e:
logger.error(f"Error during transfer wait: {e}")
# Stop hold music and unmute pipeline on error
logger.info(
"Transfer error, stopping hold music and unmuting pipeline"
)
hold_music_stop_event.set()
if hold_music_task:
await hold_music_task
self._engine.set_mute_pipeline(False)
# Handle error case
error_result = {
"status": "failed",
"message": "I'm sorry, but there was an issue processing the transfer. Please try again.",
"action": "transfer_failed",
"reason": "system_error",
"end_call": True,
}
await self._handle_transfer_result(
error_result, function_call_params, properties
)
else:
error_data = (
response.json()
if response.content
else {"error": "Unknown error"}
)
logger.error(
f"Transfer initiation failed: {response.status_code} - {error_data}"
)
# No cleanup needed for Redis-based coordination
# Handle initiation failure with user-friendly message
initiation_failure_result = {
"status": "failed",
"message": "I'm sorry, but I'm having trouble setting up the call transfer right now. There might be a technical issue. Please try again later or contact support.",
"action": "transfer_failed",
"reason": "initiation_failed",
"end_call": True,
}
await self._handle_transfer_result(
initiation_failure_result, function_call_params, properties
)
except httpx.TimeoutException:
logger.error(f"Transfer call '{function_name}' HTTP request timed out")
# No cleanup needed for Redis-based coordination
# Handle HTTP timeout with user-friendly message
http_timeout_result = {
"status": "failed",
"message": "I'm sorry, but there seems to be a network issue preventing me from setting up the call transfer. Please try again in a moment.",
"action": "transfer_failed",
"reason": "network_timeout",
"end_call": True,
}
await self._handle_transfer_result(
http_timeout_result, function_call_params, properties
)
except Exception as e:
logger.error(
f"Transfer call tool '{function_name}' execution failed: {e}"
)
# No cleanup needed for Redis-based coordination
# Handle generic exception with user-friendly message
exception_result = {
"status": "failed",
"message": "I'm sorry, but something went wrong while trying to transfer your call. Please try again later or contact support if the problem persists.",
"action": "transfer_failed",
"reason": "execution_error",
"end_call": True,
}
await self._handle_transfer_result(
exception_result, function_call_params, properties
)
return transfer_call_handler
async def _handle_transfer_result(
self, result: dict, function_call_params, properties
):
"""Handle different transfer call outcomes and take appropriate action."""
action = result.get("action", "")
status = result.get("status", "")
message = result.get("message", "")
should_end_call = result.get("end_call", False)
logger.info(f"Handling transfer result: action={action}, status={status}")
if action == "transfer_success":
# Successful transfer - add original caller to conference and end pipeline
conference_id = result.get("conference_id")
original_call_sid = result.get("original_call_sid")
transfer_call_sid = result.get("transfer_call_sid")
logger.info(
f"Transfer successful! Conference: {conference_id}, Original: {original_call_sid}, Transfer: {transfer_call_sid}"
)
# First inform LLM of success (but don't continue call)
response_properties = FunctionCallResultProperties(
run_llm=False
) # We'll handle the transfer ourselves
await function_call_params.result_callback(
{
"status": "transfer_success",
"message": "Transfer successful - connecting to conference",
"conference_id": conference_id,
},
properties=response_properties,
)
await self._engine.end_call_with_reason(
EndTaskReason.TRANSFER_CALL.value, abort_immediately=False
)
elif action == "transfer_failed":
# Transfer failed - inform user via LLM and then end the call
reason = result.get("reason", "unknown")
logger.info(f"Transfer failed ({reason}), informing user and ending call")
# Use system message pattern to direct LLM response for transfer failure
# This is more reliable than function call results
from pipecat.frames.frames import LLMMessagesAppendFrame
# Create system message with clear instructions for transfer failure
failure_instruction = {
"role": "system",
"content": f"IMPORTANT: The transfer call has FAILED. Reason: {reason}. You must inform the customer about this failure using this message: '{message}' Then immediately say goodbye and end the conversation. Do NOT ask if they need anything else or continue the conversation. Do NOT continue with transfer language.",
}
# Push the system message to LLM context
await self._engine.task.queue_frame(
LLMMessagesAppendFrame([failure_instruction], run_llm=True)
)
# Also send the function call result for consistency
response_properties = FunctionCallResultProperties(
run_llm=False
) # LLM will be triggered by system message
await function_call_params.result_callback(
{"status": "transfer_failed", "reason": reason, "message": message},
properties=response_properties,
)
# Set appropriate disposition for analytics
disposition_map = {
"no_answer": "transfer_no_answer",
"busy": "transfer_busy",
"call_failed": "transfer_failed",
"timeout": "transfer_timeout",
"no_destination": "transfer_config_error",
"invalid_destination": "transfer_config_error",
"initiation_failed": "transfer_system_error",
"network_timeout": "transfer_system_error",
"execution_error": "transfer_system_error",
}
disposition = disposition_map.get(reason, "transfer_failed")
logger.info(
f"Setting disposition: {disposition} for transfer failure reason: {reason}"
)
# Give the LLM time to speak the message, then end the call with disposition
# We'll schedule the end call after a brief delay to allow TTS
logger.info("Scheduling call end after LLM delivers failure message")
# Import here to avoid circular dependencies
import asyncio
# Schedule call end after 3 seconds to allow LLM to speak
async def delayed_end_call():
import asyncio
await asyncio.sleep(3)
await self._engine.end_call_with_reason(
f"transfer_failed_{reason}", # Include specific reason in end reason
abort_immediately=False, # Allow any queued speech to complete
)
# Create task to end call asynchronously
asyncio.create_task(delayed_end_call())
elif action == "transfer_completed":
# This should no longer happen since we ignore "completed" status in webhook
# to avoid overriding successful transfers
logger.warning(
"Received unexpected 'transfer_completed' action - this should be ignored by webhook now"
)
logger.warning(
"If you see this message, there might be an issue with the webhook status filtering"
)
# For safety, treat it as a generic result without ending the call
await function_call_params.result_callback(result, properties=properties)
else:
# Unknown action, treat as generic success
logger.warning(f"Unknown transfer action: {action}, treating as success")
await function_call_params.result_callback(result, properties=properties)
async def play_hold_music_loop(
self, stop_event: asyncio.Event, sample_rate: int = 8000
):
"""Play hold music in a loop until stop event is triggered.
Args:
stop_event: Event to stop the hold music loop
sample_rate: Sample rate for the hold music (default 8000Hz for Twilio)
"""
try:
import os
# Path to hold music file based on sample rate
assets_dir = os.path.join(
os.path.dirname(os.path.dirname(os.path.dirname(__file__))), "assets"
)
# Select appropriate hold music file
if sample_rate == 16000:
hold_music_file = os.path.join(
assets_dir, "transfer_hold_ring_16000.wav"
)
else: # Default to 8000Hz for Twilio
hold_music_file = os.path.join(
assets_dir, "transfer_hold_ring_8000.wav"
)
logger.info(f"Starting hold music loop with file: {hold_music_file}")
# Load hold music audio data
hold_audio_data = load_hold_audio(hold_music_file, sample_rate)
if not hold_audio_data:
logger.error("Failed to load hold music data")
return
# Convert bytes to audio frames - each frame should be about 20ms worth of audio
# For 8000Hz: 20ms = 160 samples = 320 bytes (16-bit)
# For 16000Hz: 20ms = 320 samples = 640 bytes (16-bit)
frame_size = 320 if sample_rate == 8000 else 640
audio_data = hold_audio_data
total_length = len(audio_data)
position = 0
logger.info(
f"Hold music loaded: {total_length} bytes, frame size: {frame_size}"
)
while not stop_event.is_set():
# Extract audio chunk
if position + frame_size > total_length:
# Reached end of audio, loop back to beginning
position = 0
audio_chunk = audio_data[position : position + frame_size]
position += frame_size
# Create audio frame
audio_frame = OutputAudioRawFrame(
audio=audio_chunk,
sample_rate=sample_rate,
num_channels=1,
)
# Queue the frame
await self._engine.task.queue_frame(audio_frame)
# Sleep for frame duration (20ms)
await asyncio.sleep(0.02)
logger.info("Hold music loop stopped")
except Exception as e:
logger.error(f"Error in hold music loop: {e}")