diff --git a/api/schemas/tool.py b/api/schemas/tool.py index 6767e28e..1fa55c69 100644 --- a/api/schemas/tool.py +++ b/api/schemas/tool.py @@ -8,7 +8,6 @@ when the same schema is surfaced through MCP or SDK authoring flows. from __future__ import annotations -import re from datetime import datetime from typing import Annotated, Any, Dict, List, Literal, Optional, Union @@ -186,8 +185,8 @@ class TransferCallConfig(BaseModel): destination: str = Field( description=( - "Phone number or SIP endpoint to transfer the call to, e.g. " - "+1234567890 or PJSIP/1234." + "Phone number, SIP endpoint, or template to transfer the call to, e.g. " + "+1234567890, PJSIP/1234, or {{initial_context.transfer_destination}}." ) ) messageType: Literal["none", "custom", "audio"] = Field( @@ -206,27 +205,6 @@ class TransferCallConfig(BaseModel): description="Maximum seconds to wait for the destination to answer.", ) - @field_validator("destination") - @classmethod - def validate_destination(cls, v: str) -> str: - """Validate that destination is a valid E.164 phone number or SIP endpoint.""" - if not v.strip(): - return v - - e164_pattern = r"^\+[1-9]\d{1,14}$" - sip_pattern = r"^(PJSIP|SIP)/[\w\-\.@]+$" - - is_valid_e164 = re.match(e164_pattern, v) - is_valid_sip = re.match(sip_pattern, v, re.IGNORECASE) - - if not (is_valid_e164 or is_valid_sip): - raise ValueError( - "Destination must be a valid E.164 phone number " - "(e.g., +1234567890) or SIP endpoint (e.g., PJSIP/1234)" - ) - return v - - class McpToolConfig(BaseModel): """Configuration for a customer MCP server tool definition.""" diff --git a/api/services/integrations/registry.py b/api/services/integrations/registry.py index 3c9a5f1b..409fcea9 100644 --- a/api/services/integrations/registry.py +++ b/api/services/integrations/registry.py @@ -131,7 +131,9 @@ async def run_completion_handlers( f"Integration completion handler failed for package " f"{package.name!r}: {exc}" ) - results[f"integration_{package.name}"] = {"error": "completion_handler_failed"} + results[f"integration_{package.name}"] = { + "error": "completion_handler_failed" + } continue if package_result: results.update(package_result) diff --git a/api/services/workflow/pipecat_engine_custom_tools.py b/api/services/workflow/pipecat_engine_custom_tools.py index 25298d73..cb72ac46 100644 --- a/api/services/workflow/pipecat_engine_custom_tools.py +++ b/api/services/workflow/pipecat_engine_custom_tools.py @@ -7,7 +7,6 @@ during workflow execution. from __future__ import annotations import asyncio -import re import time import uuid from typing import TYPE_CHECKING, Any, Dict, List, Optional @@ -32,12 +31,32 @@ from api.services.workflow.tools.custom_tool import ( execute_http_tool, tool_to_function_schema, ) +from api.utils.template_renderer import render_template if TYPE_CHECKING: from api.services.workflow.mcp_tool_session import McpToolSession from api.services.workflow.pipecat_engine import PipecatEngine +def _render_transfer_destination( + destination_template: Any, + call_context_vars: Optional[Dict[str, Any]], + gathered_context_vars: Optional[Dict[str, Any]], +) -> str: + """Resolve a transfer destination template into a concrete provider target.""" + + initial_context = dict(call_context_vars or {}) + render_context: Dict[str, Any] = { + **initial_context, + "initial_context": initial_context, + "gathered_context": dict(gathered_context_vars or {}), + } + rendered = render_template(destination_template, render_context) + if rendered is None: + return "" + return str(rendered).strip() + + def get_function_schema( function_name: str, description: str, @@ -541,6 +560,12 @@ class CustomToolManager: ) return + destination = _render_transfer_destination( + destination, + self._engine._call_context_vars, + self._engine._gathered_context, + ) + # Validate destination phone number if not destination or not destination.strip(): validation_error_result = { @@ -554,41 +579,6 @@ class CustomToolManager: ) return - # Validate destination format based on workflow run mode - if workflow_run.mode == WorkflowRunMode.ARI.value: - # For ARI provider, also accept SIP endpoints - SIP_ENDPOINT_REGEX = r"^(PJSIP|SIP)\/[\w\-\.@]+$" - E164_PHONE_REGEX = r"^\+[1-9]\d{1,14}$" - - is_valid_sip = re.match(SIP_ENDPOINT_REGEX, destination) - is_valid_e164 = re.match(E164_PHONE_REGEX, destination) - - if not (is_valid_sip or is_valid_e164): - validation_error_result = { - "status": "failed", - "message": "I'm sorry, but the transfer destination appears to be invalid. Please contact support to verify the transfer settings.", - "action": "transfer_failed", - "reason": "invalid_destination", - } - await self._handle_transfer_result( - validation_error_result, function_call_params, properties - ) - return - else: - # For non-ARI providers (Twilio, etc), use E.164 validation - E164_PHONE_REGEX = r"^\+[1-9]\d{1,14}$" - if not re.match(E164_PHONE_REGEX, destination): - validation_error_result = { - "status": "failed", - "message": "I'm sorry, but the transfer phone number appears to be invalid. Please contact support to verify the transfer settings.", - "action": "transfer_failed", - "reason": "invalid_destination", - } - await self._handle_transfer_result( - validation_error_result, function_call_params, properties - ) - return - played = await self._play_config_message(config) if played: self._engine._queued_speech_mute_state = "waiting" @@ -648,12 +638,27 @@ class CustomToolManager: self._engine.set_mute_pipeline(True) # Initiate transfer via provider with inline TwiML - transfer_result = await provider.transfer_call( - destination=destination, - transfer_id=transfer_id, - conference_name=conference_name, - timeout=timeout_seconds, - ) + try: + transfer_result = await provider.transfer_call( + destination=destination, + transfer_id=transfer_id, + conference_name=conference_name, + timeout=timeout_seconds, + ) + except Exception as e: + logger.error(f"Transfer provider failed: {e}") + self._engine.set_mute_pipeline(False) + await call_transfer_manager.remove_transfer_context(transfer_id) + provider_error_result = { + "status": "failed", + "message": f"Transfer provider failed: {e}", + "action": "transfer_failed", + "reason": "provider_error", + } + await self._handle_transfer_result( + provider_error_result, function_call_params, properties + ) + return call_sid = transfer_result.get("call_sid") logger.info(f"Transfer call initiated successfully: {call_sid}") @@ -809,7 +814,7 @@ class CustomToolManager: { "status": "transfer_failed", "reason": reason, - "message": "Transfer failed", + "message": result.get("message") or "Transfer failed", } ) else: diff --git a/api/tests/test_custom_tools.py b/api/tests/test_custom_tools.py index c066528a..e3523edc 100644 --- a/api/tests/test_custom_tools.py +++ b/api/tests/test_custom_tools.py @@ -8,6 +8,7 @@ This module tests: """ from dataclasses import dataclass +from types import SimpleNamespace from typing import Any, Dict from unittest.mock import AsyncMock, Mock, patch @@ -27,6 +28,7 @@ from pipecat.pipeline.pipeline import Pipeline from pipecat.processors.aggregators.llm_context import LLMContext from pipecat.services.llm_service import FunctionCallParams +from api.enums import WorkflowRunMode from api.services.workflow.pipecat_engine_custom_tools import get_function_schema from api.services.workflow.tools.custom_tool import ( _coerce_parameter_value, @@ -1157,6 +1159,186 @@ class TestCustomToolManagerUnit: # Verify result was returned assert result_received["status"] == "success" + @pytest.mark.asyncio + async def test_transfer_call_renders_destination_from_initial_context(self): + """Transfer call tools resolve destination templates before provider calls.""" + from api.services.workflow.pipecat_engine_custom_tools import CustomToolManager + + mock_engine = Mock() + mock_engine._workflow_run_id = 1 + mock_engine._call_context_vars = { + "transfer_destination": "+14155550123", + } + mock_engine._gathered_context = {} + mock_engine._fetch_recording_audio = None + mock_engine._audio_config = SimpleNamespace(transport_out_sample_rate=8000) + mock_engine._transport_output = SimpleNamespace(queue_frame=AsyncMock()) + mock_engine._get_organization_id = AsyncMock(return_value=1) + mock_engine.set_mute_pipeline = Mock() + mock_engine.end_call_with_reason = AsyncMock() + + manager = CustomToolManager(mock_engine) + tool = MockToolModel( + tool_uuid="transfer-tool-uuid", + name="Transfer Call", + description="Transfer the caller", + category="transfer_call", + definition={ + "schema_version": 1, + "type": "transfer_call", + "config": { + "destination": "{{initial_context.transfer_destination}}", + "timeout": 30, + }, + }, + ) + handler, _timeout_secs = manager._create_handler(tool, "transfer_call") + + workflow_run = SimpleNamespace( + mode=WorkflowRunMode.TWILIO.value, + gathered_context={"call_id": "caller-call-sid"}, + ) + provider = Mock() + provider.supports_transfers.return_value = True + provider.validate_config.return_value = True + provider.transfer_call = AsyncMock(return_value={"call_sid": "dest-call-sid"}) + + transfer_event = Mock() + transfer_event.to_result_dict.return_value = { + "status": "failed", + "action": "transfer_failed", + "reason": "test_complete", + } + transfer_manager = Mock() + transfer_manager.store_transfer_context = AsyncMock() + transfer_manager.wait_for_transfer_completion = AsyncMock( + return_value=transfer_event + ) + + result_received = None + + async def mock_result_callback(result, properties=None): + nonlocal result_received + result_received = result + + mock_params = Mock() + mock_params.arguments = {} + mock_params.result_callback = mock_result_callback + + with ( + patch( + "api.services.workflow.pipecat_engine_custom_tools.db_client.get_workflow_run_by_id", + new=AsyncMock(return_value=workflow_run), + ), + patch( + "api.services.workflow.pipecat_engine_custom_tools.get_telephony_provider_for_run", + new=AsyncMock(return_value=provider), + ), + patch( + "api.services.workflow.pipecat_engine_custom_tools.get_call_transfer_manager", + new=AsyncMock(return_value=transfer_manager), + ), + patch( + "api.services.workflow.pipecat_engine_custom_tools.play_audio_loop", + new=AsyncMock(return_value=None), + ), + ): + await handler(mock_params) + + provider.transfer_call.assert_awaited_once() + assert provider.transfer_call.await_args.kwargs["destination"] == "+14155550123" + first_context = transfer_manager.store_transfer_context.await_args_list[0].args[ + 0 + ] + assert first_context.target_number == "+14155550123" + assert result_received["status"] == "transfer_failed" + + @pytest.mark.asyncio + async def test_transfer_call_propagates_provider_destination_error(self): + """Provider-specific destination failures are returned through the tool result.""" + from api.services.workflow.pipecat_engine_custom_tools import CustomToolManager + + mock_engine = Mock() + mock_engine._workflow_run_id = 1 + mock_engine._call_context_vars = {} + mock_engine._gathered_context = {} + mock_engine._fetch_recording_audio = None + mock_engine._audio_config = SimpleNamespace(transport_out_sample_rate=8000) + mock_engine._transport_output = SimpleNamespace(queue_frame=AsyncMock()) + mock_engine._get_organization_id = AsyncMock(return_value=1) + mock_engine.set_mute_pipeline = Mock() + mock_engine.end_call_with_reason = AsyncMock() + + manager = CustomToolManager(mock_engine) + tool = MockToolModel( + tool_uuid="transfer-tool-uuid", + name="Transfer Call", + description="Transfer the caller", + category="transfer_call", + definition={ + "schema_version": 1, + "type": "transfer_call", + "config": { + "destination": "provider-specific-destination", + "timeout": 30, + }, + }, + ) + handler, _timeout_secs = manager._create_handler(tool, "transfer_call") + + workflow_run = SimpleNamespace( + mode=WorkflowRunMode.TWILIO.value, + gathered_context={"call_id": "caller-call-sid"}, + ) + provider = Mock() + provider.supports_transfers.return_value = True + provider.validate_config.return_value = True + provider.transfer_call = AsyncMock( + side_effect=Exception("provider rejected destination") + ) + + transfer_manager = Mock() + transfer_manager.store_transfer_context = AsyncMock() + transfer_manager.remove_transfer_context = AsyncMock() + + result_received = None + + async def mock_result_callback(result, properties=None): + nonlocal result_received + result_received = result + + mock_params = Mock() + mock_params.arguments = {} + mock_params.result_callback = mock_result_callback + + with ( + patch( + "api.services.workflow.pipecat_engine_custom_tools.db_client.get_workflow_run_by_id", + new=AsyncMock(return_value=workflow_run), + ), + patch( + "api.services.workflow.pipecat_engine_custom_tools.get_telephony_provider_for_run", + new=AsyncMock(return_value=provider), + ), + patch( + "api.services.workflow.pipecat_engine_custom_tools.get_call_transfer_manager", + new=AsyncMock(return_value=transfer_manager), + ), + ): + await handler(mock_params) + + provider.transfer_call.assert_awaited_once() + assert ( + provider.transfer_call.await_args.kwargs["destination"] + == "provider-specific-destination" + ) + transfer_manager.remove_transfer_context.assert_awaited_once() + assert result_received == { + "status": "transfer_failed", + "reason": "provider_error", + "message": "Transfer provider failed: provider rejected destination", + } + def _update_llm_context(context, system_message, functions): """Inline helper replicating the old update_llm_context for tests.""" diff --git a/api/tests/test_tool_schema.py b/api/tests/test_tool_schema.py new file mode 100644 index 00000000..61afc227 --- /dev/null +++ b/api/tests/test_tool_schema.py @@ -0,0 +1,15 @@ +from api.schemas.tool import TransferCallConfig + + +def test_transfer_call_destination_accepts_initial_context_template(): + config = TransferCallConfig( + destination="{{initial_context.transfer_destination}}", + ) + + assert config.destination == "{{initial_context.transfer_destination}}" + + +def test_transfer_call_destination_accepts_provider_specific_literal(): + config = TransferCallConfig(destination="provider-specific-destination") + + assert config.destination == "provider-specific-destination" diff --git a/ui/src/app/telephony-configurations/page.tsx b/ui/src/app/telephony-configurations/page.tsx index ca99468e..e725999a 100644 --- a/ui/src/app/telephony-configurations/page.tsx +++ b/ui/src/app/telephony-configurations/page.tsx @@ -238,7 +238,7 @@ export default function TelephonyConfigurationsPage() {
Enter one of these destination formats:
+