diff --git a/api/services/pipecat/event_handlers.py b/api/services/pipecat/event_handlers.py index 3777db9..0304f9c 100644 --- a/api/services/pipecat/event_handlers.py +++ b/api/services/pipecat/event_handlers.py @@ -3,6 +3,7 @@ from loguru import logger from api.db import db_client from api.enums import WorkflowRunState from api.services.campaign.campaign_call_dispatcher import campaign_call_dispatcher +from api.services.campaign.circuit_breaker import circuit_breaker from api.services.pipecat.audio_config import AudioConfig from api.services.pipecat.in_memory_buffers import ( InMemoryAudioBuffer, @@ -95,6 +96,22 @@ def register_event_handlers( ready_state["pipeline_started"] = True await maybe_trigger_llm() + @task.event_handler("on_pipeline_error") + async def on_pipeline_error(_task: PipelineTask, frame: Frame): + logger.warning(f"Pipeline error for workflow run {workflow_run_id}: {frame}") + try: + workflow_run = await db_client.get_workflow_run_by_id(workflow_run_id) + if workflow_run and workflow_run.campaign_id: + await circuit_breaker.record_and_evaluate( + campaign_id=workflow_run.campaign_id, is_failure=True + ) + except Exception as e: + logger.error(f"Error recording circuit breaker failure: {e}", exc_info=True) + + await engine.end_call_with_reason( + EndTaskReason.PIPELINE_ERROR.value, abort_immediately=True + ) + @task.event_handler("on_pipeline_finished") async def on_pipeline_finished( task: PipelineTask, diff --git a/api/services/pipecat/realtime_feedback_observer.py b/api/services/pipecat/realtime_feedback_observer.py index bde6c6a..0f3010e 100644 --- a/api/services/pipecat/realtime_feedback_observer.py +++ b/api/services/pipecat/realtime_feedback_observer.py @@ -32,6 +32,7 @@ if TYPE_CHECKING: from pipecat.frames.frames import ( CancelFrame, EndFrame, + ErrorFrame, FunctionCallInProgressFrame, FunctionCallResultFrame, InterimTranscriptionFrame, @@ -272,6 +273,19 @@ class RealtimeFeedbackObserver(BaseObserver): }, } ) + # Handle pipeline errors + elif isinstance(frame, ErrorFrame): + processor_name = str(frame.processor) if frame.processor else None + await self._send_message( + { + "type": RealtimeFeedbackType.PIPELINE_ERROR.value, + "payload": { + "error": frame.error, + "fatal": frame.fatal, + "processor": processor_name, + }, + } + ) async def _send_ws(self, message: dict): """Send message via WebSocket only, handling errors gracefully.""" diff --git a/api/services/workflow/pipecat_engine.py b/api/services/workflow/pipecat_engine.py index 59c4d59..f0398a6 100644 --- a/api/services/workflow/pipecat_engine.py +++ b/api/services/workflow/pipecat_engine.py @@ -536,10 +536,11 @@ class PipecatEngine: # Mute the pipeline self._mute_pipeline = True - # Perform final variable extraction synchronously before ending - await self._perform_variable_extraction_if_needed( - self._current_node, run_in_background=False - ) + if reason != EndTaskReason.PIPELINE_ERROR.value: + # Perform final variable extraction synchronously before ending + await self._perform_variable_extraction_if_needed( + self._current_node, run_in_background=False + ) frame_to_push = ( CancelFrame(reason=reason) if abort_immediately else EndFrame(reason=reason) diff --git a/pipecat b/pipecat index 791d241..927ed9b 160000 --- a/pipecat +++ b/pipecat @@ -1 +1 @@ -Subproject commit 791d24196f07b36d64beddda4a9c79aa6ccdb245 +Subproject commit 927ed9bae29e52c14230feb9f61e7ef2a551b0b8 diff --git a/ui/src/app/workflow/[workflowId]/run/[runId]/components/RealtimeFeedback.tsx b/ui/src/app/workflow/[workflowId]/run/[runId]/components/RealtimeFeedback.tsx index 5cc5e67..9a0f031 100644 --- a/ui/src/app/workflow/[workflowId]/run/[runId]/components/RealtimeFeedback.tsx +++ b/ui/src/app/workflow/[workflowId]/run/[runId]/components/RealtimeFeedback.tsx @@ -20,6 +20,8 @@ interface RealtimeFeedbackEvent { ttfb_seconds?: number; processor?: string; model?: string; + error?: string; + fatal?: boolean; }; timestamp: string; turn: number; @@ -74,13 +76,16 @@ function convertLogEventsToTranscriptEvents(events: RealtimeFeedbackEvent[]): Tr case 'rtf-ttfb-metric': type = 'ttfb-metric'; break; + case 'rtf-pipeline-error': + type = 'pipeline-error'; + break; default: type = 'bot-text'; } return { type, - text: event.payload.text || event.payload.result || event.payload.function_name || event.payload.node_name || '', + text: event.payload.text || event.payload.error || event.payload.result || event.payload.function_name || event.payload.node_name || '', final: event.payload.final, timestamp: event.timestamp, turn: event.turn, @@ -91,6 +96,7 @@ function convertLogEventsToTranscriptEvents(events: RealtimeFeedbackEvent[]): Tr ttfbSeconds: event.payload.ttfb_seconds, processor: event.payload.processor, model: event.payload.model, + fatal: event.payload.fatal, }; }); } @@ -111,6 +117,7 @@ function convertLiveMessagesToTranscriptEvents(messages: FeedbackMessage[]): Tra ttfbSeconds: msg.ttfbSeconds, processor: msg.processor, model: msg.model, + fatal: msg.fatal, })); } diff --git a/ui/src/app/workflow/[workflowId]/run/[runId]/components/UnifiedTranscript.tsx b/ui/src/app/workflow/[workflowId]/run/[runId]/components/UnifiedTranscript.tsx index 635cb07..3fff78b 100644 --- a/ui/src/app/workflow/[workflowId]/run/[runId]/components/UnifiedTranscript.tsx +++ b/ui/src/app/workflow/[workflowId]/run/[runId]/components/UnifiedTranscript.tsx @@ -49,6 +49,7 @@ export const UnifiedTranscript = ({ status: msg.status, nodeName: msg.nodeName, ttfbSeconds: msg.ttfbSeconds, + fatal: msg.fatal, })); // Default empty state diff --git a/ui/src/app/workflow/[workflowId]/run/[runId]/components/shared/TranscriptMessage.tsx b/ui/src/app/workflow/[workflowId]/run/[runId]/components/shared/TranscriptMessage.tsx index c41d4ee..19b430a 100644 --- a/ui/src/app/workflow/[workflowId]/run/[runId]/components/shared/TranscriptMessage.tsx +++ b/ui/src/app/workflow/[workflowId]/run/[runId]/components/shared/TranscriptMessage.tsx @@ -1,17 +1,18 @@ 'use client'; -import { Brain, GitBranch, Wrench } from 'lucide-react'; +import { AlertTriangle, Brain, GitBranch, Wrench } from 'lucide-react'; import { cn } from '@/lib/utils'; export interface TranscriptMessageData { id: string; - type: 'user-transcription' | 'bot-text' | 'function-call' | 'node-transition' | 'ttfb-metric'; + type: 'user-transcription' | 'bot-text' | 'function-call' | 'node-transition' | 'ttfb-metric' | 'pipeline-error'; text: string; final?: boolean; functionName?: string; nodeName?: string; ttfbSeconds?: number; + fatal?: boolean; } interface TranscriptMessageProps { @@ -36,6 +37,23 @@ export function TranscriptMessage({ message, nextMessage }: TranscriptMessagePro ); } + // Pipeline error - show as a red alert + if (message.type === 'pipeline-error') { + return ( +