mirror of
https://github.com/dograh-hq/dograh.git
synced 2026-06-07 07:55:16 +02:00
fix: vobiz pipeline cleanup
This commit is contained in:
parent
e83f3a36d2
commit
182d8a27db
5 changed files with 233 additions and 2 deletions
|
|
@ -240,6 +240,28 @@ async def websocket_endpoint(
|
|||
logger.warning(
|
||||
f"Workflow run {workflow_run_id} not in initialized state: {workflow_run.state}"
|
||||
)
|
||||
|
||||
# For Vobiz calls, stop audio streams to prevent WebSocket retries
|
||||
if workflow_run.gathered_context and workflow_run.gathered_context.get("provider") == "vobiz":
|
||||
try:
|
||||
logger.info(f"[run {workflow_run_id}] Stopping Vobiz audio streams due to completed workflow state")
|
||||
provider = await get_telephony_provider(workflow.organization_id)
|
||||
call_id = workflow_run.gathered_context.get("call_id")
|
||||
if call_id:
|
||||
logger.debug(f"[run {workflow_run_id}] Using call_id from gathered_context: {call_id}")
|
||||
|
||||
# 1: Stop audio streams
|
||||
stream_result = await provider.stop_audio_stream(call_id)
|
||||
logger.info(f"[run {workflow_run_id}] Vobiz stop audio streams - call {call_id}: {stream_result}")
|
||||
|
||||
# 2: Hang up the call as well
|
||||
hangup_result = await provider.hangup_call(call_id)
|
||||
logger.info(f"[run {workflow_run_id}] Vobiz hangup call - call {call_id}: {hangup_result}")
|
||||
else:
|
||||
logger.warning(f"[run {workflow_run_id}] No call_id found in gathered_context")
|
||||
except Exception as e:
|
||||
logger.warning(f"[run {workflow_run_id}] Failed to stop Vobiz streams during state check: {e}")
|
||||
|
||||
await websocket.close(
|
||||
code=4409, reason="Workflow run not available for connection"
|
||||
)
|
||||
|
|
|
|||
|
|
@ -176,6 +176,67 @@ async def run_pipeline_vonage(
|
|||
raise
|
||||
|
||||
|
||||
async def _send_vobiz_termination_signal(transport, workflow_run_id: int, reason: str = "unknown"):
|
||||
"""Send termination signal to Vobiz and cleanup transport resources when pipeline fails to start.
|
||||
|
||||
This function performs complete pipeline cleanup by:
|
||||
1. Sending EndFrame to VobizFrameSerializer to trigger cleanup
|
||||
2. Cleaning up transport input/output processors and WebSocket connections
|
||||
|
||||
Args:
|
||||
transport: The Vobiz transport containing the serializer
|
||||
workflow_run_id: The workflow run ID for logging
|
||||
reason: Reason for termination (for logging)
|
||||
"""
|
||||
try:
|
||||
from pipecat.frames.frames import EndFrame, CancelFrame
|
||||
|
||||
logger.info(
|
||||
f"[run {workflow_run_id}] Starting complete Vobiz termination - reason: {reason}"
|
||||
)
|
||||
|
||||
# Send termination signal to Vobiz API via serializer (handles both stream stop and hangup)
|
||||
if hasattr(transport, '_params') and hasattr(transport._params, 'serializer'):
|
||||
serializer = transport._params.serializer
|
||||
if serializer:
|
||||
logger.debug(f"[run {workflow_run_id}] Sending Vobiz API termination signal")
|
||||
end_frame = EndFrame()
|
||||
await serializer.serialize(end_frame)
|
||||
logger.debug(f"[run {workflow_run_id}] Vobiz API termination signal sent")
|
||||
else:
|
||||
logger.warning(f"[run {workflow_run_id}] No serializer found for Vobiz API termination")
|
||||
else:
|
||||
logger.warning(f"[run {workflow_run_id}] Transport missing params/serializer for Vobiz API termination")
|
||||
|
||||
# 2: Cleanup transport input and output processors
|
||||
cancel_frame = CancelFrame()
|
||||
|
||||
# Cleanup input transport
|
||||
if hasattr(transport, 'input') and callable(getattr(transport.input(), 'cancel', None)):
|
||||
logger.debug(f"[run {workflow_run_id}] Cleaning up transport input processor")
|
||||
await transport.input().cancel(cancel_frame)
|
||||
|
||||
# Cleanup output transport
|
||||
if hasattr(transport, 'output') and callable(getattr(transport.output(), 'cancel', None)):
|
||||
logger.debug(f"[run {workflow_run_id}] Cleaning up transport output processor")
|
||||
await transport.output().cancel(cancel_frame)
|
||||
|
||||
# 3: Final transport cleanup
|
||||
if hasattr(transport, 'cleanup') and callable(transport.cleanup):
|
||||
logger.debug(f"[run {workflow_run_id}] Running final transport cleanup")
|
||||
await transport.cleanup()
|
||||
|
||||
logger.info(
|
||||
f"[run {workflow_run_id}] Complete Vobiz termination successful - API hangup sent, transport cleaned up"
|
||||
)
|
||||
|
||||
except Exception as cleanup_error:
|
||||
logger.error(
|
||||
f"[run {workflow_run_id}] Failed during Vobiz termination cleanup: {cleanup_error}",
|
||||
exc_info=True
|
||||
)
|
||||
|
||||
|
||||
async def run_pipeline_vobiz(
|
||||
websocket_client: WebSocket,
|
||||
stream_id: str,
|
||||
|
|
@ -237,6 +298,18 @@ async def run_pipeline_vobiz(
|
|||
logger.error(
|
||||
f"[run {workflow_run_id}] Error in Vobiz pipeline: {e}", exc_info=True
|
||||
)
|
||||
|
||||
# Attempt cleanup for Vobiz when transport exists
|
||||
if "transport" in locals():
|
||||
logger.info(
|
||||
f"[run {workflow_run_id}] Performing Vobiz cleanup due to pipeline failure: {type(e).__name__}"
|
||||
)
|
||||
await _send_vobiz_termination_signal(transport, workflow_run_id, str(type(e).__name__))
|
||||
else:
|
||||
logger.debug(
|
||||
f"[run {workflow_run_id}] No transport to cleanup (error before transport creation)"
|
||||
)
|
||||
|
||||
raise
|
||||
|
||||
|
||||
|
|
|
|||
|
|
@ -11,6 +11,7 @@ from loguru import logger
|
|||
|
||||
from api.enums import WorkflowRunMode
|
||||
from api.services.telephony.base import CallInitiationResult, TelephonyProvider
|
||||
from api.services.telephony.vobiz_api_client import VobizApiClient
|
||||
from api.utils.tunnel import TunnelURLProvider
|
||||
|
||||
if TYPE_CHECKING:
|
||||
|
|
@ -45,6 +46,9 @@ class VobizProvider(TelephonyProvider):
|
|||
self.from_numbers = [self.from_numbers]
|
||||
|
||||
self.base_url = "https://api.vobiz.ai/api"
|
||||
|
||||
# API client for centralized api calls
|
||||
self._api_client = VobizApiClient(self.auth_id, self.auth_token) if self.validate_config() else None
|
||||
|
||||
async def initiate_call(
|
||||
self,
|
||||
|
|
@ -138,7 +142,7 @@ class VobizProvider(TelephonyProvider):
|
|||
return CallInitiationResult(
|
||||
call_id=call_id,
|
||||
status="queued", # Vobiz returns "message": "call fired"
|
||||
provider_metadata={},
|
||||
provider_metadata={"call_id": call_id},
|
||||
raw_response=response_data,
|
||||
)
|
||||
|
||||
|
|
@ -267,6 +271,20 @@ class VobizProvider(TelephonyProvider):
|
|||
logger.error(f"Exception fetching Vobiz call cost: {e}")
|
||||
return {"cost_usd": 0.0, "duration": 0, "status": "error", "error": str(e)}
|
||||
|
||||
async def stop_audio_stream(self, call_id: str, stream_id: Optional[str] = None) -> Dict[str, Any]:
|
||||
"""Stop Vobiz audio stream(s) using the centralized API client."""
|
||||
if not self._api_client:
|
||||
return {"success": False, "error": "Vobiz api client missing in provider implementation"}
|
||||
|
||||
return await self._api_client.stop_audio_stream(call_id, stream_id)
|
||||
|
||||
async def hangup_call(self, call_id: str) -> Dict[str, Any]:
|
||||
"""Hangup Vobiz call using the centralized API client."""
|
||||
if not self._api_client:
|
||||
return {"success": False, "error": "Vobiz api client missing in provider implementation"}
|
||||
|
||||
return await self._api_client.hangup_call(call_id)
|
||||
|
||||
def parse_status_callback(self, data: Dict[str, Any]) -> Dict[str, Any]:
|
||||
"""
|
||||
Parse Vobiz status callback data into generic format.
|
||||
|
|
@ -338,4 +356,5 @@ class VobizProvider(TelephonyProvider):
|
|||
logger.error(
|
||||
f"[run {workflow_run_id}] Error in Vobiz WebSocket handler: {e}"
|
||||
)
|
||||
|
||||
raise
|
||||
|
|
|
|||
117
api/services/telephony/vobiz_api_client.py
Normal file
117
api/services/telephony/vobiz_api_client.py
Normal file
|
|
@ -0,0 +1,117 @@
|
|||
"""
|
||||
Vobiz API client for call and stream management.
|
||||
"""
|
||||
|
||||
import aiohttp
|
||||
from typing import Dict, Any, Optional
|
||||
|
||||
|
||||
class VobizApiClient:
|
||||
"""Centralized client for Vobiz API operations."""
|
||||
|
||||
def __init__(self, auth_id: str, auth_token: str):
|
||||
"""Initialize the Vobiz API client.
|
||||
|
||||
Args:
|
||||
auth_id: Vobiz Account ID
|
||||
auth_token: Vobiz Auth Token
|
||||
"""
|
||||
self.auth_id = auth_id
|
||||
self.auth_token = auth_token
|
||||
self.base_url = "https://api.vobiz.ai/api"
|
||||
|
||||
@property
|
||||
def headers(self) -> Dict[str, str]:
|
||||
"""Get authentication headers for Vobiz API."""
|
||||
return {
|
||||
"X-Auth-ID": self.auth_id,
|
||||
"X-Auth-Token": self.auth_token,
|
||||
}
|
||||
|
||||
async def stop_audio_stream(self, call_id: str, stream_id: Optional[str] = None) -> Dict[str, Any]:
|
||||
"""Stop Vobiz audio stream(s) for a call.
|
||||
|
||||
Args:
|
||||
call_id: The Vobiz call_uuid
|
||||
stream_id: Optional specific stream ID. If teh stream ID is not available, stops all streams for the call.
|
||||
|
||||
Returns:
|
||||
Dict containing the API response
|
||||
"""
|
||||
if stream_id:
|
||||
# Stop specific stream
|
||||
endpoint = f"{self.base_url}/v1/Account/{self.auth_id}/Call/{call_id}/Stream/{stream_id}/"
|
||||
else:
|
||||
# Stop all streams for the call
|
||||
endpoint = f"{self.base_url}/v1/Account/{self.auth_id}/Call/{call_id}/Stream/"
|
||||
|
||||
try:
|
||||
async with aiohttp.ClientSession() as session:
|
||||
async with session.delete(endpoint, headers=self.headers) as response:
|
||||
response_text = await response.text()
|
||||
response_data = {}
|
||||
|
||||
try:
|
||||
response_data = await response.json() if response_text else {}
|
||||
except:
|
||||
# If JSON parsing fails, include raw text
|
||||
response_data = {"raw_text": response_text}
|
||||
|
||||
return {
|
||||
"status_code": response.status,
|
||||
"response_body": response_data,
|
||||
"raw_text": response_text
|
||||
}
|
||||
|
||||
except Exception as e:
|
||||
return {"exception": str(e)}
|
||||
|
||||
async def hangup_call(self, call_id: str) -> Dict[str, Any]:
|
||||
"""Hang up a Vobiz call.
|
||||
|
||||
Args:
|
||||
call_id: The Vobiz call_uuid
|
||||
|
||||
Returns:
|
||||
Dict containing the API response
|
||||
"""
|
||||
endpoint = f"{self.base_url}/v1/Account/{self.auth_id}/Call/{call_id}/"
|
||||
|
||||
try:
|
||||
async with aiohttp.ClientSession() as session:
|
||||
async with session.delete(endpoint, headers=self.headers) as response:
|
||||
response_text = await response.text()
|
||||
response_data = {}
|
||||
|
||||
try:
|
||||
response_data = await response.json() if response_text else {}
|
||||
except:
|
||||
# If JSON parsing fails, include raw text
|
||||
response_data = {"raw_text": response_text}
|
||||
|
||||
return {
|
||||
"status_code": response.status,
|
||||
"response_body": response_data,
|
||||
"raw_text": response_text
|
||||
}
|
||||
|
||||
except Exception as e:
|
||||
return {"exception": str(e)}
|
||||
|
||||
async def stop_streams_and_hangup(self, call_id: str, stream_id: Optional[str] = None) -> tuple[Dict[str, Any], Dict[str, Any]]:
|
||||
"""Stop audio streams and hang up call in the same order.
|
||||
|
||||
Args:
|
||||
call_id: The Vobiz call_uuid
|
||||
stream_id: Optional specific stream ID. If None, stops all streams for the call.
|
||||
|
||||
Returns:
|
||||
Tuple of (stream_result, hangup_result)
|
||||
"""
|
||||
# Step 1: Stop audio streams
|
||||
stream_result = await self.stop_audio_stream(call_id, stream_id)
|
||||
|
||||
# Step 2: Hang up the call
|
||||
hangup_result = await self.hangup_call(call_id)
|
||||
|
||||
return stream_result, hangup_result
|
||||
2
pipecat
2
pipecat
|
|
@ -1 +1 @@
|
|||
Subproject commit 06abb8ef6b7d410cdd6ad37595dd9322204c3c7c
|
||||
Subproject commit 6f3a97d70e166e57588d5dfef40260721a5f6384
|
||||
Loading…
Add table
Add a link
Reference in a new issue