chore: code refactor

This commit is contained in:
Abhishek Kumar 2026-02-14 13:43:20 +05:30
parent c0cbc65de3
commit c7812bf189
29 changed files with 538 additions and 800 deletions

6
.claude/settings.json Normal file
View file

@ -0,0 +1,6 @@
{
"effortLevel": "high",
"env": {
"CLAUDE_CODE_EXPERIMENTAL_AGENT_TEAMS": "1"
}
}

View file

@ -5,15 +5,15 @@ Revises: 02ffd7f23d1d
Create Date: 2026-02-03 11:18:11.417837
"""
from typing import Sequence, Union
from alembic import op
from alembic_postgresql_enum import TableReference
# revision identifiers, used by Alembic.
revision: str = '1a7d74d54e8f'
down_revision: Union[str, None] = '02ffd7f23d1d'
revision: str = "1a7d74d54e8f"
down_revision: Union[str, None] = "02ffd7f23d1d"
branch_labels: Union[str, Sequence[str], None] = None
depends_on: Union[str, Sequence[str], None] = None

View file

@ -4,15 +4,13 @@ Consolidated from split modules for easier maintenance.
"""
import json
import time
import uuid
from datetime import UTC, datetime
from typing import Dict, Optional
from typing import Optional
from fastapi import (
APIRouter,
Depends,
Form,
Header,
HTTPException,
Request,
@ -36,11 +34,14 @@ from api.services.campaign.campaign_call_dispatcher import campaign_call_dispatc
from api.services.campaign.campaign_event_publisher import get_campaign_event_publisher
from api.services.quota_service import check_dograh_quota, check_dograh_quota_by_user_id
from api.services.telephony.call_transfer_manager import get_call_transfer_manager
from api.services.telephony.transfer_event_protocol import TransferContext
from api.services.telephony.factory import (
get_all_telephony_providers,
get_telephony_provider,
)
from api.services.telephony.transfer_event_protocol import (
TransferEvent,
TransferEventType,
)
from api.utils.common import get_backend_endpoints
from api.utils.telephony_helper import (
generic_hangup_response,
@ -48,7 +49,6 @@ from api.utils.telephony_helper import (
numbers_match,
parse_webhook_request,
)
from pipecat.services.llm_service import FunctionCallParams
from pipecat.utils.run_context import set_current_run_id
router = APIRouter(prefix="/telephony")
@ -509,15 +509,15 @@ async def transfer_twiml(conference_name: str):
Called by Twilio when we redirect the call after closing the WebSocket stream.
"""
logger.info(f"[TRANSFER-TWIML] Generating conference TwiML for: {conference_name}")
twiml_content = f"""<?xml version="1.0" encoding="UTF-8"?>
<Response>
<Say>Connecting you now.</Say>
<Dial>
<Conference endConferenceOnExit="false" startConferenceOnEnter="true">{conference_name}</Conference>
<Conference endConferenceOnExit="true" startConferenceOnEnter="true">{conference_name}</Conference>
</Dial>
</Response>"""
logger.info(f"[TRANSFER-TWIML] Generated TwiML: {twiml_content}")
return HTMLResponse(content=twiml_content, media_type="application/xml")
@ -1460,6 +1460,8 @@ async def handle_inbound_fallback(request: Request):
)
return generic_hangup_response()
@router.post("/cloudonix/cdr")
async def handle_cloudonix_cdr(request: Request):
"""Handle Cloudonix CDR (Call Detail Record) webhooks.
@ -1510,223 +1512,138 @@ async def handle_cloudonix_cdr(request: Request):
f"disposition: {cdr_data.get('disposition')}, status: {status_update.status}"
)
return {"status": "success"}
class TransferCallRequest(BaseModel):
"""Request model for initiating call transfer using webhook-driven completion"""
"""Request model for initiating a call transfer."""
destination: str # E.164 format phone number (required)
organization_id: int # Organization ID for provider configuration
transfer_id: str # Unique identifier for tracking this transfer
conference_name: str # Conference name for the transfer
timeout: Optional[int] = 20 # seconds to wait for answer
tool_call_id: Optional[str] = None # will generate if not provided
tool_uuid: Optional[str] = None # tool UUID for tracing and validation
original_call_sid: Optional[str] = None # original caller's call SID
caller_number: Optional[str] = None # original caller's phone number
@field_validator("destination")
@classmethod
def validate_destination(cls, destination: str) -> str:
"""Validate destination is in E.164 format."""
import re
if not destination or not destination.strip():
raise ValueError("Destination phone number is required")
E164_PHONE_REGEX = r"^\+[1-9]\d{1,14}$"
if not re.match(E164_PHONE_REGEX, destination.strip()):
raise ValueError(f"Invalid phone number format: {destination}. Must be E.164 format (e.g., +1234567890)")
raise ValueError(
f"Invalid phone number format: {destination}. Must be E.164 format (e.g., +1234567890)"
)
return destination.strip()
@router.post("/call-transfer")
async def initiate_call_transfer(request: TransferCallRequest):
"""Initiate call transfer without blocking the pipeline"""
import aiohttp
# Generate tool_call_id if not provided
if not request.tool_call_id:
request.tool_call_id = f"transfer_{int(time.time())}_{uuid.uuid4().hex[:8]}"
logger.info(f"Starting call transfer to {request.destination} with tool_call_id: {request.tool_call_id}, tool_uuid: {request.tool_uuid}")
"""Initiate a call transfer via the telephony provider.
This endpoint only initiates the outbound call. Transfer context
(original_call_sid, etc.) is stored by the caller
before invoking this endpoint.
"""
logger.info(
f"Starting call transfer to {request.destination} with transfer_id: {request.transfer_id}"
)
try:
from api.services.telephony.factory import get_transfer_provider
try:
provider = await get_transfer_provider(request.organization_id)
provider = await get_telephony_provider(request.organization_id)
except ValueError as e:
logger.error(f"Transfer provider validation failed: {e}")
raise HTTPException(
status_code=400,
detail=f"Call transfer not supported: {str(e)}"
status_code=400, detail=f"Call transfer not supported: {str(e)}"
)
# Validate configuration before attempting transfer
if not provider.supports_transfers():
raise HTTPException(
status_code=400,
detail=f"Provider '{provider.PROVIDER_NAME}' does not support call transfers",
)
if not provider.validate_config():
logger.error(f"Provider {provider.PROVIDER_NAME} configuration is invalid")
raise HTTPException(
status_code=400,
detail=f"Telephony provider '{provider.PROVIDER_NAME}' is not properly configured for transfers"
detail=f"Telephony provider '{provider.PROVIDER_NAME}' is not properly configured for transfers",
)
# Initiate transfer call via provider
logger.info(f"Initiating transfer call via {provider.PROVIDER_NAME} provider")
try:
transfer_result = await provider.transfer_call(
destination=request.destination,
tool_call_id=request.tool_call_id,
timeout=request.timeout
transfer_id=request.transfer_id,
conference_name=request.conference_name,
timeout=request.timeout,
)
except NotImplementedError as e:
# fallback for get_transfer_provider validation
logger.error(f"Provider {provider.PROVIDER_NAME} doesn't support transfers: {e}")
logger.error(
f"Provider {provider.PROVIDER_NAME} doesn't support transfers: {e}"
)
raise HTTPException(
status_code=400,
detail=f"Provider '{provider.PROVIDER_NAME}' does not support call transfers"
detail=f"Provider '{provider.PROVIDER_NAME}' does not support call transfers",
)
except Exception as e:
# Provider API call failed
logger.error(f"Provider transfer call failed: {e}")
raise HTTPException(
status_code=500,
detail=f"Transfer call failed: {str(e)}"
status_code=500, detail=f"Transfer call failed: {str(e)}"
)
call_sid = transfer_result.get("call_sid")
logger.info(f"Transfer call initiated successfully: {call_sid}")
logger.debug(f"Transfer result: {transfer_result}")
# Store the transfer context in Redis for webhook completion
call_transfer_manager = await get_call_transfer_manager()
transfer_context = TransferContext(
tool_call_id=request.tool_call_id,
call_sid=call_sid,
target_number=request.destination,
tool_uuid=request.tool_uuid,
original_call_sid=request.original_call_sid,
caller_number=request.caller_number,
initiated_at=time.time()
)
await call_transfer_manager.store_transfer_context(transfer_context)
return {
"status": "transfer_initiated",
"status": "transfer_initiated",
"call_id": call_sid,
"message": f"Calling {request.destination}...",
"tool_call_id": request.tool_call_id,
"provider": provider.PROVIDER_NAME
"transfer_id": request.transfer_id,
"provider": provider.PROVIDER_NAME,
}
except HTTPException:
# Re-raise HTTP exceptions (already properly formatted)
raise
except Exception as e:
# Catch any other unexpected errors
logger.error(f"Unexpected error during transfer call: {e}")
raise HTTPException(
status_code=500,
detail=f"Internal error during transfer: {str(e)}"
status_code=500, detail=f"Internal error during transfer: {str(e)}"
)
@router.post("/transfer-call-handler/{tool_call_id}")
async def handle_transfer_call_answered(tool_call_id: str, request: Request):
"""Handle when target answers the transfer call"""
logger.info(f"Transfer call answered for tool_call_id: {tool_call_id}")
@router.post("/transfer-result/{transfer_id}")
async def complete_transfer_function_call(transfer_id: str, request: Request):
"""Webhook endpoint to complete the function call with transfer result.
Called by Twilio's StatusCallback when the transfer call status changes.
"""
form_data = await request.form()
data = dict(form_data)
call_sid = data.get("CallSid", "")
# Get transfer context from Redis
call_transfer_manager = await get_call_transfer_manager()
transfer_context = await call_transfer_manager.get_transfer_context(tool_call_id)
original_call_sid = transfer_context.original_call_sid if transfer_context else None
# Use original call SID for conference name if available, otherwise fall back to transfer call SID
base_call_sid = original_call_sid or call_sid
conference_name = f"transfer-{base_call_sid}"
logger.info(f"Using conference name: {conference_name}")
# Publish Redis event for transfer answer completion
try:
# Get transfer coordinator and context
call_transfer_manager = await get_call_transfer_manager()
transfer_context = await call_transfer_manager.get_transfer_context(tool_call_id)
if transfer_context:
# Create transfer answered event
from api.services.telephony.transfer_event_protocol import TransferEvent, TransferEventType
transfer_event = TransferEvent(
type=TransferEventType.TRANSFER_ANSWERED,
tool_call_id=tool_call_id,
original_call_sid=original_call_sid,
transfer_call_sid=call_sid,
conference_name=conference_name,
message="Great! The destination number answered. Let me transfer you now.",
status="success",
action="transfer_success"
)
# Publish the event to Redis
await call_transfer_manager.publish_transfer_event(transfer_event)
logger.info(f"Published TRANSFER_ANSWERED event for {tool_call_id}")
else:
logger.warning(f"No transfer context found for {tool_call_id}")
except Exception as e:
logger.error(f"Error publishing transfer answered event for {tool_call_id}: {e}")
# Return TwiML to put the answerer into the conference
twiml = f"""<?xml version="1.0" encoding="UTF-8"?>
<Response>
<Say>You have answered a transfer call. Connecting you now.</Say>
<Dial>
<Conference>{conference_name}</Conference>
</Dial>
</Response>"""
return HTMLResponse(content=twiml, media_type="application/xml")
@router.post("/transfer-result/{tool_call_id}")
async def complete_transfer_function_call(tool_call_id: str, request: Request):
"""Webhook endpoint to complete the function call with transfer result"""
form_data = await request.form()
data = dict(form_data)
call_status = data.get("CallStatus", "")
call_sid = data.get("CallSid", "")
logger.info(f"Transfer result(call status) webhook: {tool_call_id} status={call_status}")
# Skip "completed" status to avoid overriding successful transfer results
# The "answered" status already handled the success case
if call_status == "completed":
logger.info(f"Ignoring 'completed' status for {tool_call_id} to avoid overriding previous results")
return {"status": "ignored", "reason": "completed_status_filtered"}
# Import required event classes
from api.services.telephony.transfer_event_protocol import TransferEvent, TransferEventType
logger.info(
f"Transfer result(call status) webhook: {transfer_id} status={call_status}"
)
# Get transfer context from Redis for additional information
call_transfer_manager = await get_call_transfer_manager()
transfer_context = await call_transfer_manager.get_transfer_context(tool_call_id)
transfer_context = await call_transfer_manager.get_transfer_context(transfer_id)
original_call_sid = transfer_context.original_call_sid if transfer_context else None
caller_number = transfer_context.caller_number if transfer_context else None
conference_name = transfer_context.conference_name if transfer_context else None
# Determine the result based on call status with user-friendly messaging
if call_status == "answered":
# Use original call SID for conference name if available, otherwise fall back to transfer call SID
base_call_sid = original_call_sid or call_sid
conference_name = f"transfer-{base_call_sid}"
if call_status in ("answered", "completed"):
result = {
"status": "success",
"message": "Great! The destination number answered. Let me transfer you now.",
@ -1734,8 +1651,7 @@ async def complete_transfer_function_call(tool_call_id: str, request: Request):
"conference_id": conference_name,
"transfer_call_sid": call_sid, # The outbound transfer call SID
"original_call_sid": original_call_sid, # The original caller's SID
"caller_number": caller_number,
"end_call": False # Continue with transfer
"end_call": False, # Continue with transfer
}
elif call_status == "no-answer":
result = {
@ -1744,31 +1660,33 @@ async def complete_transfer_function_call(tool_call_id: str, request: Request):
"message": "The transfer call was not answered. The person may be busy or unavailable right now.",
"action": "transfer_failed",
"call_sid": call_sid,
"end_call": True
"end_call": True,
}
elif call_status == "busy":
result = {
"status": "transfer_failed",
"status": "transfer_failed",
"reason": "busy",
"message": "The transfer call encountered a busy signal. The person is likely on another call.",
"action": "transfer_failed",
"call_sid": call_sid,
"end_call": True
"end_call": True,
}
elif call_status == "failed":
result = {
"status": "transfer_failed",
"reason": "call_failed",
"message": "The transfer call failed to connect. There may be a network issue or the number is unavailable.",
"action": "transfer_failed",
"action": "transfer_failed",
"call_sid": call_sid,
"end_call": True
"end_call": True,
}
else:
# Intermediate status (ringing, in-progress, etc.), don't complete yet
logger.info(f"Received intermediate status {call_status}, waiting for final status")
logger.info(
f"Received intermediate status {call_status}, waiting for final status"
)
return {"status": "pending"}
# Complete the function call with Redis event publishing
try:
# Determine event type based on result status
@ -1778,41 +1696,27 @@ async def complete_transfer_function_call(tool_call_id: str, request: Request):
event_type = TransferEventType.TRANSFER_TIMEOUT
else:
event_type = TransferEventType.TRANSFER_FAILED
# Create and publish transfer event
# Add caller_number to result if not already present
if "caller_number" not in result and caller_number:
result["caller_number"] = caller_number
transfer_event = TransferEvent(
type=event_type,
tool_call_id=tool_call_id,
transfer_id=transfer_id,
original_call_sid=original_call_sid or "",
transfer_call_sid=call_sid,
conference_name=result.get("conference_id"),
conference_name=conference_name,
message=result.get("message", ""),
status=result["status"],
action=result.get("action", ""),
reason=result.get("reason"),
end_call=result.get("end_call", False)
end_call=result.get("end_call", False),
)
# Publish the event via Redis
await call_transfer_manager.publish_transfer_event(transfer_event)
logger.info(f"Published {event_type} event for {tool_call_id}")
# Clean up transfer context from Redis
await call_transfer_manager.remove_transfer_context(tool_call_id)
logger.info(f"Function call {tool_call_id} completed with result: {result['status']}")
logger.info(
f"Published {event_type} event for {transfer_id} with result: {result['status']}"
)
except Exception as e:
logger.error(f"Error completing function call {tool_call_id}: {e}")
logger.error(f"Error completing transfer {transfer_id}: {e}")
return {"status": "completed", "result": result}

View file

@ -1,8 +1,8 @@
"""API routes for managing tools."""
import re
from datetime import datetime
from typing import Annotated, Any, Dict, List, Literal, Optional, Union
import re
from fastapi import APIRouter, Depends, HTTPException
from pydantic import BaseModel, Field, field_validator
@ -70,10 +70,10 @@ class TransferCallConfig(BaseModel):
default=None, description="Custom message to play before transferring the call"
)
timeout: int = Field(
default=30,
ge=5,
le=120,
description="Maximum time in seconds to wait for destination to answer (5-120 seconds)"
default=30,
ge=5,
le=120,
description="Maximum time in seconds to wait for destination to answer (5-120 seconds)",
)
@field_validator("destination")
@ -83,7 +83,7 @@ class TransferCallConfig(BaseModel):
# Allow empty string for initial creation (like HTTP API tools with empty URL)
if not v.strip():
return v
# E.164 format: +[1-9]\d{1,14}
e164_pattern = r"^\+[1-9]\d{1,14}$"
if not re.match(e164_pattern, v):
@ -140,7 +140,9 @@ class CreateToolRequest(BaseModel):
"""Validate that category is a valid ToolCategory value."""
valid_categories = [c.value for c in ToolCategory]
if v not in valid_categories:
raise ValueError(f"Invalid category '{v}'. Must be one of: {', '.join(valid_categories)}")
raise ValueError(
f"Invalid category '{v}'. Must be one of: {', '.join(valid_categories)}"
)
return v

View file

@ -278,7 +278,15 @@ class DograhTTSService(BaseTTSConfiguration):
SARVAM_TTS_MODELS = ["bulbul:v2", "bulbul:v3"]
SARVAM_V2_VOICES = ["anushka", "manisha", "vidya", "arya", "abhilash", "karun", "hitesh"]
SARVAM_V2_VOICES = [
"anushka",
"manisha",
"vidya",
"arya",
"abhilash",
"karun",
"hitesh",
]
SARVAM_V3_VOICES = [
"shubh",
"aditya",

View file

@ -80,22 +80,14 @@ def register_event_handlers(
@transport.event_handler("on_client_disconnected")
async def on_client_disconnected(_transport, _participant):
call_disposed = engine.is_call_disposed()
transfer_in_progress = getattr(engine, '_transfer_in_progress', False)
logger.debug(
f"In on_client_disconnected callback handler. Call disposed: {call_disposed}, "
f"Transfer in progress: {transfer_in_progress}"
f"In on_client_disconnected callback handler. Call disposed: {call_disposed}"
)
# Stop recordings
await audio_buffer.stop_recording()
# Skip auto hang-up if transfer is in progress
if transfer_in_progress:
logger.info("Transfer in progress - skipping auto hang-up, letting redirect handle call")
return
logger.info("Transfer in progress - False, proceeding with hang up")
await engine.end_call_with_reason(
EndTaskReason.USER_HANGUP.value, abort_immediately=True
)

View file

@ -73,8 +73,7 @@ def build_pipeline(
pipeline_engine_callback_processor,
]
)
processors.extend(
[
tts, # TTS

View file

@ -124,7 +124,7 @@ async def run_pipeline_twilio(
workflow.organization_id,
vad_config,
ambient_noise_config,
)
)
await _run_pipeline(
transport,
workflow_id,
@ -556,8 +556,9 @@ async def _run_pipeline(
# Create pipeline components
audio_buffer, context = create_pipeline_components(audio_config)
# Set the context and audio_buffer after creation
# Set the context, audio_config, and audio_buffer after creation
engine.set_context(context)
engine.set_audio_config(audio_config)
# Set Stasis connection for immediate transfers (if available)
if stasis_connection:
@ -638,6 +639,9 @@ async def _run_pipeline(
@user_context_aggregator.event_handler("on_user_turn_idle")
async def on_user_turn_idle(aggregator):
if engine._transferring_call:
logger.debug("Not calling user idle since we are transferring call.")
return
await user_idle_handler.handle_idle(aggregator)
@user_context_aggregator.event_handler("on_user_turn_started")

View file

@ -24,7 +24,6 @@ from pipecat.transports.websocket.fastapi import (
FastAPIWebsocketParams,
FastAPIWebsocketTransport,
)
from loguru import logger
librnnoise_path = os.path.normpath(
str(APP_ROOT_DIR / "native" / "rnnoise" / "librnnoise.so")

View file

@ -316,16 +316,18 @@ class TelephonyProvider(ABC):
async def transfer_call(
self,
destination: str,
tool_call_id: str,
transfer_id: str,
conference_name: str,
timeout: int = 30,
**kwargs: Any
**kwargs: Any,
) -> Dict[str, Any]:
"""
Initiate a call transfer to a destination number.
Args:
destination: The destination phone number (E.164 format)
tool_call_id: Unique identifier for tracking this transfer
transfer_id: Unique identifier for tracking this transfer
conference_name: Name of the conference to join the destination into
timeout: Transfer timeout in seconds
**kwargs: Provider-specific additional parameters
@ -334,7 +336,6 @@ class TelephonyProvider(ABC):
- call_sid: Provider's call identifier
- status: Transfer initiation status
- provider: Provider name
- webhook_urls: Dict with answer and status callback URLs
Raises:
NotImplementedError: If provider doesn't support transfers

View file

@ -5,26 +5,27 @@ Handles transfer event publishing, subscription, and context storage
import asyncio
import time
from typing import Optional, Dict, Any
from loguru import logger
from typing import Dict, Optional
import redis.asyncio as aioredis
from loguru import logger
from api.constants import REDIS_URL
from api.services.telephony.transfer_event_protocol import (
TransferEvent,
TransferContext,
TransferEvent,
TransferEventType,
TransferRedisChannels
TransferRedisChannels,
)
class CallTransferManager:
"""Manages call transfer events and context storage using Redis."""
def __init__(self, redis_client: Optional[aioredis.Redis] = None):
self._redis_client = redis_client
self._pubsub_connections: Dict[str, aioredis.client.PubSub] = {}
async def _get_redis(self) -> aioredis.Redis:
"""Get Redis client instance."""
if not self._redis_client:
@ -32,34 +33,36 @@ class CallTransferManager:
REDIS_URL, decode_responses=True
)
return self._redis_client
async def store_transfer_context(self, context: TransferContext, ttl: int = 300) -> None:
async def store_transfer_context(
self, context: TransferContext, ttl: int = 300
) -> None:
"""Store transfer context in Redis with TTL.
Args:
context: Transfer context data
ttl: Time to live in seconds (default 5 minutes)
"""
try:
redis = await self._get_redis()
key = TransferRedisChannels.transfer_context_key(context.tool_call_id)
key = TransferRedisChannels.transfer_context_key(context.transfer_id)
await redis.setex(key, ttl, context.to_json())
logger.debug(f"Stored transfer context for {context.tool_call_id}")
logger.debug(f"Stored transfer context for {context.transfer_id}")
except Exception as e:
logger.error(f"Failed to store transfer context: {e}")
async def get_transfer_context(self, tool_call_id: str) -> Optional[TransferContext]:
async def get_transfer_context(self, transfer_id: str) -> Optional[TransferContext]:
"""Retrieve transfer context from Redis.
Args:
tool_call_id: Tool call identifier
transfer_id: Transfer identifier
Returns:
Transfer context if found, None otherwise
"""
try:
redis = await self._get_redis()
key = TransferRedisChannels.transfer_context_key(tool_call_id)
key = TransferRedisChannels.transfer_context_key(transfer_id)
data = await redis.get(key)
if data:
return TransferContext.from_json(data)
@ -67,24 +70,24 @@ class CallTransferManager:
except Exception as e:
logger.error(f"Failed to get transfer context: {e}")
return None
async def remove_transfer_context(self, tool_call_id: str) -> None:
async def remove_transfer_context(self, transfer_id: str) -> None:
"""Remove transfer context from Redis.
Args:
tool_call_id: Tool call identifier
transfer_id: Transfer identifier
"""
try:
redis = await self._get_redis()
key = TransferRedisChannels.transfer_context_key(tool_call_id)
key = TransferRedisChannels.transfer_context_key(transfer_id)
await redis.delete(key)
logger.debug(f"Removed transfer context for {tool_call_id}")
logger.debug(f"Removed transfer context for {transfer_id}")
except Exception as e:
logger.error(f"Failed to remove transfer context: {e}")
async def publish_transfer_event(self, event: TransferEvent) -> None:
"""Publish transfer event to Redis channel.
Args:
event: Transfer event to publish
"""
@ -92,74 +95,69 @@ class CallTransferManager:
# Add timestamp if not present
if event.timestamp is None:
event.timestamp = time.time()
redis = await self._get_redis()
channel = TransferRedisChannels.transfer_events(event.tool_call_id)
channel = TransferRedisChannels.transfer_events(event.transfer_id)
await redis.publish(channel, event.to_json())
logger.info(f"Published {event.type} event for {event.tool_call_id}")
logger.info(f"Published {event.type} event for {event.transfer_id}")
except Exception as e:
logger.error(f"Failed to publish transfer event: {e}")
async def wait_for_transfer_completion(
self,
tool_call_id: str,
timeout_seconds: float = 30.0
self, transfer_id: str, timeout_seconds: float = 30.0
) -> Optional[TransferEvent]:
"""Wait for transfer completion event using Redis pub/sub.
Args:
tool_call_id: Tool call identifier to wait for
transfer_id: Transfer identifier to wait for
timeout_seconds: Maximum time to wait
Returns:
Transfer completion event if received, None on timeout
"""
channel = TransferRedisChannels.transfer_events(tool_call_id)
channel = TransferRedisChannels.transfer_events(transfer_id)
redis = await self._get_redis()
pubsub = redis.pubsub()
try:
await pubsub.subscribe(channel)
logger.info(f"Waiting for transfer completion on {channel} (timeout: {timeout_seconds}s)")
logger.info(
f"Waiting for transfer completion on {channel} (timeout: {timeout_seconds}s)"
)
# Wait for completion event with timeout
async def wait_for_message():
async for message in pubsub.listen():
if message["type"] == "message":
try:
event = TransferEvent.from_json(message["data"])
logger.info(f"Received {event.type} event for {tool_call_id}")
logger.info(
f"Received {event.type} event for {transfer_id}"
)
# Check if this is a completion event
if event.type in [
TransferEventType.TRANSFER_ANSWERED, # Call answered = transfer successful
TransferEventType.TRANSFER_COMPLETED,
TransferEventType.TRANSFER_FAILED,
TransferEventType.TRANSFER_CANCELLED,
TransferEventType.TRANSFER_TIMEOUT
]:
if (
event.type
in [
TransferEventType.TRANSFER_ANSWERED, # Call answered = transfer successful
TransferEventType.TRANSFER_COMPLETED,
TransferEventType.TRANSFER_FAILED,
TransferEventType.TRANSFER_CANCELLED,
TransferEventType.TRANSFER_TIMEOUT,
]
):
return event
except Exception as e:
logger.error(f"Failed to parse transfer event: {e}")
continue
return None
# Wait with timeout
result = await asyncio.wait_for(wait_for_message(), timeout=timeout_seconds)
return result
except asyncio.TimeoutError:
logger.error(f"Transfer completion wait timed out for {tool_call_id}")
# Publish timeout event for other instances
timeout_event = TransferEvent(
type=TransferEventType.TRANSFER_TIMEOUT,
tool_call_id=tool_call_id,
original_call_sid="",
status="failed",
reason="timeout",
end_call=True
)
await self.publish_transfer_event(timeout_event)
logger.debug(f"Transfer completion wait timed out for {transfer_id}")
return None
except Exception as e:
logger.error(f"Error waiting for transfer completion: {e}")
@ -170,7 +168,7 @@ class CallTransferManager:
await pubsub.close()
except Exception as e:
logger.error(f"Error closing pubsub connection: {e}")
async def cleanup(self):
"""Clean up Redis connections."""
try:
@ -181,7 +179,7 @@ class CallTransferManager:
except:
pass
self._pubsub_connections.clear()
# Close main Redis connection
if self._redis_client:
await self._redis_client.close()
@ -199,4 +197,4 @@ async def get_call_transfer_manager() -> CallTransferManager:
global _call_transfer_manager
if not _call_transfer_manager:
_call_transfer_manager = CallTransferManager()
return _call_transfer_manager
return _call_transfer_manager

View file

@ -128,29 +128,3 @@ async def get_all_telephony_providers() -> List[Type[TelephonyProvider]]:
List of provider classes that can be used for webhook detection
"""
return [CloudonixProvider, TwilioProvider, VobizProvider, VonageProvider]
async def get_transfer_provider(organization_id: int) -> TelephonyProvider:
"""
Get telephony provider that supports call transfers.
Args:
organization_id: Organization ID for provider lookup
Returns:
Configured telephony provider that supports transfers
Raises:
ValueError: If provider doesn't support transfers or org not configured
"""
provider = await get_telephony_provider(organization_id)
if not provider.supports_transfers():
config = await load_telephony_config(organization_id)
provider_type = config.get("provider", "unknown")
raise ValueError(
f"Organization telephony provider '{provider_type}' does not support call transfers. "
f"Only Twilio provider supports transfers."
)
return provider

View file

@ -686,9 +686,10 @@ class CloudonixProvider(TelephonyProvider):
async def transfer_call(
self,
destination: str,
tool_call_id: str,
transfer_id: str,
conference_name: str,
timeout: int = 30,
**kwargs: Any
**kwargs: Any,
) -> Dict[str, Any]:
"""
Cloudonix provider does not support call transfers.

View file

@ -10,7 +10,6 @@ import aiohttp
from fastapi import HTTPException
from loguru import logger
from twilio.request_validator import RequestValidator
from pipecat.utils.run_context import set_current_call_sid
from api.enums import WorkflowRunMode
from api.services.telephony.base import (
@ -283,11 +282,6 @@ class TwilioProvider(TelephonyProvider):
try:
stream_sid = start_msg["start"]["streamSid"]
call_sid = start_msg["start"]["callSid"]
# Set call SID in Pipecat context for use throughout the pipeline
set_current_call_sid(call_sid)
logger.info(f"Set call SID context: {call_sid}")
except KeyError:
logger.error("Missing streamSid or callSid in start message")
await websocket.close(code=4400, reason="Missing stream identifiers")
@ -473,16 +467,21 @@ class TwilioProvider(TelephonyProvider):
async def transfer_call(
self,
destination: str,
tool_call_id: str,
transfer_id: str,
conference_name: str,
timeout: int = 30,
**kwargs: Any
**kwargs: Any,
) -> Dict[str, Any]:
"""
Initiate a call transfer via Twilio.
Uses inline TwiML to put the destination into a conference when they answer,
and a status callback to track the transfer outcome.
Args:
destination: The destination phone number (E.164 format)
tool_call_id: Unique identifier for tracking this transfer
transfer_id: Unique identifier for tracking this transfer
conference_name: Name of the conference to join the destination into
timeout: Transfer timeout in seconds
**kwargs: Additional Twilio-specific parameters
@ -502,11 +501,18 @@ class TwilioProvider(TelephonyProvider):
backend_endpoint, _ = await get_backend_endpoints()
# Generate webhook URLs for the transfer call
call_url = f"{backend_endpoint}/api/v1/telephony/transfer-call-handler/{tool_call_id}"
status_callback_url = f"{backend_endpoint}/api/v1/telephony/transfer-result/{tool_call_id}"
status_callback_url = (
f"{backend_endpoint}/api/v1/telephony/transfer-result/{transfer_id}"
)
logger.debug(f"Transfer webhook URLs - Answer: {call_url}, Status: {status_callback_url}")
# Inline TwiML: when the destination answers, put them into the conference
twiml = f"""<?xml version="1.0" encoding="UTF-8"?>
<Response>
<Say>You have answered a transfer call. Connecting you now.</Say>
<Dial>
<Conference endConferenceOnExit="true">{conference_name}</Conference>
</Dial>
</Response>"""
# Prepare Twilio API call data
endpoint = f"{self.base_url}/Calls.json"
@ -514,49 +520,55 @@ class TwilioProvider(TelephonyProvider):
"To": destination,
"From": from_number,
"Timeout": timeout,
"Url": call_url,
"Twiml": twiml,
"StatusCallback": status_callback_url,
"StatusCallbackEvent": ["answered", "no-answer", "busy", "failed", "completed"],
"StatusCallbackMethod": "POST"
"StatusCallbackEvent": [
"answered",
"no-answer",
"busy",
"failed",
"completed",
],
"StatusCallbackMethod": "POST",
}
# Add any additional kwargs
data.update(kwargs)
try:
# Make Twilio API call
logger.info(f"Making Twilio transfer API call to: {endpoint}")
logger.info(f"Transfer call data: {data}")
logger.debug(f"Transfer call data: {data}")
async with aiohttp.ClientSession() as session:
auth = aiohttp.BasicAuth(self.account_sid, self.auth_token)
async with session.post(endpoint, data=data, auth=auth) as response:
response_status = response.status
response_text = await response.text()
logger.info(f"Twilio transfer API response status: {response_status}")
logger.info(f"Twilio transfer API response body: {response_text}")
logger.info(
f"Twilio transfer API response status: {response_status}"
)
logger.debug(f"Twilio transfer API response body: {response_text}")
if response_status in [200, 201]:
try:
response_data = await response.json()
call_sid = response_data.get("sid")
logger.info(f"Transfer call initiated successfully: {call_sid}")
logger.info(
f"Transfer call initiated successfully: {call_sid}"
)
return {
"call_sid": call_sid,
"status": response_data.get("status", "queued"),
"provider": self.PROVIDER_NAME,
"webhook_urls": {
"answer": call_url,
"status": status_callback_url
},
"from_number": from_number,
"to_number": destination,
"raw_response": response_data
"raw_response": response_data,
}
except Exception as e:
logger.error(f"Failed to parse Twilio transfer response JSON: {e}")
logger.error(
f"Failed to parse Twilio transfer response JSON: {e}"
)
raise Exception(f"Failed to parse transfer response: {e}")
else:
error_msg = f"Twilio API call failed with status {response_status}: {response_text}"

View file

@ -539,9 +539,10 @@ class VobizProvider(TelephonyProvider):
async def transfer_call(
self,
destination: str,
tool_call_id: str,
transfer_id: str,
conference_name: str,
timeout: int = 30,
**kwargs: Any
**kwargs: Any,
) -> Dict[str, Any]:
"""
Vobiz provider does not support call transfers.

View file

@ -490,9 +490,10 @@ class VonageProvider(TelephonyProvider):
async def transfer_call(
self,
destination: str,
tool_call_id: str,
transfer_id: str,
conference_name: str,
timeout: int = 30,
**kwargs: Any
**kwargs: Any,
) -> Dict[str, Any]:
"""
Vonage provider does not support call transfers.

View file

@ -12,7 +12,7 @@ from typing import Any, Dict, Optional
class TransferEventType(str, Enum):
"""Types of transfer events sent between instances."""
TRANSFER_INITIATED = "transfer_initiated"
TRANSFER_ANSWERED = "transfer_answered"
TRANSFER_COMPLETED = "transfer_completed"
@ -24,9 +24,9 @@ class TransferEventType(str, Enum):
@dataclass
class TransferEvent:
"""Event data structure for transfer coordination."""
type: TransferEventType
tool_call_id: str
transfer_id: str
original_call_sid: str
transfer_call_sid: Optional[str] = None
target_number: Optional[str] = None
@ -37,16 +37,16 @@ class TransferEvent:
reason: Optional[str] = None
end_call: bool = False
timestamp: Optional[float] = None
def to_json(self) -> str:
"""Convert event to JSON string."""
return json.dumps(asdict(self))
@classmethod
@classmethod
def from_json(cls, data: str) -> "TransferEvent":
"""Create event from JSON string."""
return cls(**json.loads(data))
def to_result_dict(self) -> Dict[str, Any]:
"""Convert to function call result format."""
result = {
@ -56,9 +56,8 @@ class TransferEvent:
"conference_id": self.conference_name,
"transfer_call_sid": self.transfer_call_sid,
"original_call_sid": self.original_call_sid,
"caller_number": None, # Will be populated by webhook handler
"end_call": self.end_call,
"reason": self.reason
"reason": self.reason,
}
return result
@ -66,24 +65,24 @@ class TransferEvent:
@dataclass
class TransferContext:
"""Transfer context data stored in Redis."""
tool_call_id: str
transfer_id: str
call_sid: Optional[str]
target_number: str
tool_uuid: str
original_call_sid: str
caller_number: Optional[str]
conference_name: str
initiated_at: float
def to_json(self) -> str:
"""Convert context to JSON string."""
return json.dumps(asdict(self))
@classmethod
def from_json(cls, data: str) -> "TransferContext":
"""Create context from JSON string."""
"""Create context from JSON string."""
return cls(**json.loads(data))
def to_dict(self) -> Dict[str, Any]:
"""Convert to dictionary."""
return asdict(self)
@ -91,13 +90,13 @@ class TransferContext:
class TransferRedisChannels:
"""Redis channel naming conventions for transfer events."""
@staticmethod
def transfer_events(tool_call_id: str) -> str:
"""Channel for transfer events for a specific tool call."""
return f"transfer:events:{tool_call_id}"
def transfer_events(transfer_id: str) -> str:
"""Channel for transfer events for a specific transfer."""
return f"transfer:events:{transfer_id}"
@staticmethod
def transfer_context_key(tool_call_id: str) -> str:
def transfer_context_key(transfer_id: str) -> str:
"""Redis key for transfer context storage."""
return f"transfer:context:{tool_call_id}"
return f"transfer:context:{transfer_id}"

View file

@ -108,13 +108,16 @@ class PipecatEngine:
# Custom tool manager (initialized in initialize())
self._custom_tool_manager: Optional[CustomToolManager] = None
# Tracks whether a call transfer is in progress
self._transferring_call: bool = False
# Embeddings configuration (passed from run_pipeline.py)
self._embeddings_api_key: Optional[str] = embeddings_api_key
self._embeddings_model: Optional[str] = embeddings_model
self._embeddings_base_url: Optional[str] = embeddings_base_url
# Transfer state tracking - prevents auto hang-up during call transfers
self._transfer_in_progress: bool = False
# Audio configuration (set via set_audio_config from _run_pipeline)
self._audio_config = None
async def _get_organization_id(self) -> Optional[int]:
"""Get and cache the organization ID from workflow run."""
@ -248,7 +251,7 @@ class PipecatEngine:
await function_call_params.result_callback(
result, properties=properties
)
except Exception as e:
logger.error(f"Error in transition function {name}: {str(e)}")
error_result = {"status": "error", "error": str(e)}
@ -281,7 +284,7 @@ class PipecatEngine:
async def calculate_func(function_call_params: FunctionCallParams) -> None:
logger.info(f"LLM Function Call EXECUTED: safe_calculator")
logger.info(f"Arguments: {function_call_params.arguments}")
try:
expr = function_call_params.arguments.get("expression", "")
result = safe_calculator(expr)
@ -297,7 +300,7 @@ class PipecatEngine:
) -> None:
logger.info(f"LLM Function Call EXECUTED: get_current_time")
logger.info(f"Arguments: {function_call_params.arguments}")
try:
timezone = function_call_params.arguments.get("timezone", "UTC")
result = get_current_time(timezone)
@ -308,7 +311,7 @@ class PipecatEngine:
async def convert_time_func(function_call_params: FunctionCallParams) -> None:
logger.info(f"LLM Function Call EXECUTED: convert_time")
logger.info(f"Arguments: {function_call_params.arguments}")
try:
result = convert_time(
function_call_params.arguments.get("source_timezone"),
@ -339,7 +342,7 @@ class PipecatEngine:
async def retrieve_kb_func(function_call_params: FunctionCallParams) -> None:
logger.info("LLM Function Call EXECUTED: retrieve_from_knowledge_base")
logger.info(f"Arguments: {function_call_params.arguments}")
try:
query = function_call_params.arguments.get("query", "")
organization_id = await self._get_organization_id()
@ -540,7 +543,9 @@ class PipecatEngine:
self._current_node, run_in_background=False
)
frame_to_push = CancelFrame(reason=reason) if abort_immediately else EndFrame(reason=reason)
frame_to_push = (
CancelFrame(reason=reason) if abort_immediately else EndFrame(reason=reason)
)
# Apply disposition mapping - first try call_disposition if it is,
# extracted from the call conversation then fall back to reason
@ -713,6 +718,10 @@ class PipecatEngine:
f"Stasis connection set for immediate transfers: {connection.channel_id}"
)
def set_audio_config(self, audio_config) -> None:
"""Set the audio configuration for the pipeline."""
self._audio_config = audio_config
def set_mute_pipeline(self, mute: bool) -> None:
"""Set the pipeline mute state.
@ -725,6 +734,15 @@ class PipecatEngine:
logger.debug(f"Setting pipeline mute state to: {mute}")
self._mute_pipeline = mute
def set_transferring_call(self, transferring: bool) -> None:
"""Set the call transfer state.
Args:
transferring: True when a call transfer is in progress, False otherwise
"""
logger.debug(f"Setting transferring call state to: {transferring}")
self._transferring_call = transferring
async def handle_llm_text_frame(self, text: str):
"""Accumulate LLM text frames to build reference text."""
self._current_llm_generation_reference_text += text

View file

@ -8,14 +8,17 @@ from __future__ import annotations
import asyncio
import re
import time
import uuid
from typing import TYPE_CHECKING, Any, Optional
import aiohttp
import httpx
from loguru import logger
from api.db import db_client
from api.enums import ToolCategory
from api.services.telephony.call_transfer_manager import get_call_transfer_manager
from api.services.telephony.factory import get_telephony_provider
from api.services.telephony.transfer_event_protocol import TransferContext
from api.services.workflow.disposition_mapper import (
get_organization_id_from_workflow_run,
)
@ -24,25 +27,15 @@ from api.services.workflow.tools.custom_tool import (
execute_http_tool,
tool_to_function_schema,
)
from api.utils.hold_audio import load_hold_audio
from pipecat.adapters.schemas.function_schema import FunctionSchema
from pipecat.frames.frames import (
FunctionCallResultProperties,
TTSSpeakFrame,
OutputAudioRawFrame,
TTSSpeakFrame,
)
from pipecat.services.llm_service import FunctionCallParams
from pipecat.utils.enums import EndTaskReason
from pipecat.transports.websocket.fastapi import FastAPIWebsocketClient
from api.utils.hold_audio import load_hold_audio
from api.services.telephony.call_transfer_manager import get_call_transfer_manager
from api.services.telephony.transfer_event_protocol import (
TransferEvent,
TransferContext,
TransferEventType,
)
from dograh.api.utils.common import get_backend_endpoints
if TYPE_CHECKING:
from api.services.workflow.pipecat_engine import PipecatEngine
@ -134,8 +127,15 @@ class CustomToolManager:
function_name = schema["function"]["name"]
# Create and register the handler
handler = self._create_handler(tool, function_name)
self._engine.llm.register_function(function_name, handler)
handler, disable_timeout, cancel_on_interruption = self._create_handler(
tool, function_name
)
self._engine.llm.register_function(
function_name,
handler,
cancel_on_interruption=cancel_on_interruption,
disable_timeout=disable_timeout,
)
logger.debug(
f"Registered custom tool handler: {function_name} "
@ -155,12 +155,21 @@ class CustomToolManager:
Returns:
Async handler function for the tool
"""
if tool.category == ToolCategory.END_CALL.value:
return self._create_end_call_handler(tool, function_name)
elif tool.category == ToolCategory.TRANSFER_CALL.value:
return self._create_transfer_call_handler(tool, function_name)
# Whether to disable function call timeout
disable_timeout = False
cancel_on_interruption = True
return self._create_http_tool_handler(tool, function_name)
if tool.category == ToolCategory.END_CALL.value:
cancel_on_interruption = False
handler = self._create_end_call_handler(tool, function_name)
elif tool.category == ToolCategory.TRANSFER_CALL.value:
disable_timeout = True
cancel_on_interruption = False
handler = self._create_transfer_call_handler(tool, function_name)
else:
handler = self._create_http_tool_handler(tool, function_name)
return handler, disable_timeout, cancel_on_interruption
def _create_http_tool_handler(self, tool: Any, function_name: str):
"""Create a handler function for an HTTP API tool.
@ -314,14 +323,6 @@ class CustomToolManager:
logger.info(f"Playing pre-transfer message: {custom_message}")
await self._engine.task.queue_frame(TTSSpeakFrame(custom_message))
# Get original call information from Pipecat context
from pipecat.utils.run_context import get_current_call_sid
original_call_sid = get_current_call_sid()
caller_number = None # TODO: check if this is redundant now
logger.info(f"Found original call context: call_id={original_call_sid}")
# Get organization ID for provider configuration
organization_id = await self.get_organization_id()
if not organization_id:
@ -336,198 +337,143 @@ class CustomToolManager:
validation_error_result, function_call_params, properties
)
return
#TODO: check if everything in transfer_data is still needed
# Prepare transfer request data
transfer_data = {
"destination": destination,
"organization_id": organization_id, # Required for provider configuration
"tool_call_id": function_call_params.tool_call_id, # Use LLM's tool call ID for pipeline coordination
"tool_uuid": tool.tool_uuid, # Add tool UUID for tracing and validation
"original_call_sid": original_call_sid, # Original caller's call SID
"caller_number": caller_number, # Original caller's phone number
}
# Get telephony provider directly (no HTTP round-trip)
provider = await get_telephony_provider(organization_id)
if not provider.supports_transfers() or not provider.validate_config():
validation_error_result = {
"status": "failed",
"message": "I'm sorry, there's an issue with this call transfer. Please contact support.",
"action": "transfer_failed",
"reason": "provider_does_not_support_transfer",
"end_call": False,
}
await self._handle_transfer_result(
validation_error_result, function_call_params, properties
)
return
import time
# Get original callSID from gathered_context
workflow_run = await db_client.get_workflow_run_by_id(
self._engine._workflow_run_id
)
original_call_sid = workflow_run.gathered_context.get("call_id")
# Get backend endpoint URL
backend_url, _ = await get_backend_endpoints()
# Generate a unique transfer ID for tracking this transfer
transfer_id = str(uuid.uuid4())
# Get transfer coordinator for Redis-based coordination
# Compute conference name from original call SID
conference_name = f"transfer-{original_call_sid}"
# Mark transfer in progress and mute the pipeline
self._engine.set_transferring_call(True)
self._engine.set_mute_pipeline(True)
# Initiate transfer via provider with inline TwiML
transfer_result = await provider.transfer_call(
destination=destination,
transfer_id=transfer_id,
conference_name=conference_name,
timeout=timeout_seconds,
)
call_sid = transfer_result.get("call_sid")
logger.info(f"Transfer call initiated successfully: {call_sid}")
# TODO: Possible race here between saving the transfer context
# and getting a callback response from Twilio? Should we store_transfer_context
# before sending request to Twilio and update the transfer context afterwards?
# Store transfer context in Redis
call_transfer_manager = await get_call_transfer_manager()
transfer_context = TransferContext(
transfer_id=transfer_id,
call_sid=call_sid,
target_number=destination,
tool_uuid=tool.tool_uuid,
original_call_sid=original_call_sid,
conference_name=conference_name,
initiated_at=time.time(),
)
await call_transfer_manager.store_transfer_context(transfer_context)
# Now initiate the transfer call
transfer_url = f"{backend_url}/api/v1/telephony/call-transfer"
# Wait for status callback completion using Redis pub/sub
logger.info(
f"Transfer call initiated for {destination} (transfer_id={transfer_id}), waiting for completion..."
)
async with httpx.AsyncClient(timeout=30.0) as client:
response = await client.post(
transfer_url,
json=transfer_data,
headers={"Content-Type": "application/json"},
# Authentication headers added by provider if needed
# Start hold music during transfer waiting period
hold_music_stop_event = asyncio.Event()
hold_music_task = None
try:
# Use audio config for sample rate (set during pipeline setup)
sample_rate = (
self._engine._audio_config.transport_out_sample_rate
if self._engine._audio_config
else 8000
)
if response.status_code == 200:
result_data = response.json()
logger.info(f"Transfer initiated successfully: {result_data}")
logger.info(
f"Starting hold music at {sample_rate}Hz while waiting for transfer"
)
# Wait for webhook completion using standard Pipecat async pattern
logger.info(
f"Transfer call initiated for {destination}, waiting for webhook completion..."
# Start hold music as background task
hold_music_task = asyncio.create_task(
self.play_hold_music_loop(hold_music_stop_event, sample_rate)
)
# Wait for transfer completion using Redis pub/sub
logger.info("Waiting for transfer completion via Redis pub/sub...")
transfer_event = (
await call_transfer_manager.wait_for_transfer_completion(
transfer_id, timeout_seconds
)
)
# Start hold music during transfer waiting period
hold_music_stop_event = asyncio.Event()
hold_music_task = None
except Exception as e:
logger.error(f"Error during transfer wait: {e}")
transfer_event = None
try:
# Mute the pipeline to prevent further LLM generations during transfer
logger.info("Muting pipeline during transfer call")
self._engine.set_mute_pipeline(True)
finally:
# Single cleanup point: stop hold music, unmute pipeline, remove context
logger.info(
"Transfer wait ended, cleaning up hold music, pipeline state, and transfer context"
)
hold_music_stop_event.set()
if hold_music_task:
await hold_music_task
self._engine.set_transferring_call(False)
self._engine.set_mute_pipeline(False)
await call_transfer_manager.remove_transfer_context(transfer_id)
# Determine sample rate from transport (default to 8000Hz for Twilio)
sample_rate = 8000
if hasattr(self._engine.transport, "output") and hasattr(
self._engine.transport.output(), "sample_rate"
):
sample_rate = getattr(
self._engine.transport.output(), "sample_rate", 8000
)
logger.info(
f"Starting hold music at {sample_rate}Hz while waiting for transfer"
)
# Start hold music as background task
hold_music_task = asyncio.create_task(
self.play_hold_music_loop(
hold_music_stop_event, sample_rate
)
)
# Wait for transfer completion using Redis pub/sub
logger.info(
"Waiting for transfer completion via Redis pub/sub..."
)
transfer_event = (
await call_transfer_manager.wait_for_transfer_completion(
transfer_data["tool_call_id"], timeout_seconds
)
)
# Stop hold music and unmute pipeline
logger.info(
"Transfer completed, stopping hold music and unmuting pipeline"
)
hold_music_stop_event.set()
if hold_music_task:
await hold_music_task
self._engine.set_mute_pipeline(False)
if transfer_event:
# Get result from transfer event
final_result = transfer_event.to_result_dict()
# Get transfer context for caller number
transfer_context = (
await call_transfer_manager.get_transfer_context(
transfer_data["tool_call_id"]
)
)
if transfer_context and transfer_context.caller_number:
final_result["caller_number"] = (
transfer_context.caller_number
)
# Handle the transfer result and inform user appropriately
await self._handle_transfer_result(
final_result, function_call_params, properties
)
else:
# Handle timeout case
logger.error(
f"Transfer call timed out after {timeout_seconds} seconds"
)
# Create timeout result and handle it through the same flow
timeout_result = {
"status": "failed",
"message": "I'm sorry, but the call is taking longer than expected to connect. The person might not be available right now. Please try calling back later.",
"action": "transfer_failed",
"reason": "timeout",
"end_call": True,
}
await self._handle_transfer_result(
timeout_result, function_call_params, properties
)
except Exception as e:
logger.error(f"Error during transfer wait: {e}")
# Stop hold music and unmute pipeline on error
logger.info(
"Transfer error, stopping hold music and unmuting pipeline"
)
hold_music_stop_event.set()
if hold_music_task:
await hold_music_task
self._engine.set_mute_pipeline(False)
# Handle error case
error_result = {
"status": "failed",
"message": "I'm sorry, but there was an issue processing the transfer. Please try again.",
"action": "transfer_failed",
"reason": "system_error",
"end_call": True,
}
await self._handle_transfer_result(
error_result, function_call_params, properties
)
else:
error_data = (
response.json()
if response.content
else {"error": "Unknown error"}
)
logger.error(
f"Transfer initiation failed: {response.status_code} - {error_data}"
)
# Handle initiation failure with user-friendly message
initiation_failure_result = {
"status": "failed",
"message": "I'm sorry, but I'm having trouble setting up the call transfer right now. There might be a technical issue. Please try again later or contact support.",
"action": "transfer_failed",
"reason": "initiation_failed",
"end_call": True,
}
await self._handle_transfer_result(
initiation_failure_result, function_call_params, properties
)
except httpx.TimeoutException:
logger.error(f"Transfer call '{function_name}' HTTP request timed out")
# Handle HTTP timeout with user-friendly message
http_timeout_result = {
"status": "failed",
"message": "I'm sorry, but there seems to be a network issue preventing me from setting up the call transfer. Please try again in a moment.",
"action": "transfer_failed",
"reason": "network_timeout",
"end_call": True,
}
await self._handle_transfer_result(
http_timeout_result, function_call_params, properties
)
# Handle result (after cleanup)
if transfer_event:
final_result = transfer_event.to_result_dict()
await self._handle_transfer_result(
final_result, function_call_params, properties
)
else:
logger.error(
f"Transfer call timed out or failed after {timeout_seconds} seconds"
)
timeout_result = {
"status": "failed",
"message": "I'm sorry, but the call is taking longer than expected to connect. The person might not be available right now. Please try calling back later.",
"action": "transfer_failed",
"reason": "timeout",
"end_call": True,
}
await self._handle_transfer_result(
timeout_result, function_call_params, properties
)
except Exception as e:
logger.error(
f"Transfer call tool '{function_name}' execution failed: {e}"
)
self._engine.set_transferring_call(False)
self._engine.set_mute_pipeline(False)
# Handle generic exception with user-friendly message
exception_result = {
@ -550,8 +496,6 @@ class CustomToolManager:
"""Handle different transfer call outcomes and take appropriate action."""
action = result.get("action", "")
status = result.get("status", "")
message = result.get("message", "")
should_end_call = result.get("end_call", False)
logger.info(f"Handling transfer result: action={action}, status={status}")
@ -566,9 +510,7 @@ class CustomToolManager:
)
# Inform LLM of success and end the call with Transfer call reason
response_properties = FunctionCallResultProperties(
run_llm=False
)
response_properties = FunctionCallResultProperties(run_llm=False)
await function_call_params.result_callback(
{
"status": "transfer_success",
@ -585,83 +527,19 @@ class CustomToolManager:
elif action == "transfer_failed":
# Transfer failed - inform user via LLM and then end the call
reason = result.get("reason", "unknown")
logger.info(f"Transfer failed ({reason}), informing user and ending call")
logger.info(f"Transfer failed ({reason}), informing user")
from pipecat.frames.frames import LLMMessagesAppendFrame
# Create system message with clear instructions for transfer failure
failure_instruction = {
"role": "system",
"content": f"IMPORTANT: The transfer call has FAILED. Reason: {reason}. You must inform the customer about this failure using this message: '{message}' Then immediately say goodbye and end the conversation. Do NOT ask if they need anything else or continue the conversation. Do NOT continue with transfer language.",
}
# Push the system message to LLM context
await self._engine.task.queue_frame(
LLMMessagesAppendFrame([failure_instruction], run_llm=True)
)
# Also send the function call result for consistency
response_properties = FunctionCallResultProperties(
run_llm=False
) # LLM will be triggered by system message
await function_call_params.result_callback(
{"status": "transfer_failed", "reason": reason, "message": message},
properties=response_properties,
{
"status": "transfer_failed",
"reason": reason,
"message": "Transfer failed",
}
)
# Set appropriate disposition for analytics
disposition_map = {
"no_answer": "transfer_no_answer",
"busy": "transfer_busy",
"call_failed": "transfer_failed",
"timeout": "transfer_timeout",
"no_destination": "transfer_config_error",
"invalid_destination": "transfer_config_error",
"initiation_failed": "transfer_system_error",
"network_timeout": "transfer_system_error",
"execution_error": "transfer_system_error",
}
disposition = disposition_map.get(reason, "transfer_failed")
logger.info(
f"Setting disposition: {disposition} for transfer failure reason: {reason}"
)
# Give the LLM time to speak the message, then end the call with disposition
# We'll schedule the end call after a brief delay to allow TTS
logger.info("Scheduling call end after LLM delivers failure message")
import asyncio
# Schedule call end after 3 seconds to allow LLM to speak
async def delayed_end_call():
import asyncio
await asyncio.sleep(3)
await self._engine.end_call_with_reason(
f"transfer_failed_{reason}", # Include specific reason in end reason
abort_immediately=False, # Allow any queued speech to complete
)
# Create task to end call asynchronously
asyncio.create_task(delayed_end_call())
elif action == "transfer_completed":
# This should no longer happen since we ignore "completed" status in webhook
# to avoid overriding successful transfers
logger.warning(
"Received unexpected 'transfer_completed' action - this should be ignored by webhook now"
)
logger.warning(
"If you see this message, there might be an issue with the webhook status filtering"
)
# For safety, treat it as a generic result without ending the call
await function_call_params.result_callback(result, properties=properties)
else:
# Unknown action, treat as generic success
logger.warning(f"Unknown transfer action: {action}, treating as success")
await function_call_params.result_callback(result, properties=properties)
await function_call_params.result_callback(result)
async def play_hold_music_loop(
self, stop_event: asyncio.Event, sample_rate: int = 8000

View file

@ -6,4 +6,3 @@ class FunctionNames:
SYNC_CAMPAIGN_SOURCE = "sync_campaign_source"
PROCESS_CAMPAIGN_BATCH = "process_campaign_batch"
PROCESS_KNOWLEDGE_BASE_DOCUMENT = "process_knowledge_base_document"

View file

@ -5,9 +5,8 @@ This module provides functionality to load hold music audio files at specific sa
with caching to improve performance during multiple calls.
"""
import io
import wave
from typing import Optional, Dict, Tuple
from typing import Dict, Optional, Tuple
import numpy as np
from loguru import logger
@ -25,46 +24,52 @@ _hold_audio_cache: Dict[Tuple[str, int], np.ndarray] = {}
def load_hold_audio(file_path: str, sample_rate: int) -> Optional[bytes]:
"""Load hold music audio file at the specified sample rate with caching.
Args:
file_path: Path to the hold music audio file
sample_rate: Target sample rate (8000 or 16000 Hz supported)
Returns:
Audio data as bytes (PCM16) or None if loading failed
"""
cache_key = (file_path, sample_rate)
# Check cache first
if cache_key in _hold_audio_cache:
logger.debug(f"Using cached hold audio for {file_path} at {sample_rate}Hz")
audio_data = _hold_audio_cache[cache_key]
return audio_data.tobytes()
try:
logger.info(f"Loading hold audio from {file_path} at {sample_rate}Hz")
# Load audio file
sound, file_sample_rate = sf.read(file_path, dtype="int16")
logger.info(f"Audio file loaded - file sample_rate: {file_sample_rate}, target: {sample_rate}")
logger.info(
f"Audio file loaded - file sample_rate: {file_sample_rate}, target: {sample_rate}"
)
# Ensure mono audio (take first channel if stereo)
if len(sound.shape) > 1:
sound = sound[:, 0]
# Resample if needed
if file_sample_rate != sample_rate:
logger.warning(f"Hold music file has sample rate {file_sample_rate}, expected {sample_rate}")
logger.warning(
f"Hold music file has sample rate {file_sample_rate}, expected {sample_rate}"
)
# For now, we'll use the audio as-is and let the transport handle resampling
# In a production system, you might want to use librosa or scipy for proper resampling
# Convert to int16 and cache
audio_data = sound.astype(np.int16)
_hold_audio_cache[cache_key] = audio_data
logger.info(f"Hold audio loaded successfully: {len(audio_data)} samples at {sample_rate}Hz")
logger.info(
f"Hold audio loaded successfully: {len(audio_data)} samples at {sample_rate}Hz"
)
return audio_data.tobytes()
except Exception as e:
logger.error(f"Failed to load hold audio file {file_path}: {e}")
return None
@ -79,11 +84,11 @@ def clear_hold_audio_cache():
def get_cache_info() -> Dict[str, int]:
"""Get information about the current cache state.
Returns:
Dictionary with cache statistics
"""
return {
"cached_files": len(_hold_audio_cache),
"total_cache_size": sum(len(data) for data in _hold_audio_cache.values())
}
"total_cache_size": sum(len(data) for data in _hold_audio_cache.values()),
}

@ -1 +1 @@
Subproject commit 0019ba697e4d90dbe70902f1e7df751323960d53
Subproject commit ff7d4c19bf02bf0e14bfe7ae20e016e04b8ba27d

View file

@ -167,4 +167,4 @@ export function TransferCallToolConfig({
</CardContent>
</Card>
);
}
}

View file

@ -410,7 +410,7 @@ const data = await response.json();`;
</div>
</div>
<div className="flex items-center gap-2">
{!isEndCallTool && (
{!isEndCallTool && !isTransferCallTool && (
<Button
variant="outline"
onClick={() => setShowCodeDialog(true)}
@ -419,34 +419,9 @@ const data = await response.json();`;
View Code
</Button>
)}
<Button onClick={handleSave} disabled={isSaving}>
{isSaving ? (
<>
<Loader2 className="w-4 h-4 mr-2 animate-spin" />
Saving...
</>
) : (
<>
<Save className="w-4 h-4 mr-2" />
Save
</>
)}
</Button>
</div>
</div>
{error && (
<div className="mb-4 p-4 bg-destructive/10 border border-destructive/20 rounded-lg text-destructive">
{error}
</div>
)}
{saveSuccess && (
<div className="mb-4 p-4 bg-green-500/10 border border-green-500/20 rounded-lg text-green-600">
Tool saved successfully!
</div>
)}
{isEndCallTool ? (
<EndCallToolConfig
name={name}
@ -493,6 +468,34 @@ const data = await response.json();`;
onTimeoutMsChange={setTimeoutMs}
/>
)}
{error && (
<div className="mt-4 p-4 bg-destructive/10 border border-destructive/20 rounded-lg text-destructive">
{error}
</div>
)}
{saveSuccess && (
<div className="mt-4 p-4 bg-green-500/10 border border-green-500/20 rounded-lg text-green-600">
Tool saved successfully!
</div>
)}
<div className="flex justify-end mt-6">
<Button onClick={handleSave} disabled={isSaving}>
{isSaving ? (
<>
<Loader2 className="w-4 h-4 mr-2 animate-spin" />
Saving...
</>
) : (
<>
<Save className="w-4 h-4 mr-2" />
Save
</>
)}
</Button>
</div>
</div>
</div>

View file

@ -1,6 +1,6 @@
"use client";
import { Cog, Globe, type LucideIcon, PhoneOff, PhoneForwarded, Puzzle } from "lucide-react";
import { Cog, Globe, type LucideIcon, PhoneForwarded, PhoneOff, Puzzle } from "lucide-react";
import { type ReactNode } from "react";
export type ToolCategory = "http_api" | "end_call" | "transfer_call" | "native" | "integration";

View file

@ -1,8 +1,9 @@
// This file is auto-generated by @hey-api/openapi-ts
import type { ClientOptions } from './types.gen';
import { type Config, type ClientOptions as DefaultClientOptions, createClient, createConfig } from '@hey-api/client-fetch';
import { type ClientOptions as DefaultClientOptions, type Config, createClient, createConfig } from '@hey-api/client-fetch';
import { createClientConfig } from '../lib/apiClient';
import type { ClientOptions } from './types.gen';
/**
* The `createClientConfig()` function will be called on client initialization
@ -16,4 +17,4 @@ export type CreateClientConfig<T extends DefaultClientOptions = ClientOptions> =
export const client = createClient(createClientConfig(createConfig<ClientOptions>({
baseUrl: 'http://127.0.0.1:8000'
})));
})));

View file

@ -1,3 +1,3 @@
// This file is auto-generated by @hey-api/openapi-ts
export * from './sdk.gen';
export * from './types.gen';
export * from './sdk.gen';

File diff suppressed because one or more lines are too long

View file

@ -882,16 +882,14 @@ export type TransferCallConfig = {
};
/**
* Request model for initiating call transfer using webhook-driven completion
* Request model for initiating a call transfer.
*/
export type TransferCallRequest = {
destination: string;
organization_id: number;
transfer_id: string;
conference_name: string;
timeout?: number | null;
tool_call_id?: string | null;
tool_uuid?: string | null;
original_call_sid?: string | null;
caller_number?: string | null;
};
/**
@ -1611,16 +1609,16 @@ export type InitiateCallTransferApiV1TelephonyCallTransferPostResponses = {
200: unknown;
};
export type HandleTransferCallAnsweredApiV1TelephonyTransferCallHandlerToolCallIdPostData = {
export type CompleteTransferFunctionCallApiV1TelephonyTransferResultTransferIdPostData = {
body?: never;
path: {
tool_call_id: string;
transfer_id: string;
};
query?: never;
url: '/api/v1/telephony/transfer-call-handler/{tool_call_id}';
url: '/api/v1/telephony/transfer-result/{transfer_id}';
};
export type HandleTransferCallAnsweredApiV1TelephonyTransferCallHandlerToolCallIdPostErrors = {
export type CompleteTransferFunctionCallApiV1TelephonyTransferResultTransferIdPostErrors = {
/**
* Not found
*/
@ -1631,59 +1629,9 @@ export type HandleTransferCallAnsweredApiV1TelephonyTransferCallHandlerToolCallI
422: HttpValidationError;
};
export type HandleTransferCallAnsweredApiV1TelephonyTransferCallHandlerToolCallIdPostError = HandleTransferCallAnsweredApiV1TelephonyTransferCallHandlerToolCallIdPostErrors[keyof HandleTransferCallAnsweredApiV1TelephonyTransferCallHandlerToolCallIdPostErrors];
export type CompleteTransferFunctionCallApiV1TelephonyTransferResultTransferIdPostError = CompleteTransferFunctionCallApiV1TelephonyTransferResultTransferIdPostErrors[keyof CompleteTransferFunctionCallApiV1TelephonyTransferResultTransferIdPostErrors];
export type HandleTransferCallAnsweredApiV1TelephonyTransferCallHandlerToolCallIdPostResponses = {
/**
* Successful Response
*/
200: unknown;
};
export type CompleteTransferFunctionCallApiV1TelephonyTransferResultToolCallIdPostData = {
body?: never;
path: {
tool_call_id: string;
};
query?: never;
url: '/api/v1/telephony/transfer-result/{tool_call_id}';
};
export type CompleteTransferFunctionCallApiV1TelephonyTransferResultToolCallIdPostErrors = {
/**
* Not found
*/
404: unknown;
/**
* Validation Error
*/
422: HttpValidationError;
};
export type CompleteTransferFunctionCallApiV1TelephonyTransferResultToolCallIdPostError = CompleteTransferFunctionCallApiV1TelephonyTransferResultToolCallIdPostErrors[keyof CompleteTransferFunctionCallApiV1TelephonyTransferResultToolCallIdPostErrors];
export type CompleteTransferFunctionCallApiV1TelephonyTransferResultToolCallIdPostResponses = {
/**
* Successful Response
*/
200: unknown;
};
export type RegisterTransferToolCallApiV1TelephonyRegisterTransferToolCallPostData = {
body?: never;
path?: never;
query?: never;
url: '/api/v1/telephony/register-transfer-tool-call';
};
export type RegisterTransferToolCallApiV1TelephonyRegisterTransferToolCallPostErrors = {
/**
* Not found
*/
404: unknown;
};
export type RegisterTransferToolCallApiV1TelephonyRegisterTransferToolCallPostResponses = {
export type CompleteTransferFunctionCallApiV1TelephonyTransferResultTransferIdPostResponses = {
/**
* Successful Response
*/
@ -4982,4 +4930,4 @@ export type HealthApiV1HealthGetResponse = HealthApiV1HealthGetResponses[keyof H
export type ClientOptions = {
baseUrl: 'http://127.0.0.1:8000' | (string & {});
};
};