From 182d8a27db47b0de333f40b5359b7e1bfdd564e4 Mon Sep 17 00:00:00 2001 From: Sabiha Khan Date: Thu, 25 Dec 2025 15:19:35 +0530 Subject: [PATCH] fix: vobiz pipeline cleanup --- api/routes/telephony.py | 22 ++++ api/services/pipecat/run_pipeline.py | 73 +++++++++++ .../telephony/providers/vobiz_provider.py | 21 +++- api/services/telephony/vobiz_api_client.py | 117 ++++++++++++++++++ pipecat | 2 +- 5 files changed, 233 insertions(+), 2 deletions(-) create mode 100644 api/services/telephony/vobiz_api_client.py diff --git a/api/routes/telephony.py b/api/routes/telephony.py index 239a32f..cdaf6b4 100644 --- a/api/routes/telephony.py +++ b/api/routes/telephony.py @@ -240,6 +240,28 @@ async def websocket_endpoint( logger.warning( f"Workflow run {workflow_run_id} not in initialized state: {workflow_run.state}" ) + + # For Vobiz calls, stop audio streams to prevent WebSocket retries + if workflow_run.gathered_context and workflow_run.gathered_context.get("provider") == "vobiz": + try: + logger.info(f"[run {workflow_run_id}] Stopping Vobiz audio streams due to completed workflow state") + provider = await get_telephony_provider(workflow.organization_id) + call_id = workflow_run.gathered_context.get("call_id") + if call_id: + logger.debug(f"[run {workflow_run_id}] Using call_id from gathered_context: {call_id}") + + # 1: Stop audio streams + stream_result = await provider.stop_audio_stream(call_id) + logger.info(f"[run {workflow_run_id}] Vobiz stop audio streams - call {call_id}: {stream_result}") + + # 2: Hang up the call as well + hangup_result = await provider.hangup_call(call_id) + logger.info(f"[run {workflow_run_id}] Vobiz hangup call - call {call_id}: {hangup_result}") + else: + logger.warning(f"[run {workflow_run_id}] No call_id found in gathered_context") + except Exception as e: + logger.warning(f"[run {workflow_run_id}] Failed to stop Vobiz streams during state check: {e}") + await websocket.close( code=4409, reason="Workflow run not available for connection" ) diff --git a/api/services/pipecat/run_pipeline.py b/api/services/pipecat/run_pipeline.py index 59ecec6..96fffc8 100644 --- a/api/services/pipecat/run_pipeline.py +++ b/api/services/pipecat/run_pipeline.py @@ -176,6 +176,67 @@ async def run_pipeline_vonage( raise +async def _send_vobiz_termination_signal(transport, workflow_run_id: int, reason: str = "unknown"): + """Send termination signal to Vobiz and cleanup transport resources when pipeline fails to start. + + This function performs complete pipeline cleanup by: + 1. Sending EndFrame to VobizFrameSerializer to trigger cleanup + 2. Cleaning up transport input/output processors and WebSocket connections + + Args: + transport: The Vobiz transport containing the serializer + workflow_run_id: The workflow run ID for logging + reason: Reason for termination (for logging) + """ + try: + from pipecat.frames.frames import EndFrame, CancelFrame + + logger.info( + f"[run {workflow_run_id}] Starting complete Vobiz termination - reason: {reason}" + ) + + # Send termination signal to Vobiz API via serializer (handles both stream stop and hangup) + if hasattr(transport, '_params') and hasattr(transport._params, 'serializer'): + serializer = transport._params.serializer + if serializer: + logger.debug(f"[run {workflow_run_id}] Sending Vobiz API termination signal") + end_frame = EndFrame() + await serializer.serialize(end_frame) + logger.debug(f"[run {workflow_run_id}] Vobiz API termination signal sent") + else: + logger.warning(f"[run {workflow_run_id}] No serializer found for Vobiz API termination") + else: + logger.warning(f"[run {workflow_run_id}] Transport missing params/serializer for Vobiz API termination") + + # 2: Cleanup transport input and output processors + cancel_frame = CancelFrame() + + # Cleanup input transport + if hasattr(transport, 'input') and callable(getattr(transport.input(), 'cancel', None)): + logger.debug(f"[run {workflow_run_id}] Cleaning up transport input processor") + await transport.input().cancel(cancel_frame) + + # Cleanup output transport + if hasattr(transport, 'output') and callable(getattr(transport.output(), 'cancel', None)): + logger.debug(f"[run {workflow_run_id}] Cleaning up transport output processor") + await transport.output().cancel(cancel_frame) + + # 3: Final transport cleanup + if hasattr(transport, 'cleanup') and callable(transport.cleanup): + logger.debug(f"[run {workflow_run_id}] Running final transport cleanup") + await transport.cleanup() + + logger.info( + f"[run {workflow_run_id}] Complete Vobiz termination successful - API hangup sent, transport cleaned up" + ) + + except Exception as cleanup_error: + logger.error( + f"[run {workflow_run_id}] Failed during Vobiz termination cleanup: {cleanup_error}", + exc_info=True + ) + + async def run_pipeline_vobiz( websocket_client: WebSocket, stream_id: str, @@ -237,6 +298,18 @@ async def run_pipeline_vobiz( logger.error( f"[run {workflow_run_id}] Error in Vobiz pipeline: {e}", exc_info=True ) + + # Attempt cleanup for Vobiz when transport exists + if "transport" in locals(): + logger.info( + f"[run {workflow_run_id}] Performing Vobiz cleanup due to pipeline failure: {type(e).__name__}" + ) + await _send_vobiz_termination_signal(transport, workflow_run_id, str(type(e).__name__)) + else: + logger.debug( + f"[run {workflow_run_id}] No transport to cleanup (error before transport creation)" + ) + raise diff --git a/api/services/telephony/providers/vobiz_provider.py b/api/services/telephony/providers/vobiz_provider.py index 894389e..c286f78 100644 --- a/api/services/telephony/providers/vobiz_provider.py +++ b/api/services/telephony/providers/vobiz_provider.py @@ -11,6 +11,7 @@ from loguru import logger from api.enums import WorkflowRunMode from api.services.telephony.base import CallInitiationResult, TelephonyProvider +from api.services.telephony.vobiz_api_client import VobizApiClient from api.utils.tunnel import TunnelURLProvider if TYPE_CHECKING: @@ -45,6 +46,9 @@ class VobizProvider(TelephonyProvider): self.from_numbers = [self.from_numbers] self.base_url = "https://api.vobiz.ai/api" + + # API client for centralized api calls + self._api_client = VobizApiClient(self.auth_id, self.auth_token) if self.validate_config() else None async def initiate_call( self, @@ -138,7 +142,7 @@ class VobizProvider(TelephonyProvider): return CallInitiationResult( call_id=call_id, status="queued", # Vobiz returns "message": "call fired" - provider_metadata={}, + provider_metadata={"call_id": call_id}, raw_response=response_data, ) @@ -267,6 +271,20 @@ class VobizProvider(TelephonyProvider): logger.error(f"Exception fetching Vobiz call cost: {e}") return {"cost_usd": 0.0, "duration": 0, "status": "error", "error": str(e)} + async def stop_audio_stream(self, call_id: str, stream_id: Optional[str] = None) -> Dict[str, Any]: + """Stop Vobiz audio stream(s) using the centralized API client.""" + if not self._api_client: + return {"success": False, "error": "Vobiz api client missing in provider implementation"} + + return await self._api_client.stop_audio_stream(call_id, stream_id) + + async def hangup_call(self, call_id: str) -> Dict[str, Any]: + """Hangup Vobiz call using the centralized API client.""" + if not self._api_client: + return {"success": False, "error": "Vobiz api client missing in provider implementation"} + + return await self._api_client.hangup_call(call_id) + def parse_status_callback(self, data: Dict[str, Any]) -> Dict[str, Any]: """ Parse Vobiz status callback data into generic format. @@ -338,4 +356,5 @@ class VobizProvider(TelephonyProvider): logger.error( f"[run {workflow_run_id}] Error in Vobiz WebSocket handler: {e}" ) + raise diff --git a/api/services/telephony/vobiz_api_client.py b/api/services/telephony/vobiz_api_client.py new file mode 100644 index 0000000..5147fdc --- /dev/null +++ b/api/services/telephony/vobiz_api_client.py @@ -0,0 +1,117 @@ +""" +Vobiz API client for call and stream management. +""" + +import aiohttp +from typing import Dict, Any, Optional + + +class VobizApiClient: + """Centralized client for Vobiz API operations.""" + + def __init__(self, auth_id: str, auth_token: str): + """Initialize the Vobiz API client. + + Args: + auth_id: Vobiz Account ID + auth_token: Vobiz Auth Token + """ + self.auth_id = auth_id + self.auth_token = auth_token + self.base_url = "https://api.vobiz.ai/api" + + @property + def headers(self) -> Dict[str, str]: + """Get authentication headers for Vobiz API.""" + return { + "X-Auth-ID": self.auth_id, + "X-Auth-Token": self.auth_token, + } + + async def stop_audio_stream(self, call_id: str, stream_id: Optional[str] = None) -> Dict[str, Any]: + """Stop Vobiz audio stream(s) for a call. + + Args: + call_id: The Vobiz call_uuid + stream_id: Optional specific stream ID. If teh stream ID is not available, stops all streams for the call. + + Returns: + Dict containing the API response + """ + if stream_id: + # Stop specific stream + endpoint = f"{self.base_url}/v1/Account/{self.auth_id}/Call/{call_id}/Stream/{stream_id}/" + else: + # Stop all streams for the call + endpoint = f"{self.base_url}/v1/Account/{self.auth_id}/Call/{call_id}/Stream/" + + try: + async with aiohttp.ClientSession() as session: + async with session.delete(endpoint, headers=self.headers) as response: + response_text = await response.text() + response_data = {} + + try: + response_data = await response.json() if response_text else {} + except: + # If JSON parsing fails, include raw text + response_data = {"raw_text": response_text} + + return { + "status_code": response.status, + "response_body": response_data, + "raw_text": response_text + } + + except Exception as e: + return {"exception": str(e)} + + async def hangup_call(self, call_id: str) -> Dict[str, Any]: + """Hang up a Vobiz call. + + Args: + call_id: The Vobiz call_uuid + + Returns: + Dict containing the API response + """ + endpoint = f"{self.base_url}/v1/Account/{self.auth_id}/Call/{call_id}/" + + try: + async with aiohttp.ClientSession() as session: + async with session.delete(endpoint, headers=self.headers) as response: + response_text = await response.text() + response_data = {} + + try: + response_data = await response.json() if response_text else {} + except: + # If JSON parsing fails, include raw text + response_data = {"raw_text": response_text} + + return { + "status_code": response.status, + "response_body": response_data, + "raw_text": response_text + } + + except Exception as e: + return {"exception": str(e)} + + async def stop_streams_and_hangup(self, call_id: str, stream_id: Optional[str] = None) -> tuple[Dict[str, Any], Dict[str, Any]]: + """Stop audio streams and hang up call in the same order. + + Args: + call_id: The Vobiz call_uuid + stream_id: Optional specific stream ID. If None, stops all streams for the call. + + Returns: + Tuple of (stream_result, hangup_result) + """ + # Step 1: Stop audio streams + stream_result = await self.stop_audio_stream(call_id, stream_id) + + # Step 2: Hang up the call + hangup_result = await self.hangup_call(call_id) + + return stream_result, hangup_result \ No newline at end of file diff --git a/pipecat b/pipecat index 06abb8e..6f3a97d 160000 --- a/pipecat +++ b/pipecat @@ -1 +1 @@ -Subproject commit 06abb8ef6b7d410cdd6ad37595dd9322204c3c7c +Subproject commit 6f3a97d70e166e57588d5dfef40260721a5f6384