feat: abort call on pipeline error and send rtf event

This commit is contained in:
Abhishek Kumar 2026-03-05 14:51:39 +05:30
parent 1614879ddd
commit dfb741e475
9 changed files with 86 additions and 10 deletions

View file

@ -3,6 +3,7 @@ from loguru import logger
from api.db import db_client
from api.enums import WorkflowRunState
from api.services.campaign.campaign_call_dispatcher import campaign_call_dispatcher
from api.services.campaign.circuit_breaker import circuit_breaker
from api.services.pipecat.audio_config import AudioConfig
from api.services.pipecat.in_memory_buffers import (
InMemoryAudioBuffer,
@ -95,6 +96,22 @@ def register_event_handlers(
ready_state["pipeline_started"] = True
await maybe_trigger_llm()
@task.event_handler("on_pipeline_error")
async def on_pipeline_error(_task: PipelineTask, frame: Frame):
logger.warning(f"Pipeline error for workflow run {workflow_run_id}: {frame}")
try:
workflow_run = await db_client.get_workflow_run_by_id(workflow_run_id)
if workflow_run and workflow_run.campaign_id:
await circuit_breaker.record_and_evaluate(
campaign_id=workflow_run.campaign_id, is_failure=True
)
except Exception as e:
logger.error(f"Error recording circuit breaker failure: {e}", exc_info=True)
await engine.end_call_with_reason(
EndTaskReason.PIPELINE_ERROR.value, abort_immediately=True
)
@task.event_handler("on_pipeline_finished")
async def on_pipeline_finished(
task: PipelineTask,

View file

@ -32,6 +32,7 @@ if TYPE_CHECKING:
from pipecat.frames.frames import (
CancelFrame,
EndFrame,
ErrorFrame,
FunctionCallInProgressFrame,
FunctionCallResultFrame,
InterimTranscriptionFrame,
@ -272,6 +273,19 @@ class RealtimeFeedbackObserver(BaseObserver):
},
}
)
# Handle pipeline errors
elif isinstance(frame, ErrorFrame):
processor_name = str(frame.processor) if frame.processor else None
await self._send_message(
{
"type": RealtimeFeedbackType.PIPELINE_ERROR.value,
"payload": {
"error": frame.error,
"fatal": frame.fatal,
"processor": processor_name,
},
}
)
async def _send_ws(self, message: dict):
"""Send message via WebSocket only, handling errors gracefully."""

View file

@ -536,10 +536,11 @@ class PipecatEngine:
# Mute the pipeline
self._mute_pipeline = True
# Perform final variable extraction synchronously before ending
await self._perform_variable_extraction_if_needed(
self._current_node, run_in_background=False
)
if reason != EndTaskReason.PIPELINE_ERROR.value:
# Perform final variable extraction synchronously before ending
await self._perform_variable_extraction_if_needed(
self._current_node, run_in_background=False
)
frame_to_push = (
CancelFrame(reason=reason) if abort_immediately else EndFrame(reason=reason)

@ -1 +1 @@
Subproject commit 791d24196f07b36d64beddda4a9c79aa6ccdb245
Subproject commit 927ed9bae29e52c14230feb9f61e7ef2a551b0b8

View file

@ -20,6 +20,8 @@ interface RealtimeFeedbackEvent {
ttfb_seconds?: number;
processor?: string;
model?: string;
error?: string;
fatal?: boolean;
};
timestamp: string;
turn: number;
@ -74,13 +76,16 @@ function convertLogEventsToTranscriptEvents(events: RealtimeFeedbackEvent[]): Tr
case 'rtf-ttfb-metric':
type = 'ttfb-metric';
break;
case 'rtf-pipeline-error':
type = 'pipeline-error';
break;
default:
type = 'bot-text';
}
return {
type,
text: event.payload.text || event.payload.result || event.payload.function_name || event.payload.node_name || '',
text: event.payload.text || event.payload.error || event.payload.result || event.payload.function_name || event.payload.node_name || '',
final: event.payload.final,
timestamp: event.timestamp,
turn: event.turn,
@ -91,6 +96,7 @@ function convertLogEventsToTranscriptEvents(events: RealtimeFeedbackEvent[]): Tr
ttfbSeconds: event.payload.ttfb_seconds,
processor: event.payload.processor,
model: event.payload.model,
fatal: event.payload.fatal,
};
});
}
@ -111,6 +117,7 @@ function convertLiveMessagesToTranscriptEvents(messages: FeedbackMessage[]): Tra
ttfbSeconds: msg.ttfbSeconds,
processor: msg.processor,
model: msg.model,
fatal: msg.fatal,
}));
}

View file

@ -49,6 +49,7 @@ export const UnifiedTranscript = ({
status: msg.status,
nodeName: msg.nodeName,
ttfbSeconds: msg.ttfbSeconds,
fatal: msg.fatal,
}));
// Default empty state

View file

@ -1,17 +1,18 @@
'use client';
import { Brain, GitBranch, Wrench } from 'lucide-react';
import { AlertTriangle, Brain, GitBranch, Wrench } from 'lucide-react';
import { cn } from '@/lib/utils';
export interface TranscriptMessageData {
id: string;
type: 'user-transcription' | 'bot-text' | 'function-call' | 'node-transition' | 'ttfb-metric';
type: 'user-transcription' | 'bot-text' | 'function-call' | 'node-transition' | 'ttfb-metric' | 'pipeline-error';
text: string;
final?: boolean;
functionName?: string;
nodeName?: string;
ttfbSeconds?: number;
fatal?: boolean;
}
interface TranscriptMessageProps {
@ -36,6 +37,23 @@ export function TranscriptMessage({ message, nextMessage }: TranscriptMessagePro
);
}
// Pipeline error - show as a red alert
if (message.type === 'pipeline-error') {
return (
<div className="flex items-start gap-2 px-3 py-2 rounded-lg bg-red-500/10 border border-red-500/20">
<AlertTriangle className="h-4 w-4 text-red-500 mt-0.5 shrink-0" />
<div className="flex-1 min-w-0">
<div className="text-xs font-medium text-red-700 dark:text-red-400">
{message.fatal ? 'Fatal Pipeline Error' : 'Pipeline Error'}
</div>
<div className="text-sm text-red-600 dark:text-red-300 mt-0.5 break-words">
{message.text}
</div>
</div>
</div>
);
}
// TTFB metric - don't render standalone, it'll be shown with bot messages and function calls
if (message.type === 'ttfb-metric') {
return null;

View file

@ -18,7 +18,7 @@ interface UseWebSocketRTCProps {
export interface FeedbackMessage {
id: string;
type: 'user-transcription' | 'bot-text' | 'function-call' | 'node-transition' | 'ttfb-metric';
type: 'user-transcription' | 'bot-text' | 'function-call' | 'node-transition' | 'ttfb-metric' | 'pipeline-error';
text: string;
final?: boolean;
timestamp: string;
@ -31,6 +31,8 @@ export interface FeedbackMessage {
ttfbSeconds?: number;
processor?: string;
model?: string;
// Pipeline error fields
fatal?: boolean;
}
export const useWebSocketRTC = ({ workflowId, workflowRunId, accessToken, initialContextVariables }: UseWebSocketRTCProps) => {
@ -400,6 +402,19 @@ export const useWebSocketRTC = ({ workflowId, workflowRunId, accessToken, initia
break;
}
case 'rtf-pipeline-error': {
const { error, fatal, processor: errorProcessor } = message.payload;
setFeedbackMessages(prev => [...prev, {
id: `error-${Date.now()}`,
type: 'pipeline-error',
text: error,
fatal,
processor: errorProcessor,
timestamp: new Date().toISOString(),
}]);
break;
}
default:
logger.warn('Unknown message type:', message.type);
}

View file

@ -4,7 +4,7 @@
*/
export interface TranscriptEvent {
type: 'user-transcription' | 'bot-text' | 'function-call' | 'node-transition' | 'ttfb-metric';
type: 'user-transcription' | 'bot-text' | 'function-call' | 'node-transition' | 'ttfb-metric' | 'pipeline-error';
text: string;
final?: boolean;
timestamp: string;
@ -16,6 +16,7 @@ export interface TranscriptEvent {
ttfbSeconds?: number;
processor?: string;
model?: string;
fatal?: boolean;
}
export interface ProcessedMessage {
@ -28,6 +29,7 @@ export interface ProcessedMessage {
status?: 'running' | 'completed';
nodeName?: string;
ttfbSeconds?: number;
fatal?: boolean;
}
/**
@ -143,5 +145,6 @@ function convertToProcessedMessage(event: TranscriptEvent, overrideText?: string
status: event.status,
nodeName: event.nodeName,
ttfbSeconds: event.ttfbSeconds,
fatal: event.fatal,
};
}