From cac25879bf0ca62385bd7ba3444b911ef67494bc Mon Sep 17 00:00:00 2001 From: Abhishek Date: Thu, 15 Jan 2026 16:17:17 +0530 Subject: [PATCH] feat: add rtf in logs (#119) * feat: add rtf in logs * chore: unify the call logs and real time events --- api/routes/workflow.py | 1 + api/schemas/workflow.py | 1 + api/services/pipecat/event_handlers.py | 20 ++- ...script_buffers.py => in_memory_buffers.py} | 39 +++++ .../pipecat/realtime_feedback_observer.py | 59 ++++++- api/services/pipecat/run_pipeline.py | 51 +++++- api/services/workflow/pipecat_engine.py | 17 +- .../[workflowId]/run/[runId]/BrowserCall.tsx | 6 +- .../[runId]/components/RealtimeFeedback.tsx | 161 ++++++++++++++++++ .../components/RealtimeFeedbackPanel.tsx | 152 ----------------- .../[runId]/components/UnifiedTranscript.tsx | 96 +++++++++++ .../run/[runId]/components/index.ts | 4 +- .../components/shared/TranscriptContainer.tsx | 72 ++++++++ .../shared/TranscriptEmptyState.tsx | 20 +++ .../components/shared/TranscriptMessage.tsx | 110 ++++++++++++ .../run/[runId]/hooks/useWebSocketRTC.tsx | 75 +++++--- .../[workflowId]/run/[runId]/page.tsx | 33 ++-- .../[runId]/utils/processTranscriptEvents.ts | 147 ++++++++++++++++ ui/src/client/types.gen.ts | 3 + 19 files changed, 861 insertions(+), 206 deletions(-) rename api/services/pipecat/{audio_transcript_buffers.py => in_memory_buffers.py} (76%) create mode 100644 ui/src/app/workflow/[workflowId]/run/[runId]/components/RealtimeFeedback.tsx delete mode 100644 ui/src/app/workflow/[workflowId]/run/[runId]/components/RealtimeFeedbackPanel.tsx create mode 100644 ui/src/app/workflow/[workflowId]/run/[runId]/components/UnifiedTranscript.tsx create mode 100644 ui/src/app/workflow/[workflowId]/run/[runId]/components/shared/TranscriptContainer.tsx create mode 100644 ui/src/app/workflow/[workflowId]/run/[runId]/components/shared/TranscriptEmptyState.tsx create mode 100644 ui/src/app/workflow/[workflowId]/run/[runId]/components/shared/TranscriptMessage.tsx create mode 100644 
ui/src/app/workflow/[workflowId]/run/[runId]/utils/processTranscriptEvents.ts diff --git a/api/routes/workflow.py b/api/routes/workflow.py index 6ce98a1..f8e47eb 100644 --- a/api/routes/workflow.py +++ b/api/routes/workflow.py @@ -611,6 +611,7 @@ async def get_workflow_run( "initial_context": run.initial_context, "gathered_context": run.gathered_context, "call_type": run.call_type, + "logs": run.logs, } diff --git a/api/schemas/workflow.py b/api/schemas/workflow.py index 5696e8b..f5c338c 100644 --- a/api/schemas/workflow.py +++ b/api/schemas/workflow.py @@ -20,3 +20,4 @@ class WorkflowRunResponseSchema(BaseModel): initial_context: dict | None = None gathered_context: dict | None = None call_type: CallType + logs: Dict[str, Any] | None = None diff --git a/api/services/pipecat/event_handlers.py b/api/services/pipecat/event_handlers.py index 6a654ee..f96c36f 100644 --- a/api/services/pipecat/event_handlers.py +++ b/api/services/pipecat/event_handlers.py @@ -4,8 +4,9 @@ from api.db import db_client from api.enums import WorkflowRunState from api.services.campaign.call_dispatcher import campaign_call_dispatcher from api.services.pipecat.audio_config import AudioConfig -from api.services.pipecat.audio_transcript_buffers import ( +from api.services.pipecat.in_memory_buffers import ( InMemoryAudioBuffer, + InMemoryLogsBuffer, InMemoryTranscriptBuffer, ) from api.services.pipecat.pipeline_metrics_aggregator import PipelineMetricsAggregator @@ -80,6 +81,7 @@ def register_task_event_handler( audio_buffer: AudioBufferProcessor, in_memory_audio_buffer: InMemoryAudioBuffer, in_memory_transcript_buffer: InMemoryTranscriptBuffer, + in_memory_logs_buffer: InMemoryLogsBuffer, pipeline_metrics_aggregator: PipelineMetricsAggregator, ): @task.event_handler("on_pipeline_started") @@ -185,6 +187,22 @@ def register_task_event_handler( state=WorkflowRunState.COMPLETED.value, ) + # Save real-time feedback logs to workflow run + if not in_memory_logs_buffer.is_empty: + try: + feedback_events 
= in_memory_logs_buffer.get_events() + await db_client.update_workflow_run( + run_id=workflow_run_id, + logs={"realtime_feedback_events": feedback_events}, + ) + logger.debug( + f"Saved {len(feedback_events)} feedback events to workflow run logs" + ) + except Exception as e: + logger.error(f"Error saving realtime feedback logs: {e}", exc_info=True) + else: + logger.debug("Logs buffer is empty, skipping save") + # Release concurrent slot for campaign calls if workflow_run and workflow_run.campaign_id: await campaign_call_dispatcher.release_call_slot(workflow_run_id) diff --git a/api/services/pipecat/audio_transcript_buffers.py b/api/services/pipecat/in_memory_buffers.py similarity index 76% rename from api/services/pipecat/audio_transcript_buffers.py rename to api/services/pipecat/in_memory_buffers.py index d087ad8..c4274f4 100644 --- a/api/services/pipecat/audio_transcript_buffers.py +++ b/api/services/pipecat/in_memory_buffers.py @@ -2,6 +2,7 @@ import asyncio import re import tempfile import wave +from datetime import UTC, datetime from typing import List from loguru import logger @@ -120,3 +121,41 @@ class InMemoryTranscriptBuffer: if self._USER_SPEECH_RE.match(line): return True return False + + +class InMemoryLogsBuffer: + """Buffer real-time feedback events in memory during a call, then save to workflow run logs.""" + + def __init__(self, workflow_run_id: int): + self._workflow_run_id = workflow_run_id + self._events: List[dict] = [] + self._turn_counter = 0 + + async def append(self, event: dict): + """Append a feedback event to the buffer with timestamp.""" + # Add timestamp and turn tracking + timestamped_event = { + **event, + "timestamp": datetime.now(UTC).isoformat(), + "turn": self._turn_counter, + } + self._events.append(timestamped_event) + logger.trace( + f"Appended event {event.get('type')} to logs buffer for workflow {self._workflow_run_id}" + ) + + def increment_turn(self): + """Increment turn counter (called on user transcription completion).""" 
+ self._turn_counter += 1 + logger.trace( + f"Incremented turn counter to {self._turn_counter} for workflow {self._workflow_run_id}" + ) + + def get_events(self) -> List[dict]: + """Get all events for final storage.""" + return self._events + + @property + def is_empty(self) -> bool: + """Check if the buffer is empty.""" + return len(self._events) == 0 diff --git a/api/services/pipecat/realtime_feedback_observer.py b/api/services/pipecat/realtime_feedback_observer.py index bf5275d..94ce896 100644 --- a/api/services/pipecat/realtime_feedback_observer.py +++ b/api/services/pipecat/realtime_feedback_observer.py @@ -1,19 +1,27 @@ """Real-time feedback observer for sending pipeline events to the frontend. This observer watches pipeline frames and sends relevant events (transcriptions, -bot text) over WebSocket to provide real-time feedback in the UI. +bot text, function calls, TTFB metrics) over WebSocket to provide real-time +feedback in the UI. For frames with presentation timestamps (pts), like TTSTextFrame, we respect the timing by queuing them and sending at the appropriate time, similar to how base_output.py handles timed frames. + +Note: Node transition events are sent directly from PipecatEngine.set_node() +rather than being observed here, to ensure precise timing at the moment of +node changes. 
""" import asyncio import time -from typing import Awaitable, Callable, Optional, Set +from typing import TYPE_CHECKING, Awaitable, Callable, Optional, Set from loguru import logger +if TYPE_CHECKING: + from api.services.pipecat.in_memory_buffers import InMemoryLogsBuffer + from pipecat.frames.frames import ( CancelFrame, EndFrame, @@ -21,33 +29,46 @@ from pipecat.frames.frames import ( FunctionCallResultFrame, InterimTranscriptionFrame, InterruptionFrame, + MetricsFrame, StopFrame, TranscriptionFrame, TTSTextFrame, ) +from pipecat.metrics.metrics import TTFBMetricsData from pipecat.observers.base_observer import BaseObserver, FramePushed from pipecat.processors.frame_processor import FrameDirection from pipecat.utils.time import nanoseconds_to_seconds class RealtimeFeedbackObserver(BaseObserver): - """Observer that sends real-time transcription and bot response events via WebSocket. + """Observer that sends real-time transcription, bot response, and metrics via WebSocket. + + Observes pipeline frames and sends events for: + - User transcriptions (interim and final) + - Bot TTS text (with pts-based timing) + - Function calls (start/end) + - TTFB metrics (LLM generation time only - filters to processors containing "LLM") For frames with pts (presentation timestamp), we queue them and send at the appropriate time to sync with audio playback. + + Note: Node transitions are handled by PipecatEngine.set_node() callback. """ def __init__( self, ws_sender: Callable[[dict], Awaitable[None]], + logs_buffer: Optional["InMemoryLogsBuffer"] = None, ): """ Args: ws_sender: Async function to send messages over WebSocket. Expected signature: async def send(message: dict) -> None + logs_buffer: Optional InMemoryLogsBuffer to persist events for post-call analysis. 
""" super().__init__() self._ws_sender = ws_sender + self._logs_buffer = logs_buffer self._frames_seen: Set[str] = set() # Clock/timing for pts-based frames (similar to base_output.py) @@ -126,6 +147,8 @@ class RealtimeFeedbackObserver(BaseObserver): frame = data.frame frame_direction = data.direction + logger.trace(f"{self} Received Frame: {frame} Direction: {frame_direction}") + # Handle pipeline termination - stop clock task if isinstance(frame, (EndFrame, CancelFrame, StopFrame)): await self._cancel_clock_task() @@ -167,6 +190,9 @@ class RealtimeFeedbackObserver(BaseObserver): }, } ) + # Increment turn counter on final user transcription + if self._logs_buffer: + self._logs_buffer.increment_turn() # Handle bot TTS text - respect pts timing elif isinstance(frame, TTSTextFrame): message = { @@ -217,11 +243,36 @@ class RealtimeFeedbackObserver(BaseObserver): }, } ) + # Handle TTFB metrics - capture LLM generation time only + elif isinstance(frame, MetricsFrame): + # Check if this MetricsFrame contains TTFB data from an LLM processor + for metric_data in frame.data: + if isinstance(metric_data, TTFBMetricsData): + # Only send TTFB if it's from an LLM processor + if metric_data.processor and "LLM" in metric_data.processor: + await self._send_message( + { + "type": "rtf-ttfb-metric", + "payload": { + "ttfb_seconds": metric_data.value, + "processor": metric_data.processor, + "model": metric_data.model, + }, + } + ) async def _send_message(self, message: dict): - """Send message via WebSocket, handling errors gracefully.""" + """Send message via WebSocket AND append to logs buffer, handling errors gracefully.""" + # Send via WebSocket try: await self._ws_sender(message) except Exception as e: # Log but don't fail - feedback is non-critical logger.debug(f"Failed to send real-time feedback message: {e}") + + # Also append to logs buffer + if self._logs_buffer: + try: + await self._logs_buffer.append(message) + except Exception as e: + logger.error(f"Failed to append to 
logs buffer: {e}") diff --git a/api/services/pipecat/run_pipeline.py b/api/services/pipecat/run_pipeline.py index b385a2c..f056554 100644 --- a/api/services/pipecat/run_pipeline.py +++ b/api/services/pipecat/run_pipeline.py @@ -14,6 +14,7 @@ from api.services.pipecat.event_handlers import ( register_transcript_handler, register_transport_event_handlers, ) +from api.services.pipecat.in_memory_buffers import InMemoryLogsBuffer from api.services.pipecat.pipeline_builder import ( build_pipeline, create_pipeline_components, @@ -467,11 +468,45 @@ async def _run_pipeline( ReactFlowDTO.model_validate(workflow.workflow_definition_with_fallback) ) + # Create in-memory logs buffer early so it can be used by engine callbacks + in_memory_logs_buffer = InMemoryLogsBuffer(workflow_run_id) + + # Create node transition callback if WebSocket sender is available + node_transition_callback = None + ws_sender = get_ws_sender(workflow_run_id) + if ws_sender: + + async def send_node_transition( + node_name: str, previous_node: Optional[str] + ) -> None: + """Send node transition event via WebSocket AND log to buffer.""" + message = { + "type": "rtf-node-transition", + "payload": { + "node_name": node_name, + "previous_node": previous_node, + }, + } + # Send via WebSocket + try: + await ws_sender(message) + except Exception as e: + logger.debug(f"Failed to send node transition via WebSocket: {e}") + + # Log to in-memory buffer + try: + await in_memory_logs_buffer.append(message) + except Exception as e: + logger.error(f"Failed to append node transition to logs buffer: {e}") + + node_transition_callback = send_node_transition + engine = PipecatEngine( llm=llm, workflow=workflow_graph, call_context_vars=merged_call_context_vars, workflow_run_id=workflow_run_id, + node_transition_callback=node_transition_callback, ) # Create pipeline components with audio configuration and engine @@ -566,12 +601,6 @@ async def _run_pipeline( # Create pipeline task with audio configuration task = 
create_pipeline_task(pipeline, workflow_run_id, audio_config) - # Add real-time feedback observer if WebSocket sender is available - ws_sender = get_ws_sender(workflow_run_id) - if ws_sender: - feedback_observer = RealtimeFeedbackObserver(ws_sender=ws_sender) - task.add_observer(feedback_observer) - # Now set the task on the engine engine.set_task(task) @@ -590,6 +619,15 @@ async def _run_pipeline( ) ) + # Add real-time feedback observer if WebSocket sender is available + # Note: ws_sender was already fetched earlier for node_transition_callback + if ws_sender: + feedback_observer = RealtimeFeedbackObserver( + ws_sender=ws_sender, + logs_buffer=in_memory_logs_buffer, + ) + task.add_observer(feedback_observer) + register_task_event_handler( workflow_run_id, engine, @@ -598,6 +636,7 @@ async def _run_pipeline( audio_buffer, in_memory_audio_buffer, in_memory_transcript_buffer, + in_memory_logs_buffer, pipeline_metrics_aggregator, ) diff --git a/api/services/workflow/pipecat_engine.py b/api/services/workflow/pipecat_engine.py index b8bd59f..6ebc2ca 100644 --- a/api/services/workflow/pipecat_engine.py +++ b/api/services/workflow/pipecat_engine.py @@ -62,6 +62,9 @@ class PipecatEngine: call_context_vars: dict, audio_buffer: Optional["AudioBuffer"] = None, workflow_run_id: Optional[int] = None, + node_transition_callback: Optional[ + Callable[[str, Optional[str]], Awaitable[None]] + ] = None, ): self.task = task self.llm = llm @@ -71,6 +74,7 @@ class PipecatEngine: self._call_context_vars = call_context_vars self._audio_buffer = audio_buffer self._workflow_run_id = workflow_run_id + self._node_transition_callback = node_transition_callback self._initialized = False self._client_disconnected = False self._call_disposed = False @@ -359,9 +363,20 @@ class PipecatEngine: f"Executing node: name: {node.name} is_static: {node.is_static} allow_interrupt: {node.allow_interrupt} is_end: {node.is_end}" ) + # Track previous node for transition event + previous_node_name = 
self._current_node.name if self._current_node else None + # Set current node for all nodes (including static ones) so STT mute filter works self._current_node = node + # Send node transition event if callback is provided + if self._node_transition_callback: + try: + await self._node_transition_callback(node.name, previous_node_name) + except Exception as e: + # Log but don't fail - feedback is non-critical + logger.debug(f"Failed to send node transition event: {e}") + # Handle start nodes if node.is_start: await self._handle_start_node(node) @@ -693,5 +708,3 @@ class PipecatEngine: and not self._user_response_timeout_task.done() ): self._user_response_timeout_task.cancel() - - # Note: Native VoicemailDetector cleanup is handled by the pipeline diff --git a/ui/src/app/workflow/[workflowId]/run/[runId]/BrowserCall.tsx b/ui/src/app/workflow/[workflowId]/run/[runId]/BrowserCall.tsx index 4bfd490..8bd9d20 100644 --- a/ui/src/app/workflow/[workflowId]/run/[runId]/BrowserCall.tsx +++ b/ui/src/app/workflow/[workflowId]/run/[runId]/BrowserCall.tsx @@ -9,7 +9,7 @@ import { ApiKeyErrorDialog, AudioControls, ConnectionStatus, - RealtimeFeedbackPanel, + RealtimeFeedback, WorkflowConfigErrorDialog } from "./components"; import { useWebSocketRTC } from "./hooks"; @@ -142,9 +142,9 @@ const BrowserCall = ({ workflowId, workflowRunId, accessToken, initialContextVar {/* Show transcript panel */}
- diff --git a/ui/src/app/workflow/[workflowId]/run/[runId]/components/RealtimeFeedback.tsx b/ui/src/app/workflow/[workflowId]/run/[runId]/components/RealtimeFeedback.tsx new file mode 100644 index 0000000..5cc5e67 --- /dev/null +++ b/ui/src/app/workflow/[workflowId]/run/[runId]/components/RealtimeFeedback.tsx @@ -0,0 +1,161 @@ +'use client'; + +import { FeedbackMessage } from '../hooks/useWebSocketRTC'; +import { processLiveMessages, processTranscriptEvents, TranscriptEvent } from '../utils/processTranscriptEvents'; +import { UnifiedTranscript } from './UnifiedTranscript'; + +// Historical log event format from the backend +interface RealtimeFeedbackEvent { + type: string; + payload: { + text?: string; + final?: boolean; + user_id?: string; + timestamp?: string; + function_name?: string; + tool_call_id?: string; + result?: string; + node_name?: string; + previous_node?: string; + ttfb_seconds?: number; + processor?: string; + model?: string; + }; + timestamp: string; + turn: number; +} + +export interface WorkflowRunLogs { + realtime_feedback_events?: RealtimeFeedbackEvent[]; +} + +// Props for live mode (WebSocket messages) +interface LiveModeProps { + mode: 'live'; + messages: FeedbackMessage[]; + isCallActive: boolean; + isCallCompleted: boolean; +} + +// Props for historical mode (API logs) +interface HistoricalModeProps { + mode: 'historical'; + logs: WorkflowRunLogs | null; +} + +type RealtimeFeedbackProps = LiveModeProps | HistoricalModeProps; + +/** + * Convert backend log events to unified TranscriptEvent format + */ +function convertLogEventsToTranscriptEvents(events: RealtimeFeedbackEvent[]): TranscriptEvent[] { + return events.map(event => { + let type: TranscriptEvent['type']; + let status: TranscriptEvent['status']; + + switch (event.type) { + case 'rtf-user-transcription': + type = 'user-transcription'; + break; + case 'rtf-bot-text': + type = 'bot-text'; + break; + case 'rtf-function-call-start': + type = 'function-call'; + status = 'running'; + 
break; + case 'rtf-function-call-end': + type = 'function-call'; + status = 'completed'; + break; + case 'rtf-node-transition': + type = 'node-transition'; + break; + case 'rtf-ttfb-metric': + type = 'ttfb-metric'; + break; + default: + type = 'bot-text'; + } + + return { + type, + text: event.payload.text || event.payload.result || event.payload.function_name || event.payload.node_name || '', + final: event.payload.final, + timestamp: event.timestamp, + turn: event.turn, + functionName: event.payload.function_name, + status, + nodeName: event.payload.node_name, + previousNode: event.payload.previous_node, + ttfbSeconds: event.payload.ttfb_seconds, + processor: event.payload.processor, + model: event.payload.model, + }; + }); +} + +/** + * Convert live WebSocket messages to unified TranscriptEvent format + */ +function convertLiveMessagesToTranscriptEvents(messages: FeedbackMessage[]): TranscriptEvent[] { + return messages.map(msg => ({ + type: msg.type, + text: msg.text, + final: msg.final, + timestamp: msg.timestamp, + functionName: msg.functionName, + status: msg.status, + nodeName: msg.nodeName, + previousNode: msg.previousNode, + ttfbSeconds: msg.ttfbSeconds, + processor: msg.processor, + model: msg.model, + })); +} + +/** + * Single unified component that handles both live WebSocket messages + * and historical logs from the API. + */ +export const RealtimeFeedback = (props: RealtimeFeedbackProps) => { + if (props.mode === 'historical') { + // Historical mode - process logs from API + const rawEvents = props.logs?.realtime_feedback_events; + const messages = rawEvents + ? processTranscriptEvents(convertLogEventsToTranscriptEvents(rawEvents)) + : []; + + return ( + + ); + } + + // Live mode - process WebSocket messages (optimized - messages already accumulated) + const { messages, isCallActive, isCallCompleted } = props; + const status = isCallActive ? 'live' : isCallCompleted ? 
'ended' : 'ready'; + const processedMessages = processLiveMessages(convertLiveMessagesToTranscriptEvents(messages)); + + return ( + + ); +}; diff --git a/ui/src/app/workflow/[workflowId]/run/[runId]/components/RealtimeFeedbackPanel.tsx b/ui/src/app/workflow/[workflowId]/run/[runId]/components/RealtimeFeedbackPanel.tsx deleted file mode 100644 index 5409599..0000000 --- a/ui/src/app/workflow/[workflowId]/run/[runId]/components/RealtimeFeedbackPanel.tsx +++ /dev/null @@ -1,152 +0,0 @@ -"use client"; - -import { Loader2, MessageSquare, Mic, MicOff, Wrench } from "lucide-react"; -import { useEffect, useRef } from "react"; - -import { cn } from "@/lib/utils"; - -import { FeedbackMessage } from "../hooks/useWebSocketRTC"; - -interface RealtimeFeedbackPanelProps { - messages: FeedbackMessage[]; - isVisible: boolean; - isCallActive: boolean; - isCallCompleted: boolean; -} - -const MessageItem = ({ msg }: { msg: FeedbackMessage }) => { - // Function call message - centered - if (msg.type === 'function-call') { - return ( -
-
- {msg.status === 'running' ? ( - - ) : ( - - )} - - {msg.functionName}() - - {msg.status === 'completed' && ( - - )} -
-
- ); - } - - const isUser = msg.type === 'user-transcription'; - - // User messages on right, bot messages on left - return ( -
-
-
{msg.text}
- {!msg.final && ( -
- speaking... -
- )} -
-
- ); -}; - -export const RealtimeFeedbackPanel = ({ - messages, - isVisible, - isCallActive, - isCallCompleted -}: RealtimeFeedbackPanelProps) => { - const scrollRef = useRef(null); - - // Auto-scroll to bottom when new messages arrive - useEffect(() => { - if (scrollRef.current) { - scrollRef.current.scrollTop = scrollRef.current.scrollHeight; - } - }, [messages]); - - if (!isVisible) return null; - - return ( -
- {/* Header */} -
-
- - Live Transcript -
- {isCallActive ? ( - <> - - Live - - ) : isCallCompleted ? ( - <> - - Ended - - ) : ( - <> - - Ready - - )} -
-
-
- - {/* Messages */} -
- {messages.length === 0 ? ( -
- -

No messages yet

-

- {isCallActive - ? "Start speaking to see the transcript" - : "Start the call to begin the conversation" - } -

-
- ) : ( -
- {messages.map((msg) => ( - - ))} -
- )} -
- - {/* Footer with message count */} - {messages.length > 0 && ( -
- {messages.filter(m => m.type !== 'function-call').length} messages -
- )} -
- ); -}; diff --git a/ui/src/app/workflow/[workflowId]/run/[runId]/components/UnifiedTranscript.tsx b/ui/src/app/workflow/[workflowId]/run/[runId]/components/UnifiedTranscript.tsx new file mode 100644 index 0000000..635cb07 --- /dev/null +++ b/ui/src/app/workflow/[workflowId]/run/[runId]/components/UnifiedTranscript.tsx @@ -0,0 +1,96 @@ +"use client"; + +import { useEffect, useRef } from "react"; + +import { ProcessedMessage } from "../utils/processTranscriptEvents"; +import { TranscriptContainer } from "./shared/TranscriptContainer"; +import { TranscriptEmptyState } from "./shared/TranscriptEmptyState"; +import { TranscriptMessage, TranscriptMessageData } from "./shared/TranscriptMessage"; + +interface UnifiedTranscriptProps { + messages: ProcessedMessage[]; + status: 'ready' | 'live' | 'ended'; + title?: string; + autoScroll?: boolean; + emptyState?: { + title: string; + subtitle: string; + }; +} + +export const UnifiedTranscript = ({ + messages, + status, + title, + autoScroll = false, + emptyState +}: UnifiedTranscriptProps) => { + const scrollRef = useRef(null); + + // Auto-scroll to bottom when new messages arrive (for live mode) + useEffect(() => { + if (autoScroll && scrollRef.current) { + scrollRef.current.scrollTop = scrollRef.current.scrollHeight; + } + }, [messages, autoScroll]); + + // Calculate message count (exclude system messages like function calls, node transitions, TTFB) + const messageCount = messages.filter( + m => m.type === 'user-transcription' || m.type === 'bot-text' + ).length; + + // Convert ProcessedMessage to TranscriptMessageData + const transcriptMessages: TranscriptMessageData[] = messages.map(msg => ({ + id: msg.id, + type: msg.type, + text: msg.text, + final: msg.final, + functionName: msg.functionName, + status: msg.status, + nodeName: msg.nodeName, + ttfbSeconds: msg.ttfbSeconds, + })); + + // Default empty state + const defaultEmptyState = { + title: status === 'live' ? 
"No messages yet" : "No conversation recorded", + subtitle: status === 'live' + ? "Start speaking to see the transcript" + : "Real-time feedback events were not captured" + }; + + const emptyStateToShow = emptyState || defaultEmptyState; + + return ( + 0 ? messageCount : undefined} + > +
+ {messages.length === 0 ? ( + + ) : ( +
+ {transcriptMessages.map((msg, index) => { + // Skip standalone TTFB metrics (they're rendered inline with bot text) + if (msg.type === 'ttfb-metric') { + return null; + } + return ( + + ); + })} +
+ )} +
+
+ ); +}; diff --git a/ui/src/app/workflow/[workflowId]/run/[runId]/components/index.ts b/ui/src/app/workflow/[workflowId]/run/[runId]/components/index.ts index 6f94ffc..dedd126 100644 --- a/ui/src/app/workflow/[workflowId]/run/[runId]/components/index.ts +++ b/ui/src/app/workflow/[workflowId]/run/[runId]/components/index.ts @@ -2,5 +2,5 @@ export * from './ApiKeyErrorDialog'; export * from './AudioControls'; export * from './ConnectionStatus'; export * from './ContextDisplay'; -export * from './RealtimeFeedbackPanel'; -export * from './WorkflowConfigErrorDialog' +export * from './RealtimeFeedback'; +export * from './WorkflowConfigErrorDialog'; diff --git a/ui/src/app/workflow/[workflowId]/run/[runId]/components/shared/TranscriptContainer.tsx b/ui/src/app/workflow/[workflowId]/run/[runId]/components/shared/TranscriptContainer.tsx new file mode 100644 index 0000000..f62b752 --- /dev/null +++ b/ui/src/app/workflow/[workflowId]/run/[runId]/components/shared/TranscriptContainer.tsx @@ -0,0 +1,72 @@ +'use client'; + +import { MessageSquare, Mic, MicOff } from 'lucide-react'; +import { ReactNode } from 'react'; + +import { cn } from '@/lib/utils'; + +type CallStatus = 'ready' | 'live' | 'ended'; + +interface TranscriptContainerProps { + title: string; + status: CallStatus; + children: ReactNode; + messageCount?: number; +} + +const STATUS_CONFIG = { + ready: { + icon: MicOff, + label: 'Ready', + className: 'bg-muted text-muted-foreground', + }, + live: { + icon: Mic, + label: 'Live', + className: 'bg-green-500/10 text-green-600 dark:text-green-400', + }, + ended: { + icon: MicOff, + label: 'Ended', + className: 'bg-muted text-muted-foreground', + }, +}; + +export function TranscriptContainer({ + title, + status, + children, + messageCount +}: TranscriptContainerProps) { + const statusConfig = STATUS_CONFIG[status]; + const StatusIcon = statusConfig.icon; + + return ( +
+ {/* Header */} +
+
+ + {title} +
+ + {statusConfig.label} +
+
+
+ + {/* Content */} + {children} + + {/* Footer with message count */} + {messageCount !== undefined && messageCount > 0 && ( +
+ {messageCount} messages +
+ )} +
+ ); +} diff --git a/ui/src/app/workflow/[workflowId]/run/[runId]/components/shared/TranscriptEmptyState.tsx b/ui/src/app/workflow/[workflowId]/run/[runId]/components/shared/TranscriptEmptyState.tsx new file mode 100644 index 0000000..740e431 --- /dev/null +++ b/ui/src/app/workflow/[workflowId]/run/[runId]/components/shared/TranscriptEmptyState.tsx @@ -0,0 +1,20 @@ +'use client'; + +import { MessageSquare } from 'lucide-react'; + +interface TranscriptEmptyStateProps { + title: string; + subtitle: string; +} + +export function TranscriptEmptyState({ title, subtitle }: TranscriptEmptyStateProps) { + return ( +
+ +

{title}

+

+ {subtitle} +

+
+ ); +} diff --git a/ui/src/app/workflow/[workflowId]/run/[runId]/components/shared/TranscriptMessage.tsx b/ui/src/app/workflow/[workflowId]/run/[runId]/components/shared/TranscriptMessage.tsx new file mode 100644 index 0000000..c41d4ee --- /dev/null +++ b/ui/src/app/workflow/[workflowId]/run/[runId]/components/shared/TranscriptMessage.tsx @@ -0,0 +1,110 @@ +'use client'; + +import { Brain, GitBranch, Wrench } from 'lucide-react'; + +import { cn } from '@/lib/utils'; + +export interface TranscriptMessageData { + id: string; + type: 'user-transcription' | 'bot-text' | 'function-call' | 'node-transition' | 'ttfb-metric'; + text: string; + final?: boolean; + functionName?: string; + nodeName?: string; + ttfbSeconds?: number; +} + +interface TranscriptMessageProps { + message: TranscriptMessageData; + nextMessage?: TranscriptMessageData; +} + +export function TranscriptMessage({ message, nextMessage }: TranscriptMessageProps) { + // Node transition - show as section divider + if (message.type === 'node-transition') { + return ( +
+
+
+ + + {message.nodeName} + +
+
+
+ ); + } + + // TTFB metric - don't render standalone, it'll be shown with bot messages and function calls + if (message.type === 'ttfb-metric') { + return null; + } + + // Function call message - centered with TTFB if present + if (message.type === 'function-call') { + const ttfbMetric = nextMessage?.type === 'ttfb-metric' ? nextMessage : null; + return ( +
+ {/* Show TTFB metric above function call */} + {ttfbMetric && ttfbMetric.ttfbSeconds !== undefined && ( +
+ + Reasoning Delay: + {(ttfbMetric.ttfbSeconds * 1000).toFixed(0)}ms +
+ )} +
+ + + {message.functionName}() + +
+
+ ); + } + + const isUser = message.type === 'user-transcription'; + const isBot = message.type === 'bot-text'; + + // Check if next message is a TTFB metric (for bot messages) + const ttfbMetric = isBot && nextMessage?.type === 'ttfb-metric' ? nextMessage : null; + + // User messages on right, bot messages on left + return ( +
+
+ {/* Show TTFB metric above bot messages */} + {ttfbMetric && ttfbMetric.ttfbSeconds !== undefined && ( +
+ + Reasoning Delay: + {(ttfbMetric.ttfbSeconds * 1000).toFixed(0)}ms +
+ )} +
+
{message.text}
+ {!message.final && ( +
+ speaking... +
+ )} +
+
+
+ ); +} diff --git a/ui/src/app/workflow/[workflowId]/run/[runId]/hooks/useWebSocketRTC.tsx b/ui/src/app/workflow/[workflowId]/run/[runId]/hooks/useWebSocketRTC.tsx index 38c0749..e923cd3 100644 --- a/ui/src/app/workflow/[workflowId]/run/[runId]/hooks/useWebSocketRTC.tsx +++ b/ui/src/app/workflow/[workflowId]/run/[runId]/hooks/useWebSocketRTC.tsx @@ -17,12 +17,19 @@ interface UseWebSocketRTCProps { export interface FeedbackMessage { id: string; - type: 'user-transcription' | 'bot-text' | 'function-call'; + type: 'user-transcription' | 'bot-text' | 'function-call' | 'node-transition' | 'ttfb-metric'; text: string; final?: boolean; timestamp: string; functionName?: string; status?: 'running' | 'completed'; + // Node transition fields + nodeName?: string; + previousNode?: string; + // TTFB metric fields + ttfbSeconds?: number; + processor?: string; + model?: string; } export const useWebSocketRTC = ({ workflowId, workflowRunId, accessToken, initialContextVariables }: UseWebSocketRTCProps) => { @@ -285,35 +292,26 @@ export const useWebSocketRTC = ({ workflowId, workflowRunId, accessToken, initia case 'rtf-user-transcription': { const transcription = message.payload; setFeedbackMessages(prev => { - // Mark last bot message as final (user started speaking) - const withBotFinalized = prev.map((m, i) => - i === prev.length - 1 && m.type === 'bot-text' && !m.final - ? { ...m, final: true } - : m + // Step 1: Finalize the last bot message (user started speaking) + const messagesWithBotFinalized = prev.map((msg, idx) => { + const isLastMessage = idx === prev.length - 1; + const isUnfinalizedBotMessage = msg.type === 'bot-text' && !msg.final; + return isLastMessage && isUnfinalizedBotMessage + ? 
{ ...msg, final: true } + : msg; + }); + + // Step 2: Remove any previous interim transcription + const messagesWithoutInterim = messagesWithBotFinalized.filter( + msg => !(msg.type === 'user-transcription' && !msg.final) ); - // For interim transcriptions, replace the last interim - if (!transcription.final) { - const withoutLastInterim = withBotFinalized.filter( - m => !(m.type === 'user-transcription' && !m.final) - ); - return [...withoutLastInterim, { - id: `user-${Date.now()}`, - type: 'user-transcription', - text: transcription.text, - final: false, - timestamp: new Date().toISOString(), - }]; - } - // For final transcriptions, replace interim with final - const withoutInterim = withBotFinalized.filter( - m => !(m.type === 'user-transcription' && !m.final) - ); - return [...withoutInterim, { + // Step 3: Add new transcription (interim or final) + return [...messagesWithoutInterim, { id: `user-${Date.now()}`, type: 'user-transcription', text: transcription.text, - final: true, + final: transcription.final, timestamp: new Date().toISOString(), }]; }); @@ -381,6 +379,33 @@ export const useWebSocketRTC = ({ workflowId, workflowRunId, accessToken, initia break; } + case 'rtf-node-transition': { + const { node_name, previous_node } = message.payload; + setFeedbackMessages(prev => [...prev, { + id: `node-${Date.now()}`, + type: 'node-transition', + text: node_name, + nodeName: node_name, + previousNode: previous_node, + timestamp: new Date().toISOString(), + }]); + break; + } + + case 'rtf-ttfb-metric': { + const { ttfb_seconds, processor, model } = message.payload; + setFeedbackMessages(prev => [...prev, { + id: `ttfb-${Date.now()}`, + type: 'ttfb-metric', + text: `${(ttfb_seconds * 1000).toFixed(0)}ms`, + ttfbSeconds: ttfb_seconds, + processor, + model, + timestamp: new Date().toISOString(), + }]); + break; + } + default: logger.warn('Unknown message type:', message.type); } diff --git a/ui/src/app/workflow/[workflowId]/run/[runId]/page.tsx 
b/ui/src/app/workflow/[workflowId]/run/[runId]/page.tsx index 0c055c4..27b82f6 100644 --- a/ui/src/app/workflow/[workflowId]/run/[runId]/page.tsx +++ b/ui/src/app/workflow/[workflowId]/run/[runId]/page.tsx @@ -6,6 +6,7 @@ import { useParams } from 'next/navigation'; import { useEffect, useRef, useState } from 'react'; import BrowserCall from '@/app/workflow/[workflowId]/run/[runId]/BrowserCall'; +import { RealtimeFeedback, WorkflowRunLogs } from '@/app/workflow/[workflowId]/run/[runId]/components/RealtimeFeedback'; import WorkflowLayout from '@/app/workflow/WorkflowLayout'; import { getWorkflowRunApiV1WorkflowWorkflowIdRunsRunIdGet } from '@/client/sdk.gen'; import { MediaPreviewButtons, MediaPreviewDialog } from '@/components/MediaPreviewDialog'; @@ -23,6 +24,7 @@ interface WorkflowRunResponse { recording_url: string | null; initial_context: Record | null; gathered_context: Record | null; + logs: WorkflowRunLogs | null; } function ContextDisplay({ title, context }: { title: string; context: Record | null }) { @@ -116,6 +118,7 @@ export default function WorkflowRunPage() { recording_url: response.data?.recording_url ?? null, initial_context: response.data?.initial_context as Record | null ?? null, gathered_context: response.data?.gathered_context as Record | null ?? null, + logs: response.data?.logs as WorkflowRunLogs | null ?? null, }); }; fetchWorkflowRun(); @@ -147,8 +150,10 @@ export default function WorkflowRunPage() { } else if (workflowRun?.is_completed) { returnValue = ( -
-
+
+ {/* Main content - 2/3 width */} +
+
@@ -235,17 +240,23 @@ export default function WorkflowRunPage() { -
- - +
+ + +
+ + {/* Transcript panel - 1/3 width */} +
+ +
); } diff --git a/ui/src/app/workflow/[workflowId]/run/[runId]/utils/processTranscriptEvents.ts b/ui/src/app/workflow/[workflowId]/run/[runId]/utils/processTranscriptEvents.ts new file mode 100644 index 0000000..25a6f36 --- /dev/null +++ b/ui/src/app/workflow/[workflowId]/run/[runId]/utils/processTranscriptEvents.ts @@ -0,0 +1,147 @@ +/** + * Utility to process realtime feedback events into a unified transcript format. + * Used by both live WebSocket messages and post-call logs. + */ + +export interface TranscriptEvent { + type: 'user-transcription' | 'bot-text' | 'function-call' | 'node-transition' | 'ttfb-metric'; + text: string; + final?: boolean; // user-transcription events with a falsy final flag are dropped by processTranscriptEvents + timestamp: string; + turn?: number; // consecutive bot-text events with the same turn are merged by processTranscriptEvents + functionName?: string; + status?: 'running' | 'completed'; // 'running' function-call events are dropped by processTranscriptEvents + nodeName?: string; + previousNode?: string; + ttfbSeconds?: number; + processor?: string; + model?: string; +} + +export interface ProcessedMessage { + id: string; // `${type}-${timestamp}` of the source event; not unique if two events share both values + type: TranscriptEvent['type']; + text: string; + final?: boolean; // set to true when the source event carries no final flag + timestamp: string; + functionName?: string; + status?: 'running' | 'completed'; + nodeName?: string; + ttfbSeconds?: number; +} + +/** + * Process transcript events (both live and historical). + * Combines consecutive bot-text by turn and associates TTFB metrics. 
+ */ +export function processTranscriptEvents(events: TranscriptEvent[]): ProcessedMessage[] { + // Drop user transcriptions whose final flag is falsy and function-call events still in 'running' status + const filteredEvents = events.filter(event => { + if (event.type === 'user-transcription' && !event.final) return false; + if (event.type === 'function-call' && event.status === 'running') return false; + return true; + }); + + const processed: ProcessedMessage[] = []; + let currentBotText: { event: TranscriptEvent; text: string } | null = null; + let pendingTtfb: TranscriptEvent | null = null; + + const flushBotText = () => { + if (!currentBotText) return; + + processed.push(convertToProcessedMessage(currentBotText.event, currentBotText.text)); + + // Emit the pending TTFB metric, if any, directly after the flushed bot text + if (pendingTtfb) { + processed.push(convertToProcessedMessage(pendingTtfb)); + pendingTtfb = null; + } + + currentBotText = null; + }; + + for (const event of filteredEvents) { + if (event.type === 'ttfb-metric') { + // Hold the latest TTFB (a newer one replaces an unflushed one); it is emitted right after the next bot-text flush or function-call + pendingTtfb = event; + } else if (event.type === 'bot-text') { + // Combine consecutive bot-text from the same turn, joined with a single space (two events without a turn also compare equal) + if (currentBotText && currentBotText.event.turn === event.turn) { + currentBotText.text = currentBotText.text + ' ' + event.text; + } else { + flushBotText(); + currentBotText = { event, text: event.text }; + } + } else { + // Handle other events (user-transcription, function-call, node-transition) + flushBotText(); + processed.push(convertToProcessedMessage(event)); + + // Add pending TTFB after function calls + if (event.type === 'function-call' && pendingTtfb) { + processed.push(convertToProcessedMessage(pendingTtfb)); + pendingTtfb = null; + } + } + } + + // Flush any remaining bot text; a TTFB still pending when no bot text is buffered is not emitted + flushBotText(); + + return processed; +} + +/** + * Process live messages - optimized version. 
+ * + * Optimizations rely on useWebSocketRTC.tsx already handling: + * - Bot text accumulation (consecutive chunks combined with spaces) + * - Interim transcription replacement (each new transcription replaces the previous interim one; NOTE(review): the latest interim entry can still be present and is not filtered here) + * - Function call status (start events filtered, only completed kept) + * + * This function only needs to: + * - Emit each TTFB metric right after the next final bot-text or completed function-call that follows it + * - Convert to ProcessedMessage format + */ +export function processLiveMessages(messages: TranscriptEvent[]): ProcessedMessage[] { + const processed: ProcessedMessage[] = []; + let pendingTtfb: TranscriptEvent | null = null; + + for (const msg of messages) { + if (msg.type === 'ttfb-metric') { + // Hold the TTFB until the next final bot-text or completed function-call (a newer TTFB replaces it) + pendingTtfb = msg; + } else { + // Add the message + processed.push(convertToProcessedMessage(msg)); + + // Add pending TTFB after final bot-text or completed function calls + if ((msg.type === 'bot-text' && msg.final) || + (msg.type === 'function-call' && msg.status === 'completed')) { + if (pendingTtfb) { + processed.push(convertToProcessedMessage(pendingTtfb)); + pendingTtfb = null; + } + } + } + } + + return processed; +} + +// Alias for backward compatibility +export const processHistoricalEvents = processTranscriptEvents; + +function convertToProcessedMessage(event: TranscriptEvent, overrideText?: string): ProcessedMessage { // Copies the event into a ProcessedMessage; turn, previousNode, processor and model are not carried over + return { + id: `${event.type}-${event.timestamp}`, // NOTE(review): two events with the same type and timestamp get the same id + type: event.type, + text: overrideText ?? event.text, + final: event.final ?? 
true, // events without a final flag are treated as final + timestamp: event.timestamp, + functionName: event.functionName, + status: event.status, + nodeName: event.nodeName, + ttfbSeconds: event.ttfbSeconds, + }; +} diff --git a/ui/src/client/types.gen.ts b/ui/src/client/types.gen.ts index 67cd7dd..aee4bea 100644 --- a/ui/src/client/types.gen.ts +++ b/ui/src/client/types.gen.ts @@ -964,6 +964,9 @@ export type WorkflowRunResponseSchema = { [key: string]: unknown; } | null; call_type: CallType; + logs?: { + [key: string]: unknown; + } | null; }; export type WorkflowRunUsageResponse = {