chore: refactor pipecat engine custom tools and other telephony services

This commit is contained in:
Sabiha Khan 2026-02-14 07:58:28 +05:30
parent 942a20bd14
commit c0cbc65de3
10 changed files with 17 additions and 176 deletions

View file

@ -9,7 +9,6 @@ import asyncio
from typing import Dict, Set
from loguru import logger
from pipecat.audio.utils import mix_audio
from pipecat.frames.frames import (
Frame,

View file

@ -81,14 +81,7 @@ def register_event_handlers(
async def on_client_disconnected(_transport, _participant):
call_disposed = engine.is_call_disposed()
transfer_in_progress = getattr(engine, '_transfer_in_progress', False)
logger.info(f"[TRANSFER-DEBUG] on_client_disconnected triggered")
logger.info(f"[TRANSFER-DEBUG] Engine instance ID in event_handler: {id(engine)}")
logger.info(f"[TRANSFER-DEBUG] Engine type in event_handler: {type(engine)}")
logger.info(f"[TRANSFER-DEBUG] transfer_in_progress attribute exists: {hasattr(engine, '_transfer_in_progress')}")
logger.info(f"[TRANSFER-DEBUG] transfer_in_progress value: {transfer_in_progress}")
logger.info(f"[TRANSFER-DEBUG] Call disposed: {call_disposed}")
logger.debug(
f"In on_client_disconnected callback handler. Call disposed: {call_disposed}, "
f"Transfer in progress: {transfer_in_progress}"
@ -102,7 +95,7 @@ def register_event_handlers(
logger.info("Transfer in progress - skipping auto hang-up, letting redirect handle call")
return
logger.info("Transfer in progress - False, proceeding with hang up")
# Normal hang-up logic for non-transfer disconnections
await engine.end_call_with_reason(
EndTaskReason.USER_HANGUP.value, abort_immediately=True
)

View file

@ -115,7 +115,6 @@ async def run_pipeline_twilio(
# Create audio configuration for Twilio
audio_config = create_audio_config(WorkflowRunMode.TWILIO.value)
transport = await create_twilio_transport(
websocket_client,
stream_sid,
@ -125,8 +124,7 @@ async def run_pipeline_twilio(
workflow.organization_id,
vad_config,
ambient_noise_config,
)
)
await _run_pipeline(
transport,
workflow_id,

View file

@ -1,7 +1,6 @@
"""Redis-based transfer coordination service for multi-instance scaling.
"""Redis-based transfer event coordination service
Handles transfer event publishing, subscription, and context storage
across multiple API server instances.
"""
import asyncio

View file

@ -694,7 +694,7 @@ class CloudonixProvider(TelephonyProvider):
Cloudonix provider does not support call transfers.
Raises:
NotImplementedError: Always, as Cloudonix doesn't support transfers
NotImplementedError: Cloudonix call transfers are yet to be implemented
"""
raise NotImplementedError("Cloudonix provider does not support call transfers")

View file

@ -547,7 +547,7 @@ class VobizProvider(TelephonyProvider):
Vobiz provider does not support call transfers.
Raises:
NotImplementedError: Always, as Vobiz doesn't support transfers
NotImplementedError: Vobiz call transfers are yet to be implemented
"""
raise NotImplementedError("Vobiz provider does not support call transfers")

View file

@ -498,7 +498,7 @@ class VonageProvider(TelephonyProvider):
Vonage provider does not support call transfers.
Raises:
NotImplementedError: Always, as Vonage doesn't support transfers
NotImplementedError: call transfers are yet to be implemented
"""
raise NotImplementedError("Vonage provider does not support call transfers")

View file

@ -1,4 +1,4 @@
"""Redis communication protocol for transfer coordination.
"""Redis communication protocol for call transfer coordination.
Defines event formats and Redis channels for coordinating call transfers
across multiple API server instances.

View file

@ -1,133 +0,0 @@
"""Test utilities for transfer coordination.
This module provides utilities to test Redis-based transfer coordination
across multiple instances.
"""
import asyncio
import time
import uuid
from typing import Optional
from loguru import logger
from api.services.telephony.call_transfer_manager import get_call_transfer_manager
from api.services.telephony.transfer_event_protocol import (
TransferContext,
TransferEvent,
TransferEventType
)
async def test_redis_coordination():
"""Test basic Redis pub/sub coordination for transfers."""
logger.info("Testing Redis-based transfer coordination...")
call_transfer_manager = await get_call_transfer_manager()
# Test 1: Store and retrieve transfer context
tool_call_id = str(uuid.uuid4())
test_context = TransferContext(
tool_call_id=tool_call_id,
call_sid="test_call_123",
target_number="+1234567890",
tool_uuid="test_tool_uuid",
original_call_sid="original_call_123",
caller_number="+0987654321",
initiated_at=time.time()
)
logger.info("Test 1: Storing transfer context...")
await call_transfer_manager.store_transfer_context(test_context)
logger.info("Test 1: Retrieving transfer context...")
retrieved_context = await call_transfer_manager.get_transfer_context(tool_call_id)
if retrieved_context and retrieved_context.tool_call_id == tool_call_id:
logger.info("✅ Test 1 PASSED: Context storage/retrieval works")
else:
logger.error("❌ Test 1 FAILED: Context storage/retrieval failed")
return False
# Test 2: Event publishing and waiting
logger.info("Test 2: Testing event publishing...")
# Start waiting for completion in background
async def wait_for_completion():
return await call_transfer_manager.wait_for_transfer_completion(tool_call_id, 5.0)
wait_task = asyncio.create_task(wait_for_completion())
# Give it a moment to start waiting
await asyncio.sleep(0.5)
# Publish completion event
test_event = TransferEvent(
type=TransferEventType.TRANSFER_COMPLETED,
tool_call_id=tool_call_id,
original_call_sid="original_call_123",
transfer_call_sid="transfer_call_456",
conference_name="test-conference",
message="Test transfer completed successfully",
status="success",
action="transfer_success"
)
logger.info("Test 2: Publishing completion event...")
await call_transfer_manager.publish_transfer_event(test_event)
# Wait for the completion
received_event = await wait_task
if received_event and received_event.tool_call_id == tool_call_id:
logger.info("✅ Test 2 PASSED: Event pub/sub works")
else:
logger.error("❌ Test 2 FAILED: Event pub/sub failed")
return False
# Test 3: Cleanup
logger.info("Test 3: Testing cleanup...")
await call_transfer_manager.remove_transfer_context(tool_call_id)
cleanup_context = await call_transfer_manager.get_transfer_context(tool_call_id)
if cleanup_context is None:
logger.info("✅ Test 3 PASSED: Cleanup works")
else:
logger.error("❌ Test 3 FAILED: Cleanup failed")
return False
logger.info("✅ All tests PASSED! Redis coordination is working correctly.")
return True
async def test_timeout_handling():
"""Test timeout handling in transfer coordination."""
logger.info("Testing timeout handling...")
call_transfer_manager = await get_call_transfer_manager()
tool_call_id = str(uuid.uuid4())
# Wait for completion with short timeout (should timeout)
start_time = time.time()
result = await call_transfer_manager.wait_for_transfer_completion(tool_call_id, 2.0)
elapsed = time.time() - start_time
if result is None and elapsed >= 2.0:
logger.info("✅ Timeout test PASSED: Properly timed out after 2 seconds")
return True
else:
logger.error(f"❌ Timeout test FAILED: Expected timeout, got {result} in {elapsed}s")
return False
if __name__ == "__main__":
async def main():
success1 = await test_redis_coordination()
success2 = await test_timeout_handling()
if success1 and success2:
logger.info("🎉 All transfer coordination tests PASSED!")
else:
logger.error("💥 Some tests FAILED!")
asyncio.run(main())

View file

@ -262,7 +262,7 @@ class CustomToolManager:
Returns:
Async handler function for the transfer call tool
"""
# Don't run LLM after starting transfer - we're handling async response
properties = FunctionCallResultProperties(run_llm=False)
async def transfer_call_handler(
@ -296,8 +296,8 @@ class CustomToolManager:
return
# Validate E.164 format
e164_pattern = r"^\+[1-9]\d{1,14}$"
if not re.match(e164_pattern, destination):
E164_PHONE_REGEX = r"^\+[1-9]\d{1,14}$"
if not re.match(E164_PHONE_REGEX, destination):
validation_error_result = {
"status": "failed",
"message": "I'm sorry, but the transfer phone number appears to be invalid. Please contact support to verify the transfer settings.",
@ -310,11 +310,6 @@ class CustomToolManager:
)
return
# Provider validation handled by telephony endpoint
# Note: User muting and hold music are handled automatically by
# Play pre-transfer message if configured
if message_type == "custom" and custom_message:
logger.info(f"Playing pre-transfer message: {custom_message}")
await self._engine.task.queue_frame(TTSSpeakFrame(custom_message))
@ -323,7 +318,7 @@ class CustomToolManager:
from pipecat.utils.run_context import get_current_call_sid
original_call_sid = get_current_call_sid()
caller_number = None # Skip caller number for now as requested
caller_number = None # TODO: check if this is redundant now
logger.info(f"Found original call context: call_id={original_call_sid}")
@ -332,7 +327,7 @@ class CustomToolManager:
if not organization_id:
validation_error_result = {
"status": "failed",
"message": "I'm sorry, but I can't determine which organization this call belongs to. Please contact support.",
"message": "I'm sorry, there's an issue with this call transfer. Please contact support.",
"action": "transfer_failed",
"reason": "no_organization_id",
"end_call": False,
@ -341,7 +336,7 @@ class CustomToolManager:
validation_error_result, function_call_params, properties
)
return
#TODO: check if everything in transfer_data is still needed
# Prepare transfer request data
transfer_data = {
"destination": destination,
@ -352,7 +347,7 @@ class CustomToolManager:
"caller_number": caller_number, # Original caller's phone number
}
# Initialize Redis-based transfer coordination
import time
# Get backend endpoint URL
@ -500,8 +495,6 @@ class CustomToolManager:
f"Transfer initiation failed: {response.status_code} - {error_data}"
)
# No cleanup needed for Redis-based coordination
# Handle initiation failure with user-friendly message
initiation_failure_result = {
"status": "failed",
@ -518,8 +511,6 @@ class CustomToolManager:
except httpx.TimeoutException:
logger.error(f"Transfer call '{function_name}' HTTP request timed out")
# No cleanup needed for Redis-based coordination
# Handle HTTP timeout with user-friendly message
http_timeout_result = {
"status": "failed",
@ -538,8 +529,6 @@ class CustomToolManager:
f"Transfer call tool '{function_name}' execution failed: {e}"
)
# No cleanup needed for Redis-based coordination
# Handle generic exception with user-friendly message
exception_result = {
"status": "failed",
@ -576,10 +565,10 @@ class CustomToolManager:
f"Transfer successful! Conference: {conference_id}, Original: {original_call_sid}, Transfer: {transfer_call_sid}"
)
# First inform LLM of success (but don't continue call)
# Inform LLM of success and end the call with Transfer call reason
response_properties = FunctionCallResultProperties(
run_llm=False
) # We'll handle the transfer ourselves
)
await function_call_params.result_callback(
{
"status": "transfer_success",
@ -598,8 +587,6 @@ class CustomToolManager:
reason = result.get("reason", "unknown")
logger.info(f"Transfer failed ({reason}), informing user and ending call")
# Use system message pattern to direct LLM response for transfer failure
# This is more reliable than function call results
from pipecat.frames.frames import LLMMessagesAppendFrame
# Create system message with clear instructions for transfer failure
@ -644,13 +631,11 @@ class CustomToolManager:
# We'll schedule the end call after a brief delay to allow TTS
logger.info("Scheduling call end after LLM delivers failure message")
# Import here to avoid circular dependencies
import asyncio
# Schedule call end after 3 seconds to allow LLM to speak
async def delayed_end_call():
import asyncio
await asyncio.sleep(3)
await self._engine.end_call_with_reason(
f"transfer_failed_{reason}", # Include specific reason in end reason