Merge branch 'main' into feat/call-tags

This commit is contained in:
Abhishek Kumar 2026-02-18 13:30:07 +05:30
commit 5c4cf14b07
117 changed files with 7365 additions and 5193 deletions

View file

@ -15,11 +15,9 @@ from pipecat.frames.frames import (
from pipecat.pipeline.task import PipelineTask
from pipecat.processors.aggregators.llm_context import LLMContext
from pipecat.services.llm_service import FunctionCallParams
from pipecat.transports.base_transport import BaseTransport
from pipecat.utils.enums import EndTaskReason
if TYPE_CHECKING:
from api.services.telephony.stasis_rtp_connection import StasisRTPConnection
from pipecat.frames.frames import Frame
from pipecat.services.anthropic.llm import AnthropicLLMService
from pipecat.services.google.llm import GoogleLLMService
@ -61,7 +59,6 @@ class PipecatEngine:
task: Optional[PipelineTask] = None,
llm: Optional["LLMService"] = None,
context: Optional[LLMContext] = None,
transport: Optional[BaseTransport] = None,
workflow: WorkflowGraph,
call_context_vars: dict,
workflow_run_id: Optional[int] = None,
@ -75,7 +72,6 @@ class PipecatEngine:
self.task = task
self.llm = llm
self.context = context
self.transport = transport
self.workflow = workflow
self._call_context_vars = call_context_vars
self._workflow_run_id = workflow_run_id
@ -86,9 +82,6 @@ class PipecatEngine:
self._gathered_context: dict = {}
self._user_response_timeout_task: Optional[asyncio.Task] = None
# Stasis connection for immediate transfers
self._stasis_connection: Optional["StasisRTPConnection"] = None
# Will be set later in initialize() when we have
# access to _context
self._variable_extraction_manager = None
@ -113,6 +106,9 @@ class PipecatEngine:
self._embeddings_model: Optional[str] = embeddings_model
self._embeddings_base_url: Optional[str] = embeddings_base_url
# Audio configuration (set via set_audio_config from _run_pipeline)
self._audio_config = None
async def _get_organization_id(self) -> Optional[int]:
"""Get and cache the organization ID from workflow run."""
if self._custom_tool_manager:
@ -207,15 +203,14 @@ class PipecatEngine:
)
logger.info(f"Arguments: {function_call_params.arguments}")
# Perform variable extraction and call tags extraction before transitioning to new node
await self._perform_variable_extraction_if_needed(self._current_node)
await self._perform_call_tags_extraction_if_needed(self._current_node)
# Set context for the new node, so that when the function call result
# frame is received by LLMContextAggregator and an LLM generation
# is done, we have updated context and functions
await self.set_node(transition_to_node)
try:
# Perform variable extraction before transitioning to new node
await self._perform_variable_extraction_if_needed(self._current_node)
# Set context for the new node, so that when the function call result
# frame is received by LLMContextAggregator and an LLM generation
# is done, we have updated context and functions
await self.set_node(transition_to_node)
async def on_context_updated() -> None:
"""
@ -246,6 +241,7 @@ class PipecatEngine:
await function_call_params.result_callback(
result, properties=properties
)
except Exception as e:
logger.error(f"Error in transition function {name}: {str(e)}")
error_result = {"status": "error", "error": str(e)}
@ -278,6 +274,7 @@ class PipecatEngine:
async def calculate_func(function_call_params: FunctionCallParams) -> None:
logger.info(f"LLM Function Call EXECUTED: safe_calculator")
logger.info(f"Arguments: {function_call_params.arguments}")
try:
expr = function_call_params.arguments.get("expression", "")
result = safe_calculator(expr)
@ -293,6 +290,7 @@ class PipecatEngine:
) -> None:
logger.info(f"LLM Function Call EXECUTED: get_current_time")
logger.info(f"Arguments: {function_call_params.arguments}")
try:
timezone = function_call_params.arguments.get("timezone", "UTC")
result = get_current_time(timezone)
@ -303,6 +301,7 @@ class PipecatEngine:
async def convert_time_func(function_call_params: FunctionCallParams) -> None:
logger.info(f"LLM Function Call EXECUTED: convert_time")
logger.info(f"Arguments: {function_call_params.arguments}")
try:
result = convert_time(
function_call_params.arguments.get("source_timezone"),
@ -333,6 +332,7 @@ class PipecatEngine:
async def retrieve_kb_func(function_call_params: FunctionCallParams) -> None:
logger.info("LLM Function Call EXECUTED: retrieve_from_knowledge_base")
logger.info(f"Arguments: {function_call_params.arguments}")
try:
query = function_call_params.arguments.get("query", "")
organization_id = await self._get_organization_id()
@ -584,7 +584,9 @@ class PipecatEngine:
self._current_node, run_in_background=False
)
frame_to_push = CancelFrame() if abort_immediately else EndFrame()
frame_to_push = (
CancelFrame(reason=reason) if abort_immediately else EndFrame(reason=reason)
)
# Apply disposition mapping - first try call_disposition if it is,
# extracted from the call conversation then fall back to reason
@ -740,22 +742,21 @@ class PipecatEngine:
"""
self.task = task
def set_stasis_connection(
self, connection: Optional["StasisRTPConnection"]
) -> None:
"""Set the Stasis RTP connection for immediate transfers.
def set_audio_config(self, audio_config) -> None:
"""Set the audio configuration for the pipeline."""
self._audio_config = audio_config
This allows the engine to initiate transfers immediately when XFER
disposition is detected, without waiting for pipeline shutdown.
def set_mute_pipeline(self, mute: bool) -> None:
"""Set the pipeline mute state.
This controls whether user input should be muted via the CallbackUserMuteStrategy.
When muted, the user's audio input will be blocked.
Args:
connection: The StasisRTPConnection instance, or None for non-Stasis transports
mute: True to mute user input, False to allow input
"""
self._stasis_connection = connection
if connection:
logger.debug(
f"Stasis connection set for immediate transfers: {connection.channel_id}"
)
logger.debug(f"Setting pipeline mute state to: {mute}")
self._mute_pipeline = mute
async def handle_llm_text_frame(self, text: str):
"""Accumulate LLM text frames to build reference text."""

View file

@ -6,12 +6,20 @@ during workflow execution.
from __future__ import annotations
import asyncio
import re
import time
import uuid
from typing import TYPE_CHECKING, Any, Optional
from loguru import logger
from api.constants import APP_ROOT_DIR
from api.db import db_client
from api.enums import ToolCategory
from api.enums import ToolCategory, WorkflowRunMode
from api.services.telephony.call_transfer_manager import get_call_transfer_manager
from api.services.telephony.factory import get_telephony_provider
from api.services.telephony.transfer_event_protocol import TransferContext
from api.services.workflow.disposition_mapper import (
get_organization_id_from_workflow_run,
)
@ -20,8 +28,13 @@ from api.services.workflow.tools.custom_tool import (
execute_http_tool,
tool_to_function_schema,
)
from api.utils.hold_audio import load_hold_audio
from pipecat.adapters.schemas.function_schema import FunctionSchema
from pipecat.frames.frames import FunctionCallResultProperties, TTSSpeakFrame
from pipecat.frames.frames import (
FunctionCallResultProperties,
OutputAudioRawFrame,
TTSSpeakFrame,
)
from pipecat.services.llm_service import FunctionCallParams
from pipecat.utils.enums import EndTaskReason
@ -115,8 +128,15 @@ class CustomToolManager:
function_name = schema["function"]["name"]
# Create and register the handler
handler = self._create_handler(tool, function_name)
self._engine.llm.register_function(function_name, handler)
handler, disable_timeout, cancel_on_interruption = self._create_handler(
tool, function_name
)
self._engine.llm.register_function(
function_name,
handler,
cancel_on_interruption=cancel_on_interruption,
disable_timeout=disable_timeout,
)
logger.debug(
f"Registered custom tool handler: {function_name} "
@ -136,10 +156,21 @@ class CustomToolManager:
Returns:
Async handler function for the tool
"""
if tool.category == ToolCategory.END_CALL.value:
return self._create_end_call_handler(tool, function_name)
# Whether to disable function call timeout
disable_timeout = False
cancel_on_interruption = True
return self._create_http_tool_handler(tool, function_name)
if tool.category == ToolCategory.END_CALL.value:
cancel_on_interruption = False
handler = self._create_end_call_handler(tool, function_name)
elif tool.category == ToolCategory.TRANSFER_CALL.value:
disable_timeout = True
cancel_on_interruption = False
handler = self._create_transfer_call_handler(tool, function_name)
else:
handler = self._create_http_tool_handler(tool, function_name)
return handler, disable_timeout, cancel_on_interruption
def _create_http_tool_handler(self, tool: Any, function_name: str):
"""Create a handler function for an HTTP API tool.
@ -230,3 +261,337 @@ class CustomToolManager:
)
return end_call_handler
def _create_transfer_call_handler(self, tool: Any, function_name: str):
"""Create a handler function for a transfer call tool.
Args:
tool: The ToolModel instance
function_name: The function name used by the LLM
Returns:
Async handler function for the transfer call tool
"""
properties = FunctionCallResultProperties(run_llm=False)
async def transfer_call_handler(
function_call_params: FunctionCallParams,
) -> None:
logger.info(f"Transfer Call Tool EXECUTED: {function_name}")
logger.info(f"Arguments: {function_call_params.arguments}")
try:
# Get the transfer call configuration
config = tool.definition.get("config", {})
destination = config.get("destination", "")
message_type = config.get("messageType", "none")
custom_message = config.get("customMessage", "")
timeout_seconds = config.get(
"timeout", 30
) # Default 30 seconds if not configured
# Check if this is a WebRTC call - transfers are not supported
workflow_run = await db_client.get_workflow_run_by_id(
self._engine._workflow_run_id
)
if workflow_run.mode in [
WorkflowRunMode.WEBRTC.value,
WorkflowRunMode.SMALLWEBRTC.value,
]:
webrtc_error_result = {
"status": "failed",
"message": "I'm sorry, but call transfers are not available for web calls. Please try a telephony call.",
"action": "transfer_failed",
"reason": "webrtc_not_supported",
"end_call": True,
}
await self._handle_transfer_result(
webrtc_error_result, function_call_params, properties
)
return
# Validate destination phone number
if not destination or not destination.strip():
validation_error_result = {
"status": "failed",
"message": "I'm sorry, but I don't have a phone number configured for the transfer. Please contact support to set up call transfer.",
"action": "transfer_failed",
"reason": "no_destination",
"end_call": True,
}
await self._handle_transfer_result(
validation_error_result, function_call_params, properties
)
return
# Validate E.164 format
E164_PHONE_REGEX = r"^\+[1-9]\d{1,14}$"
if not re.match(E164_PHONE_REGEX, destination):
validation_error_result = {
"status": "failed",
"message": "I'm sorry, but the transfer phone number appears to be invalid. Please contact support to verify the transfer settings.",
"action": "transfer_failed",
"reason": "invalid_destination",
"end_call": True,
}
await self._handle_transfer_result(
validation_error_result, function_call_params, properties
)
return
if message_type == "custom" and custom_message:
logger.info(f"Playing pre-transfer message: {custom_message}")
await self._engine.task.queue_frame(TTSSpeakFrame(custom_message))
# Get organization ID for provider configuration
organization_id = await self.get_organization_id()
if not organization_id:
validation_error_result = {
"status": "failed",
"message": "I'm sorry, there's an issue with this call transfer. Please contact support.",
"action": "transfer_failed",
"reason": "no_organization_id",
"end_call": False,
}
await self._handle_transfer_result(
validation_error_result, function_call_params, properties
)
return
# Get telephony provider directly (no HTTP round-trip)
provider = await get_telephony_provider(organization_id)
if not provider.supports_transfers() or not provider.validate_config():
validation_error_result = {
"status": "failed",
"message": "I'm sorry, there's an issue with this call transfer. Please contact support.",
"action": "transfer_failed",
"reason": "provider_does_not_support_transfer",
"end_call": False,
}
await self._handle_transfer_result(
validation_error_result, function_call_params, properties
)
return
original_call_sid = workflow_run.gathered_context.get("call_id")
# Generate a unique transfer ID for tracking this transfer
transfer_id = str(uuid.uuid4())
# Compute conference name from original call SID
conference_name = f"transfer-{original_call_sid}"
# Mute the pipeline
self._engine.set_mute_pipeline(True)
# Initiate transfer via provider with inline TwiML
transfer_result = await provider.transfer_call(
destination=destination,
transfer_id=transfer_id,
conference_name=conference_name,
timeout=timeout_seconds,
)
call_sid = transfer_result.get("call_sid")
logger.info(f"Transfer call initiated successfully: {call_sid}")
# TODO: Possible race here between saving the transfer context
# and getting a callback response from Twilio? Should we store_transfer_context
# before sending request to Twilio and update the transfer context afterwards?
# Store transfer context in Redis
call_transfer_manager = await get_call_transfer_manager()
transfer_context = TransferContext(
transfer_id=transfer_id,
call_sid=call_sid,
target_number=destination,
tool_uuid=tool.tool_uuid,
original_call_sid=original_call_sid,
conference_name=conference_name,
initiated_at=time.time(),
)
await call_transfer_manager.store_transfer_context(transfer_context)
# Wait for status callback completion using Redis pub/sub
logger.info(
f"Transfer call initiated for {destination} (transfer_id={transfer_id}), waiting for completion..."
)
# Start hold music during transfer waiting period
hold_music_stop_event = asyncio.Event()
hold_music_task = None
try:
# Use audio config for sample rate (set during pipeline setup)
sample_rate = (
self._engine._audio_config.transport_out_sample_rate
if self._engine._audio_config
else 8000
)
logger.info(
f"Starting hold music at {sample_rate}Hz while waiting for transfer"
)
# Start hold music as background task
hold_music_task = asyncio.create_task(
self.play_hold_music_loop(hold_music_stop_event, sample_rate)
)
# Wait for transfer completion using Redis pub/sub
logger.info("Waiting for transfer completion via Redis pub/sub...")
transfer_event = (
await call_transfer_manager.wait_for_transfer_completion(
transfer_id, timeout_seconds
)
)
except Exception as e:
logger.error(f"Error during transfer wait: {e}")
transfer_event = None
finally:
# Single cleanup point: stop hold music, unmute pipeline, remove context
logger.info(
"Transfer wait ended, cleaning up hold music, pipeline state, and transfer context"
)
hold_music_stop_event.set()
if hold_music_task:
await hold_music_task
self._engine.set_mute_pipeline(False)
await call_transfer_manager.remove_transfer_context(transfer_id)
# Handle result (after cleanup)
if transfer_event:
final_result = transfer_event.to_result_dict()
await self._handle_transfer_result(
final_result, function_call_params, properties
)
else:
logger.error(
f"Transfer call timed out or failed after {timeout_seconds} seconds"
)
timeout_result = {
"status": "failed",
"message": "I'm sorry, but the call is taking longer than expected to connect. The person might not be available right now. Please try calling back later.",
"action": "transfer_failed",
"reason": "timeout",
"end_call": True,
}
await self._handle_transfer_result(
timeout_result, function_call_params, properties
)
except Exception as e:
logger.error(
f"Transfer call tool '{function_name}' execution failed: {e}"
)
self._engine.set_mute_pipeline(False)
# Handle generic exception with user-friendly message
exception_result = {
"status": "failed",
"message": "I'm sorry, but something went wrong while trying to transfer your call. Please try again later or contact support if the problem persists.",
"action": "transfer_failed",
"reason": "execution_error",
"end_call": True,
}
await self._handle_transfer_result(
exception_result, function_call_params, properties
)
return transfer_call_handler
async def _handle_transfer_result(
self, result: dict, function_call_params, properties
):
"""Handle different transfer call outcomes and take appropriate action."""
action = result.get("action", "")
status = result.get("status", "")
logger.info(f"Handling transfer result: action={action}, status={status}")
if action == "transfer_success":
# Successful transfer - add original caller to conference and end pipeline
conference_id = result.get("conference_id")
original_call_sid = result.get("original_call_sid")
transfer_call_sid = result.get("transfer_call_sid")
logger.info(
f"Transfer successful! Conference: {conference_id}, Original: {original_call_sid}, Transfer: {transfer_call_sid}"
)
# Inform LLM of success and end the call with Transfer call reason
response_properties = FunctionCallResultProperties(run_llm=False)
await function_call_params.result_callback(
{
"status": "transfer_success",
"message": "Transfer successful - connecting to conference",
"conference_id": conference_id,
},
properties=response_properties,
)
await self._engine.end_call_with_reason(
EndTaskReason.TRANSFER_CALL.value, abort_immediately=False
)
elif action == "transfer_failed":
# Transfer failed - inform user via LLM and then end the call
reason = result.get("reason", "unknown")
logger.info(f"Transfer failed ({reason}), informing user")
await function_call_params.result_callback(
{
"status": "transfer_failed",
"reason": reason,
"message": "Transfer failed",
}
)
else:
# Unknown action, treat as generic success
logger.warning(f"Unknown transfer action: {action}, treating as success")
await function_call_params.result_callback(result)
async def play_hold_music_loop(
self, stop_event: asyncio.Event, sample_rate: int = 8000
):
"""Play hold music in a loop until stop event is triggered.
Args:
stop_event: Event to stop the hold music loop
sample_rate: Sample rate for the hold music (default 8000Hz for Twilio)
"""
try:
# Path to hold music file based on sample rate
hold_music_file = (
APP_ROOT_DIR / "assets" / f"transfer_hold_ring_{sample_rate}.wav"
)
hold_audio_data = load_hold_audio(hold_music_file, sample_rate)
num_samples = len(hold_audio_data) // 2
duration = int(num_samples / sample_rate)
logger.info(f"Starting hold music loop with file: {hold_music_file}")
while not stop_event.is_set():
# Queue the hold audio frame
frame = OutputAudioRawFrame(
audio=hold_audio_data,
sample_rate=sample_rate,
num_channels=1,
)
await self._engine.task.queue_frame(frame)
# Wait for the audio to play or until stopped
try:
await asyncio.wait_for(stop_event.wait(), timeout=duration + 1.5)
break # Stop event was set
except asyncio.TimeoutError:
pass # Continue looping
logger.info("Hold music loop stopped")
except Exception as e:
logger.error(f"Error in hold music loop: {e}")