diff --git a/api/alembic/versions/49a8fe6841e6_add_state_field_to_workflow_runs.py b/api/alembic/versions/49a8fe6841e6_add_state_field_to_workflow_runs.py index e7c73562..9b651d54 100644 --- a/api/alembic/versions/49a8fe6841e6_add_state_field_to_workflow_runs.py +++ b/api/alembic/versions/49a8fe6841e6_add_state_field_to_workflow_runs.py @@ -5,13 +5,14 @@ Revises: a188ff90e76f Create Date: 2025-12-10 17:34:31.232048 """ + from typing import Sequence, Union -from alembic import op import sqlalchemy as sa +from alembic import op -revision: str = '49a8fe6841e6' -down_revision: Union[str, None] = 'a188ff90e76f' +revision: str = "49a8fe6841e6" +down_revision: Union[str, None] = "a188ff90e76f" branch_labels: Union[str, Sequence[str], None] = None depends_on: Union[str, Sequence[str], None] = None @@ -19,21 +20,20 @@ depends_on: Union[str, Sequence[str], None] = None def upgrade() -> None: # Create the workflow_run_state enum type workflow_run_state_enum = sa.Enum( - 'initialized', 'running', 'completed', - name='workflow_run_state' + "initialized", "running", "completed", name="workflow_run_state" ) workflow_run_state_enum.create(op.get_bind()) - + # Add the state column to workflow_runs table (nullable first) op.add_column( - 'workflow_runs', + "workflow_runs", sa.Column( - 'state', - sa.Enum('initialized', 'running', 'completed', name='workflow_run_state'), - nullable=True - ) + "state", + sa.Enum("initialized", "running", "completed", name="workflow_run_state"), + nullable=True, + ), ) - + # Set appropriate state values for existing records # Completed workflows should be marked as 'completed' # Non-completed workflows should be marked as 'initialized' @@ -44,19 +44,16 @@ def upgrade() -> None: ELSE 'initialized'::workflow_run_state END """) - + # Now make the column non-nullable with 'initialized' as default for new records op.alter_column( - 'workflow_runs', - 'state', - nullable=False, - server_default='initialized' + "workflow_runs", "state", nullable=False, server_default="initialized" ) def downgrade() -> None: # Drop the state column - op.drop_column('workflow_runs', 'state') - + op.drop_column("workflow_runs", "state") + # Drop the enum type - sa.Enum(name='workflow_run_state').drop(op.get_bind()) + sa.Enum(name="workflow_run_state").drop(op.get_bind()) diff --git a/api/enums.py b/api/enums.py index 4510c76c..33b41646 100644 --- a/api/enums.py +++ b/api/enums.py @@ -56,8 +56,8 @@ class StorageBackend(Enum): class WorkflowRunState(Enum): INITIALIZED = "initialized" # Workflow run created, ready for connection - RUNNING = "running" # Websocket connected and pipeline active - COMPLETED = "completed" # Workflow run finished + RUNNING = "running" # Websocket connected and pipeline active + COMPLETED = "completed" # Workflow run finished class WorkflowRunStatus(Enum): diff --git a/api/routes/telephony.py b/api/routes/telephony.py index e4002565..488522f9 100644 --- a/api/routes/telephony.py +++ b/api/routes/telephony.py @@ -234,7 +234,9 @@ async def websocket_endpoint( logger.warning( f"Workflow run {workflow_run_id} not in initialized state: {workflow_run.state}" ) - await websocket.close(code=4409, reason="Workflow run not available for connection") + await websocket.close( + code=4409, reason="Workflow run not available for connection" + ) return # Extract provider type from workflow run context @@ -267,10 +269,9 @@ async def websocket_endpoint( # Set workflow run state to 'running' before starting the pipeline await db_client.update_workflow_run( - run_id=workflow_run_id, - state=WorkflowRunState.RUNNING.value + run_id=workflow_run_id, state=WorkflowRunState.RUNNING.value ) - + logger.info( f"[run {workflow_run_id}] Set workflow run state to 'running' for {provider_type} provider" ) @@ -382,9 +383,9 @@ async def _process_status_update( # Mark workflow run as completed await db_client.update_workflow_run( - run_id=workflow_run_id, + run_id=workflow_run_id, is_completed=True, - state=WorkflowRunState.COMPLETED.value + state=WorkflowRunState.COMPLETED.value, ) elif status.status in ["failed", "busy", "no-answer", "canceled"]: diff --git a/api/services/pipecat/service_factory.py b/api/services/pipecat/service_factory.py index fc025563..786ffecc 100644 --- a/api/services/pipecat/service_factory.py +++ b/api/services/pipecat/service_factory.py @@ -2,7 +2,7 @@ from typing import TYPE_CHECKING from fastapi import HTTPException -from api.constants import MPS_API_URL +from api.constants import MPS_API_URL, REDIS_URL from api.services.configuration.registry import ServiceProviders from pipecat.services.azure.llm import AzureLLMService from pipecat.services.cartesia.stt import CartesiaSTTService @@ -11,6 +11,7 @@ from pipecat.services.deepgram.tts import DeepgramTTSService from pipecat.services.dograh.llm import DograhLLMService from pipecat.services.dograh.stt import DograhSTTService from pipecat.services.dograh.tts import DograhTTSService +from pipecat.services.elevenlabs.elevenlabs_cached_tts import ElevenLabsCachedTTSService from pipecat.services.elevenlabs.tts import ElevenLabsTTSService from pipecat.services.google.llm import GoogleLLMService from pipecat.services.groq.llm import GroqLLMService @@ -82,7 +83,7 @@ def create_tts_service(user_config, audio_config: "AudioConfig"): ) elif user_config.tts.provider == ServiceProviders.ELEVENLABS.value: voice_id = user_config.tts.voice.split(" - ")[1] - return ElevenLabsTTSService( + return ElevenLabsCachedTTSService( reconnect_on_error=False, api_key=user_config.tts.api_key, voice_id=voice_id, @@ -91,6 +92,7 @@ def create_tts_service(user_config, audio_config: "AudioConfig"): stability=0.8, speed=user_config.tts.speed, similarity_boost=0.75 ), text_filters=[xml_function_tag_filter], + cache_redis_url=REDIS_URL, ) elif user_config.tts.provider == ServiceProviders.DOGRAH.value: # Convert HTTP URL to WebSocket URL for TTS @@ -102,6 +104,8 @@ def create_tts_service(user_config, audio_config: "AudioConfig"): model=user_config.tts.model.value, voice=user_config.tts.voice.value, text_filters=[xml_function_tag_filter], + cache_enabled=True, + redis_url=REDIS_URL, ) else: raise HTTPException( diff --git a/api/services/pipecat/transport_setup.py b/api/services/pipecat/transport_setup.py index b1ffd25b..eb8811d7 100644 --- a/api/services/pipecat/transport_setup.py +++ b/api/services/pipecat/transport_setup.py @@ -19,7 +19,6 @@ from pipecat.audio.mixers.soundfile_mixer import SoundfileMixer from pipecat.audio.turn.smart_turn.base_smart_turn import SmartTurnParams from pipecat.audio.turn.smart_turn.local_smart_turn_v3 import LocalSmartTurnAnalyzerV3 from pipecat.audio.vad.silero import SileroVADAnalyzer, VADParams -from pipecat.serializers.plivo import PlivoFrameSerializer from pipecat.serializers.twilio import TwilioFrameSerializer from pipecat.serializers.vobiz import VobizFrameSerializer from pipecat.serializers.vonage import VonageFrameSerializer diff --git a/api/services/telephony/providers/vobiz_provider.py b/api/services/telephony/providers/vobiz_provider.py index 46190487..894389e8 100644 --- a/api/services/telephony/providers/vobiz_provider.py +++ b/api/services/telephony/providers/vobiz_provider.py @@ -299,11 +299,11 @@ class VobizProvider(TelephonyProvider): message handling to VobizFrameSerializer. """ from api.services.pipecat.run_pipeline import run_pipeline_vobiz - + first_msg = await websocket.receive_text() start_msg = json.loads(first_msg) logger.debug(f"Received the first message: {start_msg}") - + # Validate that this is a start event if start_msg.get("event") != "start": logger.error(f"Expected 'start' event, got: {start_msg.get('event')}") @@ -317,7 +317,7 @@ class VobizProvider(TelephonyProvider): start_data = start_msg.get("start", {}) stream_id = start_data.get("streamId") call_id = start_data.get("callId") - + if not stream_id or not call_id: logger.error(f"Missing streamId or callId in start event: {start_data}") await websocket.close(code=4400, reason="Missing streamId or callId") diff --git a/pipecat b/pipecat index 322116d7..4a4eae4b 160000 --- a/pipecat +++ b/pipecat @@ -1 +1 @@ -Subproject commit 322116d74f4568804b0f11000e187b2b964bde56 +Subproject commit 4a4eae4beb9b2d9456a3ad9c99a8e635ea3100a8