From dfb741e475e618f6d526796db8fd7f5d7e11faf6 Mon Sep 17 00:00:00 2001 From: Abhishek Kumar Date: Thu, 5 Mar 2026 14:51:39 +0530 Subject: [PATCH] feat: abort call on pipeline error and send rtf event --- api/services/pipecat/event_handlers.py | 17 ++++++++++++++ .../pipecat/realtime_feedback_observer.py | 14 ++++++++++++ api/services/workflow/pipecat_engine.py | 9 ++++---- pipecat | 2 +- .../[runId]/components/RealtimeFeedback.tsx | 9 +++++++- .../[runId]/components/UnifiedTranscript.tsx | 1 + .../components/shared/TranscriptMessage.tsx | 22 +++++++++++++++++-- .../run/[runId]/hooks/useWebSocketRTC.tsx | 17 +++++++++++++- .../[runId]/utils/processTranscriptEvents.ts | 5 ++++- 9 files changed, 86 insertions(+), 10 deletions(-) diff --git a/api/services/pipecat/event_handlers.py b/api/services/pipecat/event_handlers.py index 3777db9..0304f9c 100644 --- a/api/services/pipecat/event_handlers.py +++ b/api/services/pipecat/event_handlers.py @@ -3,6 +3,7 @@ from loguru import logger from api.db import db_client from api.enums import WorkflowRunState from api.services.campaign.campaign_call_dispatcher import campaign_call_dispatcher +from api.services.campaign.circuit_breaker import circuit_breaker from api.services.pipecat.audio_config import AudioConfig from api.services.pipecat.in_memory_buffers import ( InMemoryAudioBuffer, @@ -95,6 +96,22 @@ def register_event_handlers( ready_state["pipeline_started"] = True await maybe_trigger_llm() + @task.event_handler("on_pipeline_error") + async def on_pipeline_error(_task: PipelineTask, frame: Frame): + logger.warning(f"Pipeline error for workflow run {workflow_run_id}: {frame}") + try: + workflow_run = await db_client.get_workflow_run_by_id(workflow_run_id) + if workflow_run and workflow_run.campaign_id: + await circuit_breaker.record_and_evaluate( + campaign_id=workflow_run.campaign_id, is_failure=True + ) + except Exception as e: + logger.error(f"Error recording circuit breaker failure: {e}", exc_info=True) + + await engine.end_call_with_reason( + EndTaskReason.PIPELINE_ERROR.value, abort_immediately=True + ) + @task.event_handler("on_pipeline_finished") async def on_pipeline_finished( task: PipelineTask, diff --git a/api/services/pipecat/realtime_feedback_observer.py b/api/services/pipecat/realtime_feedback_observer.py index bde6c6a..0f3010e 100644 --- a/api/services/pipecat/realtime_feedback_observer.py +++ b/api/services/pipecat/realtime_feedback_observer.py @@ -32,6 +32,7 @@ if TYPE_CHECKING: from pipecat.frames.frames import ( CancelFrame, EndFrame, + ErrorFrame, FunctionCallInProgressFrame, FunctionCallResultFrame, InterimTranscriptionFrame, @@ -272,6 +273,19 @@ class RealtimeFeedbackObserver(BaseObserver): }, } ) + # Handle pipeline errors + elif isinstance(frame, ErrorFrame): + processor_name = str(frame.processor) if frame.processor else None + await self._send_message( + { + "type": RealtimeFeedbackType.PIPELINE_ERROR.value, + "payload": { + "error": frame.error, + "fatal": frame.fatal, + "processor": processor_name, + }, + } + ) async def _send_ws(self, message: dict): """Send message via WebSocket only, handling errors gracefully.""" diff --git a/api/services/workflow/pipecat_engine.py b/api/services/workflow/pipecat_engine.py index 59c4d59..f0398a6 100644 --- a/api/services/workflow/pipecat_engine.py +++ b/api/services/workflow/pipecat_engine.py @@ -536,10 +536,11 @@ class PipecatEngine: # Mute the pipeline self._mute_pipeline = True - # Perform final variable extraction synchronously before ending - await self._perform_variable_extraction_if_needed( - self._current_node, run_in_background=False - ) + if reason != EndTaskReason.PIPELINE_ERROR.value: + # Perform final variable extraction synchronously before ending + await self._perform_variable_extraction_if_needed( + self._current_node, run_in_background=False + ) frame_to_push = ( CancelFrame(reason=reason) if abort_immediately else EndFrame(reason=reason) diff --git a/pipecat b/pipecat index 791d241..927ed9b 160000 --- a/pipecat +++ b/pipecat @@ -1 +1 @@ -Subproject commit 791d24196f07b36d64beddda4a9c79aa6ccdb245 +Subproject commit 927ed9bae29e52c14230feb9f61e7ef2a551b0b8 diff --git a/ui/src/app/workflow/[workflowId]/run/[runId]/components/RealtimeFeedback.tsx b/ui/src/app/workflow/[workflowId]/run/[runId]/components/RealtimeFeedback.tsx index 5cc5e67..9a0f031 100644 --- a/ui/src/app/workflow/[workflowId]/run/[runId]/components/RealtimeFeedback.tsx +++ b/ui/src/app/workflow/[workflowId]/run/[runId]/components/RealtimeFeedback.tsx @@ -20,6 +20,8 @@ interface RealtimeFeedbackEvent { ttfb_seconds?: number; processor?: string; model?: string; + error?: string; + fatal?: boolean; }; timestamp: string; turn: number; @@ -74,13 +76,16 @@ function convertLogEventsToTranscriptEvents(events: RealtimeFeedbackEvent[]): Tr case 'rtf-ttfb-metric': type = 'ttfb-metric'; break; + case 'rtf-pipeline-error': + type = 'pipeline-error'; + break; default: type = 'bot-text'; } return { type, - text: event.payload.text || event.payload.result || event.payload.function_name || event.payload.node_name || '', + text: event.payload.text || event.payload.error || event.payload.result || event.payload.function_name || event.payload.node_name || '', final: event.payload.final, timestamp: event.timestamp, turn: event.turn, @@ -91,6 +96,7 @@ function convertLogEventsToTranscriptEvents(events: RealtimeFeedbackEvent[]): Tr ttfbSeconds: event.payload.ttfb_seconds, processor: event.payload.processor, model: event.payload.model, + fatal: event.payload.fatal, }; }); } @@ -111,6 +117,7 @@ function convertLiveMessagesToTranscriptEvents(messages: FeedbackMessage[]): Tra ttfbSeconds: msg.ttfbSeconds, processor: msg.processor, model: msg.model, + fatal: msg.fatal, })); } diff --git a/ui/src/app/workflow/[workflowId]/run/[runId]/components/UnifiedTranscript.tsx b/ui/src/app/workflow/[workflowId]/run/[runId]/components/UnifiedTranscript.tsx index 635cb07..3fff78b 100644 --- a/ui/src/app/workflow/[workflowId]/run/[runId]/components/UnifiedTranscript.tsx +++ b/ui/src/app/workflow/[workflowId]/run/[runId]/components/UnifiedTranscript.tsx @@ -49,6 +49,7 @@ export const UnifiedTranscript = ({ status: msg.status, nodeName: msg.nodeName, ttfbSeconds: msg.ttfbSeconds, + fatal: msg.fatal, })); // Default empty state diff --git a/ui/src/app/workflow/[workflowId]/run/[runId]/components/shared/TranscriptMessage.tsx b/ui/src/app/workflow/[workflowId]/run/[runId]/components/shared/TranscriptMessage.tsx index c41d4ee..19b430a 100644 --- a/ui/src/app/workflow/[workflowId]/run/[runId]/components/shared/TranscriptMessage.tsx +++ b/ui/src/app/workflow/[workflowId]/run/[runId]/components/shared/TranscriptMessage.tsx @@ -1,17 +1,18 @@ 'use client'; -import { Brain, GitBranch, Wrench } from 'lucide-react'; +import { AlertTriangle, Brain, GitBranch, Wrench } from 'lucide-react'; import { cn } from '@/lib/utils'; export interface TranscriptMessageData { id: string; - type: 'user-transcription' | 'bot-text' | 'function-call' | 'node-transition' | 'ttfb-metric'; + type: 'user-transcription' | 'bot-text' | 'function-call' | 'node-transition' | 'ttfb-metric' | 'pipeline-error'; text: string; final?: boolean; functionName?: string; nodeName?: string; ttfbSeconds?: number; + fatal?: boolean; } interface TranscriptMessageProps { @@ -36,6 +37,23 @@ export function TranscriptMessage({ message, nextMessage }: TranscriptMessagePro ); } + // Pipeline error - show as a red alert + if (message.type === 'pipeline-error') { + return ( +
+ +
+
+ {message.fatal ? 'Fatal Pipeline Error' : 'Pipeline Error'} +
+
+ {message.text} +
+
+
+ ); + } + // TTFB metric - don't render standalone, it'll be shown with bot messages and function calls if (message.type === 'ttfb-metric') { return null; diff --git a/ui/src/app/workflow/[workflowId]/run/[runId]/hooks/useWebSocketRTC.tsx b/ui/src/app/workflow/[workflowId]/run/[runId]/hooks/useWebSocketRTC.tsx index 8e6c120..1143732 100644 --- a/ui/src/app/workflow/[workflowId]/run/[runId]/hooks/useWebSocketRTC.tsx +++ b/ui/src/app/workflow/[workflowId]/run/[runId]/hooks/useWebSocketRTC.tsx @@ -18,7 +18,7 @@ interface UseWebSocketRTCProps { export interface FeedbackMessage { id: string; - type: 'user-transcription' | 'bot-text' | 'function-call' | 'node-transition' | 'ttfb-metric'; + type: 'user-transcription' | 'bot-text' | 'function-call' | 'node-transition' | 'ttfb-metric' | 'pipeline-error'; text: string; final?: boolean; timestamp: string; @@ -31,6 +31,8 @@ export interface FeedbackMessage { ttfbSeconds?: number; processor?: string; model?: string; + // Pipeline error fields + fatal?: boolean; } export const useWebSocketRTC = ({ workflowId, workflowRunId, accessToken, initialContextVariables }: UseWebSocketRTCProps) => { @@ -400,6 +402,19 @@ export const useWebSocketRTC = ({ workflowId, workflowRunId, accessToken, initia break; } + case 'rtf-pipeline-error': { + const { error, fatal, processor: errorProcessor } = message.payload; + setFeedbackMessages(prev => [...prev, { + id: `error-${Date.now()}`, + type: 'pipeline-error', + text: error, + fatal, + processor: errorProcessor, + timestamp: new Date().toISOString(), + }]); + break; + } + default: logger.warn('Unknown message type:', message.type); } diff --git a/ui/src/app/workflow/[workflowId]/run/[runId]/utils/processTranscriptEvents.ts b/ui/src/app/workflow/[workflowId]/run/[runId]/utils/processTranscriptEvents.ts index 25a6f36..76f6321 100644 --- a/ui/src/app/workflow/[workflowId]/run/[runId]/utils/processTranscriptEvents.ts +++ b/ui/src/app/workflow/[workflowId]/run/[runId]/utils/processTranscriptEvents.ts @@ -4,7 +4,7 @@ */ export interface TranscriptEvent { - type: 'user-transcription' | 'bot-text' | 'function-call' | 'node-transition' | 'ttfb-metric'; + type: 'user-transcription' | 'bot-text' | 'function-call' | 'node-transition' | 'ttfb-metric' | 'pipeline-error'; text: string; final?: boolean; timestamp: string; @@ -16,6 +16,7 @@ export interface TranscriptEvent { ttfbSeconds?: number; processor?: string; model?: string; + fatal?: boolean; } export interface ProcessedMessage { @@ -28,6 +29,7 @@ export interface ProcessedMessage { status?: 'running' | 'completed'; nodeName?: string; ttfbSeconds?: number; + fatal?: boolean; } /** @@ -143,5 +145,6 @@ function convertToProcessedMessage(event: TranscriptEvent, overrideText?: string status: event.status, nodeName: event.nodeName, ttfbSeconds: event.ttfbSeconds, + fatal: event.fatal, }; }