dograh/api/services/workflow/pipecat_engine_custom_tools.py
2026-02-06 09:45:34 +05:30

400 lines
15 KiB
Python

"""Custom tool management for PipecatEngine.
This module handles fetching, registering, and executing user-defined tools
during workflow execution.
"""
from __future__ import annotations
import asyncio
from typing import TYPE_CHECKING, Any, Optional
from loguru import logger
from api.db import db_client
from api.enums import ToolCategory
from api.services.workflow.disposition_mapper import (
get_organization_id_from_workflow_run,
)
from api.services.workflow.pipecat_engine_utils import get_function_schema
from api.services.workflow.tools.custom_tool import (
execute_http_tool,
tool_to_function_schema,
)
from api.services.workflow.transfer_event_protocol import (
TransferEventType,
wait_for_transfer_signal,
)
from api.utils.hold_audio import get_hold_audio_duration_ms, load_hold_audio
from pipecat.adapters.schemas.function_schema import FunctionSchema
from pipecat.frames.frames import (
FunctionCallResultProperties,
OutputAudioRawFrame,
TTSSpeakFrame,
)
from pipecat.services.llm_service import FunctionCallParams
from pipecat.utils.enums import EndTaskReason
if TYPE_CHECKING:
from api.services.workflow.pipecat_engine import PipecatEngine
class CustomToolManager:
"""Manager for custom tool registration and execution.
This class handles:
1. Fetching tools from the database based on tool UUIDs
2. Converting tools to LLM function schemas
3. Registering tool execution handlers with the LLM
4. Executing tools when invoked by the LLM
"""
def __init__(self, engine: "PipecatEngine") -> None:
self._engine = engine
self._organization_id: Optional[int] = None
async def get_organization_id(self) -> Optional[int]:
"""Get and cache the organization ID from workflow run."""
if self._organization_id is None:
self._organization_id = await get_organization_id_from_workflow_run(
self._engine._workflow_run_id
)
return self._organization_id
async def get_tool_schemas(self, tool_uuids: list[str]) -> list[FunctionSchema]:
"""Fetch custom tools and convert them to function schemas.
Args:
tool_uuids: List of tool UUIDs to fetch
Returns:
List of FunctionSchema objects for LLM
"""
organization_id = await self.get_organization_id()
if not organization_id:
logger.warning("Cannot fetch custom tools: organization_id not available")
return []
try:
tools = await db_client.get_tools_by_uuids(tool_uuids, organization_id)
schemas: list[FunctionSchema] = []
for tool in tools:
raw_schema = tool_to_function_schema(tool)
function_name = raw_schema["function"]["name"]
# Convert to FunctionSchema object for compatibility with update_llm_context
func_schema = get_function_schema(
function_name,
raw_schema["function"]["description"],
properties=raw_schema["function"]["parameters"].get(
"properties", {}
),
required=raw_schema["function"]["parameters"].get("required", []),
)
schemas.append(func_schema)
logger.debug(
f"Loaded {len(schemas)} custom tools for node: "
f"{[s.name for s in schemas]}"
)
return schemas
except Exception as e:
logger.error(f"Failed to fetch custom tools: {e}")
return []
async def register_handlers(self, tool_uuids: list[str]) -> None:
"""Register custom tool execution handlers with the LLM.
Args:
tool_uuids: List of tool UUIDs to register handlers for
"""
organization_id = await self.get_organization_id()
if not organization_id:
logger.warning(
"Cannot register custom tool handlers: organization_id not available"
)
return
try:
tools = await db_client.get_tools_by_uuids(tool_uuids, organization_id)
for tool in tools:
schema = tool_to_function_schema(tool)
function_name = schema["function"]["name"]
# Create and register the handler
handler = self._create_handler(tool, function_name)
self._engine.llm.register_function(function_name, handler)
logger.debug(
f"Registered custom tool handler: {function_name} "
f"(tool_uuid: {tool.tool_uuid})"
)
except Exception as e:
logger.error(f"Failed to register custom tool handlers: {e}")
def _create_handler(self, tool: Any, function_name: str):
"""Create a handler function for a tool based on its category.
Args:
tool: The ToolModel instance
function_name: The function name used by the LLM
Returns:
Async handler function for the tool
"""
if tool.category == ToolCategory.END_CALL.value:
return self._create_end_call_handler(tool, function_name)
if tool.category == ToolCategory.TRANSFER_CALL.value:
return self._create_transfer_call_handler(tool, function_name)
return self._create_http_tool_handler(tool, function_name)
def _create_http_tool_handler(self, tool: Any, function_name: str):
"""Create a handler function for an HTTP API tool.
Args:
tool: The ToolModel instance
function_name: The function name used by the LLM
Returns:
Async handler function for the HTTP API tool
"""
async def http_tool_handler(
function_call_params: FunctionCallParams,
) -> None:
logger.info(f"HTTP Tool EXECUTED: {function_name}")
logger.info(f"Arguments: {function_call_params.arguments}")
try:
result = await execute_http_tool(
tool=tool,
arguments=function_call_params.arguments,
call_context_vars=self._engine._call_context_vars,
organization_id=self._organization_id,
)
await function_call_params.result_callback(result)
except Exception as e:
logger.error(f"HTTP tool '{function_name}' execution failed: {e}")
await function_call_params.result_callback(
{"status": "error", "error": str(e)}
)
return http_tool_handler
def _create_end_call_handler(self, tool: Any, function_name: str):
"""Create a handler function for an end call tool.
Args:
tool: The ToolModel instance
function_name: The function name used by the LLM
Returns:
Async handler function for the end call tool
"""
# Don't run LLM after end call - we're terminating
properties = FunctionCallResultProperties(run_llm=False)
async def end_call_handler(
function_call_params: FunctionCallParams,
) -> None:
logger.info(f"End Call Tool EXECUTED: {function_name}")
try:
# Get the end call configuration
config = tool.definition.get("config", {})
message_type = config.get("messageType", "none")
custom_message = config.get("customMessage", "")
# Send result callback first
await function_call_params.result_callback(
{"status": "success", "action": "ending_call"},
properties=properties,
)
if message_type == "custom" and custom_message:
# Queue the custom message to be spoken
logger.info(f"Playing custom goodbye message: {custom_message}")
await self._engine.task.queue_frame(TTSSpeakFrame(custom_message))
# End the call after the message (not immediately)
await self._engine.end_call_with_reason(
EndTaskReason.END_CALL_TOOL_REASON.value,
abort_immediately=False,
)
else:
# No message - end call immediately
logger.info("Ending call immediately (no goodbye message)")
await self._engine.end_call_with_reason(
EndTaskReason.END_CALL_TOOL_REASON.value, abort_immediately=True
)
except Exception as e:
logger.error(f"End call tool '{function_name}' execution failed: {e}")
# Still try to end the call even if there's an error
await self._engine.end_call_with_reason(
EndTaskReason.UNEXPECTED_ERROR.value, abort_immediately=True
)
return end_call_handler
def _create_transfer_call_handler(self, tool: Any, function_name: str):
"""Create a handler function for a transfer call tool.
Args:
tool: The ToolModel instance
function_name: The function name used by the LLM
Returns:
Async handler function for the transfer call tool
"""
async def play_hold_music_loop(stop_event: asyncio.Event) -> None:
"""Play hold music in a loop until stop_event is set."""
sample_rate = self._engine._audio_out_sample_rate
try:
hold_audio = load_hold_audio(sample_rate)
duration_ms = get_hold_audio_duration_ms(sample_rate)
duration_secs = duration_ms / 1000.0
logger.info(
f"Starting hold music loop at {sample_rate}Hz, "
f"duration={duration_secs:.2f}s per loop"
)
while not stop_event.is_set():
# Queue the hold audio frame
frame = OutputAudioRawFrame(
audio=hold_audio,
sample_rate=sample_rate,
num_channels=1,
)
await self._engine.task.queue_frame(frame)
# Wait for the audio to play or until stopped
try:
await asyncio.wait_for(
stop_event.wait(), timeout=duration_secs + 1.5
)
break # Stop event was set
except asyncio.TimeoutError:
pass # Continue looping
logger.info("Hold music loop stopped")
except Exception as e:
logger.error(f"Error playing hold music: {e}")
async def transfer_call_handler(
function_call_params: FunctionCallParams,
) -> None:
logger.info(f"Transfer Call Tool EXECUTED: {function_name}")
stop_hold_music = asyncio.Event()
hold_music_task: Optional[asyncio.Task] = None
try:
# Get the transfer call configuration
config = tool.definition.get("config", {})
transfer_number = config.get("transferNumber", "")
transfer_message = config.get("transferMessage", "")
if not transfer_number:
logger.error("Transfer number not configured")
await function_call_params.result_callback(
{"status": "error", "error": "Transfer number not configured"}
)
return
logger.info(f"Initiating transfer to: {transfer_number}")
# Mute pipeline before playing transfer message
self._engine.mute_pipeline()
# Play transfer message if configured
if transfer_message:
logger.info(f"Playing transfer message: {transfer_message}")
await self._engine.task.queue_frame(TTSSpeakFrame(transfer_message))
# Store transfer intent in gathered context
self._engine._gathered_context["transfer_requested"] = True
self._engine._gathered_context["transfer_number"] = transfer_number
# Start playing hold music in the background
hold_music_task = asyncio.create_task(
play_hold_music_loop(stop_hold_music)
)
# Wait for external signal to proceed with transfer (30s timeout)
workflow_run_id = self._engine._workflow_run_id
logger.info(
f"Waiting for transfer signal for workflow_run_id: {workflow_run_id}"
)
transfer_event = await wait_for_transfer_signal(
workflow_run_id=workflow_run_id,
timeout_seconds=8.0,
)
# Stop hold music
stop_hold_music.set()
if transfer_event is None:
# Timeout - transfer failed
logger.warning("Transfer signal timed out")
self._engine._gathered_context["transfer_status"] = "timed_out"
await function_call_params.result_callback(
{"status": "error", "error": "Transfer signal timed out"}
)
return
if transfer_event.type == TransferEventType.TRANSFER_CANCEL.value:
# Cancelled - transfer failed
logger.info("Transfer was cancelled")
self._engine._gathered_context["transfer_status"] = "cancelled"
await function_call_params.result_callback(
{"status": "error", "error": "Transfer was cancelled"}
)
return
# Success - proceed with transfer
logger.info("Transfer signal received - proceeding with transfer")
self._engine._gathered_context["transfer_status"] = "success"
# Lets send result callback so that timeout task is cancelled. Lets not
# run llm
await function_call_params.result_callback(
{"status": "error", "error": "Transfer was cancelled"},
properties=FunctionCallResultProperties(run_llm=False),
)
# Terminate the call after the call is added to the conference
await self._engine.end_call_with_reason(
EndTaskReason.CALL_TRANSFERRED.value,
abort_immediately=True,
)
except Exception as e:
logger.error(
f"Transfer call tool '{function_name}' execution failed: {e}"
)
await function_call_params.result_callback(
{"status": "error", "error": str(e)}
)
finally:
# Ensure hold music is stopped
stop_hold_music.set()
if hold_music_task and not hold_music_task.done():
hold_music_task.cancel()
try:
await hold_music_task
except asyncio.CancelledError:
pass
return transfer_call_handler