feat: add template variable rendering for transfer call destination

This commit is contained in:
Abhishek Kumar 2026-07-02 13:35:25 +05:30
parent 65d46bc313
commit 9966940624
8 changed files with 279 additions and 142 deletions

View file

@ -8,7 +8,6 @@ when the same schema is surfaced through MCP or SDK authoring flows.
from __future__ import annotations
import re
from datetime import datetime
from typing import Annotated, Any, Dict, List, Literal, Optional, Union
@ -186,8 +185,8 @@ class TransferCallConfig(BaseModel):
destination: str = Field(
description=(
"Phone number or SIP endpoint to transfer the call to, e.g. "
"+1234567890 or PJSIP/1234."
"Phone number, SIP endpoint, or template to transfer the call to, e.g. "
"+1234567890, PJSIP/1234, or {{initial_context.transfer_destination}}."
)
)
messageType: Literal["none", "custom", "audio"] = Field(
@ -206,27 +205,6 @@ class TransferCallConfig(BaseModel):
description="Maximum seconds to wait for the destination to answer.",
)
@field_validator("destination")
@classmethod
def validate_destination(cls, v: str) -> str:
"""Validate that destination is a valid E.164 phone number or SIP endpoint."""
if not v.strip():
return v
e164_pattern = r"^\+[1-9]\d{1,14}$"
sip_pattern = r"^(PJSIP|SIP)/[\w\-\.@]+$"
is_valid_e164 = re.match(e164_pattern, v)
is_valid_sip = re.match(sip_pattern, v, re.IGNORECASE)
if not (is_valid_e164 or is_valid_sip):
raise ValueError(
"Destination must be a valid E.164 phone number "
"(e.g., +1234567890) or SIP endpoint (e.g., PJSIP/1234)"
)
return v
class McpToolConfig(BaseModel):
"""Configuration for a customer MCP server tool definition."""

View file

@ -131,7 +131,9 @@ async def run_completion_handlers(
f"Integration completion handler failed for package "
f"{package.name!r}: {exc}"
)
results[f"integration_{package.name}"] = {"error": "completion_handler_failed"}
results[f"integration_{package.name}"] = {
"error": "completion_handler_failed"
}
continue
if package_result:
results.update(package_result)

View file

@ -7,7 +7,6 @@ during workflow execution.
from __future__ import annotations
import asyncio
import re
import time
import uuid
from typing import TYPE_CHECKING, Any, Dict, List, Optional
@ -32,12 +31,32 @@ from api.services.workflow.tools.custom_tool import (
execute_http_tool,
tool_to_function_schema,
)
from api.utils.template_renderer import render_template
if TYPE_CHECKING:
from api.services.workflow.mcp_tool_session import McpToolSession
from api.services.workflow.pipecat_engine import PipecatEngine
def _render_transfer_destination(
destination_template: Any,
call_context_vars: Optional[Dict[str, Any]],
gathered_context_vars: Optional[Dict[str, Any]],
) -> str:
"""Resolve a transfer destination template into a concrete provider target."""
initial_context = dict(call_context_vars or {})
render_context: Dict[str, Any] = {
**initial_context,
"initial_context": initial_context,
"gathered_context": dict(gathered_context_vars or {}),
}
rendered = render_template(destination_template, render_context)
if rendered is None:
return ""
return str(rendered).strip()
def get_function_schema(
function_name: str,
description: str,
@ -541,6 +560,12 @@ class CustomToolManager:
)
return
destination = _render_transfer_destination(
destination,
self._engine._call_context_vars,
self._engine._gathered_context,
)
# Validate destination phone number
if not destination or not destination.strip():
validation_error_result = {
@ -554,41 +579,6 @@ class CustomToolManager:
)
return
# Validate destination format based on workflow run mode
if workflow_run.mode == WorkflowRunMode.ARI.value:
# For ARI provider, also accept SIP endpoints
SIP_ENDPOINT_REGEX = r"^(PJSIP|SIP)\/[\w\-\.@]+$"
E164_PHONE_REGEX = r"^\+[1-9]\d{1,14}$"
is_valid_sip = re.match(SIP_ENDPOINT_REGEX, destination)
is_valid_e164 = re.match(E164_PHONE_REGEX, destination)
if not (is_valid_sip or is_valid_e164):
validation_error_result = {
"status": "failed",
"message": "I'm sorry, but the transfer destination appears to be invalid. Please contact support to verify the transfer settings.",
"action": "transfer_failed",
"reason": "invalid_destination",
}
await self._handle_transfer_result(
validation_error_result, function_call_params, properties
)
return
else:
# For non-ARI providers (Twilio, etc), use E.164 validation
E164_PHONE_REGEX = r"^\+[1-9]\d{1,14}$"
if not re.match(E164_PHONE_REGEX, destination):
validation_error_result = {
"status": "failed",
"message": "I'm sorry, but the transfer phone number appears to be invalid. Please contact support to verify the transfer settings.",
"action": "transfer_failed",
"reason": "invalid_destination",
}
await self._handle_transfer_result(
validation_error_result, function_call_params, properties
)
return
played = await self._play_config_message(config)
if played:
self._engine._queued_speech_mute_state = "waiting"
@ -648,12 +638,27 @@ class CustomToolManager:
self._engine.set_mute_pipeline(True)
# Initiate transfer via provider with inline TwiML
transfer_result = await provider.transfer_call(
destination=destination,
transfer_id=transfer_id,
conference_name=conference_name,
timeout=timeout_seconds,
)
try:
transfer_result = await provider.transfer_call(
destination=destination,
transfer_id=transfer_id,
conference_name=conference_name,
timeout=timeout_seconds,
)
except Exception as e:
logger.error(f"Transfer provider failed: {e}")
self._engine.set_mute_pipeline(False)
await call_transfer_manager.remove_transfer_context(transfer_id)
provider_error_result = {
"status": "failed",
"message": f"Transfer provider failed: {e}",
"action": "transfer_failed",
"reason": "provider_error",
}
await self._handle_transfer_result(
provider_error_result, function_call_params, properties
)
return
call_sid = transfer_result.get("call_sid")
logger.info(f"Transfer call initiated successfully: {call_sid}")
@ -809,7 +814,7 @@ class CustomToolManager:
{
"status": "transfer_failed",
"reason": reason,
"message": "Transfer failed",
"message": result.get("message") or "Transfer failed",
}
)
else:

View file

@ -8,6 +8,7 @@ This module tests:
"""
from dataclasses import dataclass
from types import SimpleNamespace
from typing import Any, Dict
from unittest.mock import AsyncMock, Mock, patch
@ -27,6 +28,7 @@ from pipecat.pipeline.pipeline import Pipeline
from pipecat.processors.aggregators.llm_context import LLMContext
from pipecat.services.llm_service import FunctionCallParams
from api.enums import WorkflowRunMode
from api.services.workflow.pipecat_engine_custom_tools import get_function_schema
from api.services.workflow.tools.custom_tool import (
_coerce_parameter_value,
@ -1157,6 +1159,186 @@ class TestCustomToolManagerUnit:
# Verify result was returned
assert result_received["status"] == "success"
@pytest.mark.asyncio
async def test_transfer_call_renders_destination_from_initial_context(self):
"""Transfer call tools resolve destination templates before provider calls."""
from api.services.workflow.pipecat_engine_custom_tools import CustomToolManager
mock_engine = Mock()
mock_engine._workflow_run_id = 1
mock_engine._call_context_vars = {
"transfer_destination": "+14155550123",
}
mock_engine._gathered_context = {}
mock_engine._fetch_recording_audio = None
mock_engine._audio_config = SimpleNamespace(transport_out_sample_rate=8000)
mock_engine._transport_output = SimpleNamespace(queue_frame=AsyncMock())
mock_engine._get_organization_id = AsyncMock(return_value=1)
mock_engine.set_mute_pipeline = Mock()
mock_engine.end_call_with_reason = AsyncMock()
manager = CustomToolManager(mock_engine)
tool = MockToolModel(
tool_uuid="transfer-tool-uuid",
name="Transfer Call",
description="Transfer the caller",
category="transfer_call",
definition={
"schema_version": 1,
"type": "transfer_call",
"config": {
"destination": "{{initial_context.transfer_destination}}",
"timeout": 30,
},
},
)
handler, _timeout_secs = manager._create_handler(tool, "transfer_call")
workflow_run = SimpleNamespace(
mode=WorkflowRunMode.TWILIO.value,
gathered_context={"call_id": "caller-call-sid"},
)
provider = Mock()
provider.supports_transfers.return_value = True
provider.validate_config.return_value = True
provider.transfer_call = AsyncMock(return_value={"call_sid": "dest-call-sid"})
transfer_event = Mock()
transfer_event.to_result_dict.return_value = {
"status": "failed",
"action": "transfer_failed",
"reason": "test_complete",
}
transfer_manager = Mock()
transfer_manager.store_transfer_context = AsyncMock()
transfer_manager.wait_for_transfer_completion = AsyncMock(
return_value=transfer_event
)
result_received = None
async def mock_result_callback(result, properties=None):
nonlocal result_received
result_received = result
mock_params = Mock()
mock_params.arguments = {}
mock_params.result_callback = mock_result_callback
with (
patch(
"api.services.workflow.pipecat_engine_custom_tools.db_client.get_workflow_run_by_id",
new=AsyncMock(return_value=workflow_run),
),
patch(
"api.services.workflow.pipecat_engine_custom_tools.get_telephony_provider_for_run",
new=AsyncMock(return_value=provider),
),
patch(
"api.services.workflow.pipecat_engine_custom_tools.get_call_transfer_manager",
new=AsyncMock(return_value=transfer_manager),
),
patch(
"api.services.workflow.pipecat_engine_custom_tools.play_audio_loop",
new=AsyncMock(return_value=None),
),
):
await handler(mock_params)
provider.transfer_call.assert_awaited_once()
assert provider.transfer_call.await_args.kwargs["destination"] == "+14155550123"
first_context = transfer_manager.store_transfer_context.await_args_list[0].args[
0
]
assert first_context.target_number == "+14155550123"
assert result_received["status"] == "transfer_failed"
@pytest.mark.asyncio
async def test_transfer_call_propagates_provider_destination_error(self):
"""Provider-specific destination failures are returned through the tool result."""
from api.services.workflow.pipecat_engine_custom_tools import CustomToolManager
mock_engine = Mock()
mock_engine._workflow_run_id = 1
mock_engine._call_context_vars = {}
mock_engine._gathered_context = {}
mock_engine._fetch_recording_audio = None
mock_engine._audio_config = SimpleNamespace(transport_out_sample_rate=8000)
mock_engine._transport_output = SimpleNamespace(queue_frame=AsyncMock())
mock_engine._get_organization_id = AsyncMock(return_value=1)
mock_engine.set_mute_pipeline = Mock()
mock_engine.end_call_with_reason = AsyncMock()
manager = CustomToolManager(mock_engine)
tool = MockToolModel(
tool_uuid="transfer-tool-uuid",
name="Transfer Call",
description="Transfer the caller",
category="transfer_call",
definition={
"schema_version": 1,
"type": "transfer_call",
"config": {
"destination": "provider-specific-destination",
"timeout": 30,
},
},
)
handler, _timeout_secs = manager._create_handler(tool, "transfer_call")
workflow_run = SimpleNamespace(
mode=WorkflowRunMode.TWILIO.value,
gathered_context={"call_id": "caller-call-sid"},
)
provider = Mock()
provider.supports_transfers.return_value = True
provider.validate_config.return_value = True
provider.transfer_call = AsyncMock(
side_effect=Exception("provider rejected destination")
)
transfer_manager = Mock()
transfer_manager.store_transfer_context = AsyncMock()
transfer_manager.remove_transfer_context = AsyncMock()
result_received = None
async def mock_result_callback(result, properties=None):
nonlocal result_received
result_received = result
mock_params = Mock()
mock_params.arguments = {}
mock_params.result_callback = mock_result_callback
with (
patch(
"api.services.workflow.pipecat_engine_custom_tools.db_client.get_workflow_run_by_id",
new=AsyncMock(return_value=workflow_run),
),
patch(
"api.services.workflow.pipecat_engine_custom_tools.get_telephony_provider_for_run",
new=AsyncMock(return_value=provider),
),
patch(
"api.services.workflow.pipecat_engine_custom_tools.get_call_transfer_manager",
new=AsyncMock(return_value=transfer_manager),
),
):
await handler(mock_params)
provider.transfer_call.assert_awaited_once()
assert (
provider.transfer_call.await_args.kwargs["destination"]
== "provider-specific-destination"
)
transfer_manager.remove_transfer_context.assert_awaited_once()
assert result_received == {
"status": "transfer_failed",
"reason": "provider_error",
"message": "Transfer provider failed: provider rejected destination",
}
def _update_llm_context(context, system_message, functions):
"""Inline helper replicating the old update_llm_context for tests."""

View file

@ -0,0 +1,15 @@
from api.schemas.tool import TransferCallConfig
def test_transfer_call_destination_accepts_initial_context_template():
config = TransferCallConfig(
destination="{{initial_context.transfer_destination}}",
)
assert config.destination == "{{initial_context.transfer_destination}}"
def test_transfer_call_destination_accepts_provider_specific_literal():
config = TransferCallConfig(destination="provider-specific-destination")
assert config.destination == "provider-specific-destination"