diff --git a/api/services/looptalk/audio_streamer.py b/api/services/looptalk/audio_streamer.py index 8221c4de..0acdb223 100644 --- a/api/services/looptalk/audio_streamer.py +++ b/api/services/looptalk/audio_streamer.py @@ -9,7 +9,6 @@ import asyncio from typing import Dict, Set from loguru import logger - from pipecat.audio.utils import mix_audio from pipecat.frames.frames import ( Frame, diff --git a/api/services/pipecat/event_handlers.py b/api/services/pipecat/event_handlers.py index c1a9fe21..aef7a349 100644 --- a/api/services/pipecat/event_handlers.py +++ b/api/services/pipecat/event_handlers.py @@ -81,14 +81,7 @@ def register_event_handlers( async def on_client_disconnected(_transport, _participant): call_disposed = engine.is_call_disposed() transfer_in_progress = getattr(engine, '_transfer_in_progress', False) - - logger.info(f"[TRANSFER-DEBUG] on_client_disconnected triggered") - logger.info(f"[TRANSFER-DEBUG] Engine instance ID in event_handler: {id(engine)}") - logger.info(f"[TRANSFER-DEBUG] Engine type in event_handler: {type(engine)}") - logger.info(f"[TRANSFER-DEBUG] transfer_in_progress attribute exists: {hasattr(engine, '_transfer_in_progress')}") - logger.info(f"[TRANSFER-DEBUG] transfer_in_progress value: {transfer_in_progress}") - logger.info(f"[TRANSFER-DEBUG] Call disposed: {call_disposed}") - + logger.debug( f"In on_client_disconnected callback handler. Call disposed: {call_disposed}, " f"Transfer in progress: {transfer_in_progress}" @@ -102,7 +95,7 @@ def register_event_handlers( logger.info("Transfer in progress - skipping auto hang-up, letting redirect handle call") return logger.info("Transfer in progress - False, proceeding with hang up") - # Normal hang-up logic for non-transfer disconnections + await engine.end_call_with_reason( EndTaskReason.USER_HANGUP.value, abort_immediately=True ) diff --git a/api/services/pipecat/run_pipeline.py b/api/services/pipecat/run_pipeline.py index 9693c592..5f5eab80 100644 --- a/api/services/pipecat/run_pipeline.py +++ b/api/services/pipecat/run_pipeline.py @@ -115,7 +115,6 @@ async def run_pipeline_twilio( # Create audio configuration for Twilio audio_config = create_audio_config(WorkflowRunMode.TWILIO.value) - transport = await create_twilio_transport( websocket_client, stream_sid, @@ -125,8 +124,7 @@ async def run_pipeline_twilio( workflow.organization_id, vad_config, ambient_noise_config, - ) - + ) await _run_pipeline( transport, workflow_id, diff --git a/api/services/telephony/call_transfer_manager.py b/api/services/telephony/call_transfer_manager.py index cd8bfb7a..b542a7fc 100644 --- a/api/services/telephony/call_transfer_manager.py +++ b/api/services/telephony/call_transfer_manager.py @@ -1,7 +1,6 @@ -"""Redis-based transfer coordination service for multi-instance scaling. +"""Redis-based transfer event coordination service Handles transfer event publishing, subscription, and context storage -across multiple API server instances. """ import asyncio diff --git a/api/services/telephony/providers/cloudonix_provider.py b/api/services/telephony/providers/cloudonix_provider.py index ecc23411..268afa7c 100644 --- a/api/services/telephony/providers/cloudonix_provider.py +++ b/api/services/telephony/providers/cloudonix_provider.py @@ -694,7 +694,7 @@ class CloudonixProvider(TelephonyProvider): Cloudonix provider does not support call transfers. Raises: - NotImplementedError: Always, as Cloudonix doesn't support transfers + NotImplementedError: Cloudonix call transfers are yet to be implemented """ raise NotImplementedError("Cloudonix provider does not support call transfers") diff --git a/api/services/telephony/providers/vobiz_provider.py b/api/services/telephony/providers/vobiz_provider.py index 84ba44af..ae75b6be 100644 --- a/api/services/telephony/providers/vobiz_provider.py +++ b/api/services/telephony/providers/vobiz_provider.py @@ -547,7 +547,7 @@ class VobizProvider(TelephonyProvider): Vobiz provider does not support call transfers. Raises: - NotImplementedError: Always, as Vobiz doesn't support transfers + NotImplementedError: Vobiz call transfers are yet to be implemented """ raise NotImplementedError("Vobiz provider does not support call transfers") diff --git a/api/services/telephony/providers/vonage_provider.py b/api/services/telephony/providers/vonage_provider.py index a6170587..5640c410 100644 --- a/api/services/telephony/providers/vonage_provider.py +++ b/api/services/telephony/providers/vonage_provider.py @@ -498,7 +498,7 @@ class VonageProvider(TelephonyProvider): Vonage provider does not support call transfers. Raises: - NotImplementedError: Always, as Vonage doesn't support transfers + NotImplementedError: call transfers are yet to be implemented """ raise NotImplementedError("Vonage provider does not support call transfers") diff --git a/api/services/telephony/transfer_event_protocol.py b/api/services/telephony/transfer_event_protocol.py index 6b2ef900..32056c79 100644 --- a/api/services/telephony/transfer_event_protocol.py +++ b/api/services/telephony/transfer_event_protocol.py @@ -1,4 +1,4 @@ -"""Redis communication protocol for transfer coordination. +"""Redis communication protocol for call transfer coordination. Defines event formats and Redis channels for coordinating call transfers across multiple API server instances. diff --git a/api/services/telephony/transfer_test.py b/api/services/telephony/transfer_test.py deleted file mode 100644 index 6441c8bd..00000000 --- a/api/services/telephony/transfer_test.py +++ /dev/null @@ -1,133 +0,0 @@ -"""Test utilities for transfer coordination. - -This module provides utilities to test Redis-based transfer coordination -across multiple instances. -""" - -import asyncio -import time -import uuid -from typing import Optional - -from loguru import logger - -from api.services.telephony.call_transfer_manager import get_call_transfer_manager -from api.services.telephony.transfer_event_protocol import ( - TransferContext, - TransferEvent, - TransferEventType -) - - -async def test_redis_coordination(): - """Test basic Redis pub/sub coordination for transfers.""" - logger.info("Testing Redis-based transfer coordination...") - - call_transfer_manager = await get_call_transfer_manager() - - # Test 1: Store and retrieve transfer context - tool_call_id = str(uuid.uuid4()) - test_context = TransferContext( - tool_call_id=tool_call_id, - call_sid="test_call_123", - target_number="+1234567890", - tool_uuid="test_tool_uuid", - original_call_sid="original_call_123", - caller_number="+0987654321", - initiated_at=time.time() - ) - - logger.info("Test 1: Storing transfer context...") - await call_transfer_manager.store_transfer_context(test_context) - - logger.info("Test 1: Retrieving transfer context...") - retrieved_context = await call_transfer_manager.get_transfer_context(tool_call_id) - - if retrieved_context and retrieved_context.tool_call_id == tool_call_id: - logger.info("✅ Test 1 PASSED: Context storage/retrieval works") - else: - logger.error("❌ Test 1 FAILED: Context storage/retrieval failed") - return False - - # Test 2: Event publishing and waiting - logger.info("Test 2: Testing event publishing...") - - # Start waiting for completion in background - async def wait_for_completion(): - return await call_transfer_manager.wait_for_transfer_completion(tool_call_id, 5.0) - - wait_task = asyncio.create_task(wait_for_completion()) - - # Give it a moment to start waiting - await asyncio.sleep(0.5) - - # Publish completion event - test_event = TransferEvent( - type=TransferEventType.TRANSFER_COMPLETED, - tool_call_id=tool_call_id, - original_call_sid="original_call_123", - transfer_call_sid="transfer_call_456", - conference_name="test-conference", - message="Test transfer completed successfully", - status="success", - action="transfer_success" - ) - - logger.info("Test 2: Publishing completion event...") - await call_transfer_manager.publish_transfer_event(test_event) - - # Wait for the completion - received_event = await wait_task - - if received_event and received_event.tool_call_id == tool_call_id: - logger.info("✅ Test 2 PASSED: Event pub/sub works") - else: - logger.error("❌ Test 2 FAILED: Event pub/sub failed") - return False - - # Test 3: Cleanup - logger.info("Test 3: Testing cleanup...") - await call_transfer_manager.remove_transfer_context(tool_call_id) - - cleanup_context = await call_transfer_manager.get_transfer_context(tool_call_id) - if cleanup_context is None: - logger.info("✅ Test 3 PASSED: Cleanup works") - else: - logger.error("❌ Test 3 FAILED: Cleanup failed") - return False - - logger.info("✅ All tests PASSED! Redis coordination is working correctly.") - return True - - -async def test_timeout_handling(): - """Test timeout handling in transfer coordination.""" - logger.info("Testing timeout handling...") - - call_transfer_manager = await get_call_transfer_manager() - tool_call_id = str(uuid.uuid4()) - - # Wait for completion with short timeout (should timeout) - start_time = time.time() - result = await call_transfer_manager.wait_for_transfer_completion(tool_call_id, 2.0) - elapsed = time.time() - start_time - - if result is None and elapsed >= 2.0: - logger.info("✅ Timeout test PASSED: Properly timed out after 2 seconds") - return True - else: - logger.error(f"❌ Timeout test FAILED: Expected timeout, got {result} in {elapsed}s") - return False - - -if __name__ == "__main__": - async def main(): - success1 = await test_redis_coordination() - success2 = await test_timeout_handling() - - if success1 and success2: - logger.info("🎉 All transfer coordination tests PASSED!") - else: - logger.error("💥 Some tests FAILED!") - - asyncio.run(main()) \ No newline at end of file diff --git a/api/services/workflow/pipecat_engine_custom_tools.py b/api/services/workflow/pipecat_engine_custom_tools.py index 5c430163..2aeb795d 100644 --- a/api/services/workflow/pipecat_engine_custom_tools.py +++ b/api/services/workflow/pipecat_engine_custom_tools.py @@ -262,7 +262,7 @@ class CustomToolManager: Returns: Async handler function for the transfer call tool """ - # Don't run LLM after starting transfer - we're handling async response + properties = FunctionCallResultProperties(run_llm=False) async def transfer_call_handler( @@ -296,8 +296,8 @@ class CustomToolManager: return # Validate E.164 format - e164_pattern = r"^\+[1-9]\d{1,14}$" - if not re.match(e164_pattern, destination): + E164_PHONE_REGEX = r"^\+[1-9]\d{1,14}$" + if not re.match(E164_PHONE_REGEX, destination): validation_error_result = { "status": "failed", "message": "I'm sorry, but the transfer phone number appears to be invalid. Please contact support to verify the transfer settings.", @@ -310,11 +310,6 @@ class CustomToolManager: ) return - # Provider validation handled by telephony endpoint - - # Note: User muting and hold music are handled automatically by - - # Play pre-transfer message if configured if message_type == "custom" and custom_message: logger.info(f"Playing pre-transfer message: {custom_message}") await self._engine.task.queue_frame(TTSSpeakFrame(custom_message)) @@ -323,7 +318,7 @@ class CustomToolManager: from pipecat.utils.run_context import get_current_call_sid original_call_sid = get_current_call_sid() - caller_number = None # Skip caller number for now as requested + caller_number = None # TODO: check if this is redundant now logger.info(f"Found original call context: call_id={original_call_sid}") @@ -332,7 +327,7 @@ class CustomToolManager: if not organization_id: validation_error_result = { "status": "failed", - "message": "I'm sorry, but I can't determine which organization this call belongs to. Please contact support.", + "message": "I'm sorry, there's an issue with this call transfer. Please contact support.", "action": "transfer_failed", "reason": "no_organization_id", "end_call": False, @@ -341,7 +336,7 @@ class CustomToolManager: validation_error_result, function_call_params, properties ) return - + #TODO: check if everything in transfer_data is still needed # Prepare transfer request data transfer_data = { "destination": destination, @@ -352,7 +347,7 @@ class CustomToolManager: "caller_number": caller_number, # Original caller's phone number } - # Initialize Redis-based transfer coordination + import time # Get backend endpoint URL @@ -500,8 +495,6 @@ class CustomToolManager: f"Transfer initiation failed: {response.status_code} - {error_data}" ) - # No cleanup needed for Redis-based coordination - # Handle initiation failure with user-friendly message initiation_failure_result = { "status": "failed", @@ -518,8 +511,6 @@ class CustomToolManager: except httpx.TimeoutException: logger.error(f"Transfer call '{function_name}' HTTP request timed out") - # No cleanup needed for Redis-based coordination - # Handle HTTP timeout with user-friendly message http_timeout_result = { "status": "failed", @@ -538,8 +529,6 @@ class CustomToolManager: f"Transfer call tool '{function_name}' execution failed: {e}" ) - # No cleanup needed for Redis-based coordination - # Handle generic exception with user-friendly message exception_result = { "status": "failed", @@ -576,10 +565,10 @@ class CustomToolManager: f"Transfer successful! Conference: {conference_id}, Original: {original_call_sid}, Transfer: {transfer_call_sid}" ) - # First inform LLM of success (but don't continue call) + # Inform LLM of success and end the call with Transfer call reason response_properties = FunctionCallResultProperties( run_llm=False - ) # We'll handle the transfer ourselves + ) await function_call_params.result_callback( { "status": "transfer_success", @@ -598,8 +587,6 @@ class CustomToolManager: reason = result.get("reason", "unknown") logger.info(f"Transfer failed ({reason}), informing user and ending call") - # Use system message pattern to direct LLM response for transfer failure - # This is more reliable than function call results from pipecat.frames.frames import LLMMessagesAppendFrame # Create system message with clear instructions for transfer failure @@ -644,13 +631,11 @@ class CustomToolManager: # We'll schedule the end call after a brief delay to allow TTS logger.info("Scheduling call end after LLM delivers failure message") - # Import here to avoid circular dependencies import asyncio # Schedule call end after 3 seconds to allow LLM to speak async def delayed_end_call(): import asyncio - await asyncio.sleep(3) await self._engine.end_call_with_reason( f"transfer_failed_{reason}", # Include specific reason in end reason