From 1e94a069422f1f6b3c4b61aee850262a22f5bd56 Mon Sep 17 00:00:00 2001 From: Sabiha Khan Date: Thu, 11 Dec 2025 15:38:41 +0530 Subject: [PATCH] fix: call and stream id for vobiz pipeline --- ...6841e6_add_state_field_to_workflow_runs.py | 4 +-- api/db/workflow_run_client.py | 3 ++ api/services/pipecat/transport_setup.py | 11 +++---- .../telephony/providers/vobiz_provider.py | 30 +++++++++++++++---- pipecat | 2 +- 5 files changed, 37 insertions(+), 13 deletions(-) diff --git a/api/alembic/versions/49a8fe6841e6_add_state_field_to_workflow_runs.py b/api/alembic/versions/49a8fe6841e6_add_state_field_to_workflow_runs.py index dd4934db..e7c73562 100644 --- a/api/alembic/versions/49a8fe6841e6_add_state_field_to_workflow_runs.py +++ b/api/alembic/versions/49a8fe6841e6_add_state_field_to_workflow_runs.py @@ -40,8 +40,8 @@ def upgrade() -> None: op.execute(""" UPDATE workflow_runs SET state = CASE - WHEN is_completed = true THEN 'completed' - ELSE 'initialized' + WHEN is_completed = true THEN 'completed'::workflow_run_state + ELSE 'initialized'::workflow_run_state END """) diff --git a/api/db/workflow_run_client.py b/api/db/workflow_run_client.py index fcd4f900..7179ae4e 100644 --- a/api/db/workflow_run_client.py +++ b/api/db/workflow_run_client.py @@ -306,6 +306,7 @@ class WorkflowRunClient(BaseDBClient): initial_context: dict | None = None, gathered_context: dict | None = None, logs: dict | None = None, + state: str | None = None, ) -> WorkflowRunModel: async with self.async_session() as session: result = await session.execute( @@ -337,6 +338,8 @@ class WorkflowRunClient(BaseDBClient): run.logs = {**run.logs, **logs} if is_completed: run.is_completed = is_completed + if state: + run.state = state try: await session.commit() except Exception as e: diff --git a/api/services/pipecat/transport_setup.py b/api/services/pipecat/transport_setup.py index 4137cce4..b1ffd25b 100644 --- a/api/services/pipecat/transport_setup.py +++ b/api/services/pipecat/transport_setup.py @@ -21,6 +21,7 @@ from pipecat.audio.turn.smart_turn.local_smart_turn_v3 import LocalSmartTurnAnal from pipecat.audio.vad.silero import SileroVADAnalyzer, VADParams from pipecat.serializers.plivo import PlivoFrameSerializer from pipecat.serializers.twilio import TwilioFrameSerializer +from pipecat.serializers.vobiz import VobizFrameSerializer from pipecat.serializers.vonage import VonageFrameSerializer from pipecat.transports.base_transport import TransportParams from pipecat.transports.smallwebrtc.connection import SmallWebRTCConnection @@ -256,20 +257,20 @@ async def create_vobiz_transport( turn_analyzer = create_turn_analyzer(workflow_run_id, audio_config) - # Use PlivoFrameSerializer for Vobiz (Plivo-compatible protocol) - serializer = PlivoFrameSerializer( + # Use VobizFrameSerializer for Vobiz WebSocket protocol + serializer = VobizFrameSerializer( stream_id=stream_id, call_id=call_id, auth_id=auth_id, auth_token=auth_token, - params=PlivoFrameSerializer.InputParams( - plivo_sample_rate=8000, # Vobiz uses MULAW at 8kHz + params=VobizFrameSerializer.InputParams( + vobiz_sample_rate=8000, # Vobiz uses MULAW at 8kHz sample_rate=audio_config.pipeline_sample_rate, ), ) logger.debug( - f"[run {workflow_run_id}] PlivoFrameSerializer created for Vobiz - " + f"[run {workflow_run_id}] VobizFrameSerializer created for Vobiz - " f"transport_rate=8000Hz, pipeline_rate={audio_config.pipeline_sample_rate}Hz" ) diff --git a/api/services/telephony/providers/vobiz_provider.py b/api/services/telephony/providers/vobiz_provider.py index a38093f6..46190487 100644 --- a/api/services/telephony/providers/vobiz_provider.py +++ b/api/services/telephony/providers/vobiz_provider.py @@ -2,6 +2,7 @@ Vobiz implementation of the TelephonyProvider interface. """ +import json import random from typing import TYPE_CHECKING, Any, Dict, List, Optional @@ -292,16 +293,35 @@ class VobizProvider(TelephonyProvider): workflow_run_id: int, ) -> None: """ - Handle Vobiz WebSocket connection using Plivo-compatible protocol. + Handle Vobiz WebSocket connection using Vobiz WebSocket protocol. - Uses workflow_run_id as stream/call identifiers and delegates - message handling to PlivoFrameSerializer. + Extracts stream_id and call_id from the start event and delegates + message handling to VobizFrameSerializer. """ from api.services.pipecat.run_pipeline import run_pipeline_vobiz + + first_msg = await websocket.receive_text() + start_msg = json.loads(first_msg) + logger.debug(f"Received the first message: {start_msg}") + + # Validate that this is a start event + if start_msg.get("event") != "start": + logger.error(f"Expected 'start' event, got: {start_msg.get('event')}") + await websocket.close(code=4400, reason="Expected start event") + return + + logger.debug(f"Vobiz WebSocket connected for workflow_run {workflow_run_id}") try: - stream_id = f"vobiz-stream-{workflow_run_id}" - call_id = f"vobiz-call-{workflow_run_id}" + # Extract stream_id and call_id from the start event + start_data = start_msg.get("start", {}) + stream_id = start_data.get("streamId") + call_id = start_data.get("callId") + + if not stream_id or not call_id: + logger.error(f"Missing streamId or callId in start event: {start_data}") + await websocket.close(code=4400, reason="Missing streamId or callId") + return logger.info( f"[run {workflow_run_id}] Starting Vobiz WebSocket handler - " diff --git a/pipecat b/pipecat index 3987090b..e264bc36 160000 --- a/pipecat +++ b/pipecat @@ -1 +1 @@ -Subproject commit 3987090bfb3e1e4a0341f875c93ddee69a740d60 +Subproject commit e264bc3678b9466500f2284f1ca0f5f84dc7eaa8