diff --git a/api/routes/workflow.py b/api/routes/workflow.py index 6ce98a1..f8e47eb 100644 --- a/api/routes/workflow.py +++ b/api/routes/workflow.py @@ -611,6 +611,7 @@ async def get_workflow_run( "initial_context": run.initial_context, "gathered_context": run.gathered_context, "call_type": run.call_type, + "logs": run.logs, } diff --git a/api/schemas/workflow.py b/api/schemas/workflow.py index 5696e8b..f5c338c 100644 --- a/api/schemas/workflow.py +++ b/api/schemas/workflow.py @@ -20,3 +20,4 @@ class WorkflowRunResponseSchema(BaseModel): initial_context: dict | None = None gathered_context: dict | None = None call_type: CallType + logs: Dict[str, Any] | None = None diff --git a/api/services/pipecat/event_handlers.py b/api/services/pipecat/event_handlers.py index 6a654ee..489707a 100644 --- a/api/services/pipecat/event_handlers.py +++ b/api/services/pipecat/event_handlers.py @@ -4,8 +4,9 @@ from api.db import db_client from api.enums import WorkflowRunState from api.services.campaign.call_dispatcher import campaign_call_dispatcher from api.services.pipecat.audio_config import AudioConfig -from api.services.pipecat.audio_transcript_buffers import ( +from api.services.pipecat.in_memory_buffers import ( InMemoryAudioBuffer, + InMemoryLogsBuffer, InMemoryTranscriptBuffer, ) from api.services.pipecat.pipeline_metrics_aggregator import PipelineMetricsAggregator @@ -46,6 +47,7 @@ def register_transport_event_handlers( num_channels=num_channels, ) in_memory_transcript_buffer = InMemoryTranscriptBuffer(workflow_run_id) + in_memory_logs_buffer = InMemoryLogsBuffer(workflow_run_id) @transport.event_handler("on_client_connected") async def on_client_connected(transport, participant): @@ -69,7 +71,7 @@ def register_transport_event_handlers( await task.cancel() # Return the buffers so they can be passed to other handlers - return in_memory_audio_buffer, in_memory_transcript_buffer + return in_memory_audio_buffer, in_memory_transcript_buffer, in_memory_logs_buffer def register_task_event_handler( @@ -80,6 +82,7 @@ def register_task_event_handler( audio_buffer: AudioBufferProcessor, in_memory_audio_buffer: InMemoryAudioBuffer, in_memory_transcript_buffer: InMemoryTranscriptBuffer, + in_memory_logs_buffer: InMemoryLogsBuffer, pipeline_metrics_aggregator: PipelineMetricsAggregator, ): @task.event_handler("on_pipeline_started") @@ -185,6 +188,22 @@ def register_task_event_handler( state=WorkflowRunState.COMPLETED.value, ) + # Save real-time feedback logs to workflow run + if not in_memory_logs_buffer.is_empty: + try: + feedback_events = in_memory_logs_buffer.get_events() + await db_client.update_workflow_run( + run_id=workflow_run_id, + logs={"realtime_feedback_events": feedback_events}, + ) + logger.debug( + f"Saved {len(feedback_events)} feedback events to workflow run logs" + ) + except Exception as e: + logger.error(f"Error saving realtime feedback logs: {e}", exc_info=True) + else: + logger.debug("Logs buffer is empty, skipping save") + # Release concurrent slot for campaign calls if workflow_run and workflow_run.campaign_id: await campaign_call_dispatcher.release_call_slot(workflow_run_id) diff --git a/api/services/pipecat/audio_transcript_buffers.py b/api/services/pipecat/in_memory_buffers.py similarity index 76% rename from api/services/pipecat/audio_transcript_buffers.py rename to api/services/pipecat/in_memory_buffers.py index d087ad8..c4274f4 100644 --- a/api/services/pipecat/audio_transcript_buffers.py +++ b/api/services/pipecat/in_memory_buffers.py @@ -2,6 +2,7 @@ import asyncio import re import tempfile import wave +from datetime import UTC, datetime from typing import List from loguru import logger @@ -120,3 +121,41 @@ class InMemoryTranscriptBuffer: if self._USER_SPEECH_RE.match(line): return True return False + + +class InMemoryLogsBuffer: + """Buffer real-time feedback events in memory during a call, then save to workflow run logs.""" + + def __init__(self, workflow_run_id: int): + self._workflow_run_id = workflow_run_id + self._events: List[dict] = [] + self._turn_counter = 0 + + async def append(self, event: dict): + """Append a feedback event to the buffer with timestamp.""" + # Add timestamp and turn tracking + timestamped_event = { + **event, + "timestamp": datetime.now(UTC).isoformat(), + "turn": self._turn_counter, + } + self._events.append(timestamped_event) + logger.trace( + f"Appended event {event.get('type')} to logs buffer for workflow {self._workflow_run_id}" + ) + + def increment_turn(self): + """Increment turn counter (called on user transcription completion).""" + self._turn_counter += 1 + logger.trace( + f"Incremented turn counter to {self._turn_counter} for workflow {self._workflow_run_id}" + ) + + def get_events(self) -> List[dict]: + """Get all events for final storage.""" + return self._events + + @property + def is_empty(self) -> bool: + """Check if the buffer is empty.""" + return len(self._events) == 0 diff --git a/api/services/pipecat/realtime_feedback_observer.py b/api/services/pipecat/realtime_feedback_observer.py index bf5275d..d9e2155 100644 --- a/api/services/pipecat/realtime_feedback_observer.py +++ b/api/services/pipecat/realtime_feedback_observer.py @@ -10,10 +10,13 @@ how base_output.py handles timed frames. import asyncio import time -from typing import Awaitable, Callable, Optional, Set +from typing import TYPE_CHECKING, Awaitable, Callable, Optional, Set from loguru import logger +if TYPE_CHECKING: + from api.services.pipecat.in_memory_buffers import InMemoryLogsBuffer + from pipecat.frames.frames import ( CancelFrame, EndFrame, @@ -40,14 +43,17 @@ class RealtimeFeedbackObserver(BaseObserver): def __init__( self, ws_sender: Callable[[dict], Awaitable[None]], + logs_buffer: Optional["InMemoryLogsBuffer"] = None, ): """ Args: ws_sender: Async function to send messages over WebSocket. Expected signature: async def send(message: dict) -> None + logs_buffer: Optional InMemoryLogsBuffer to persist events for post-call analysis. """ super().__init__() self._ws_sender = ws_sender + self._logs_buffer = logs_buffer self._frames_seen: Set[str] = set() # Clock/timing for pts-based frames (similar to base_output.py) @@ -167,6 +173,9 @@ class RealtimeFeedbackObserver(BaseObserver): }, } ) + # Increment turn counter on final user transcription + if self._logs_buffer: + self._logs_buffer.increment_turn() # Handle bot TTS text - respect pts timing elif isinstance(frame, TTSTextFrame): message = { @@ -219,9 +228,17 @@ class RealtimeFeedbackObserver(BaseObserver): ) async def _send_message(self, message: dict): - """Send message via WebSocket, handling errors gracefully.""" + """Send message via WebSocket AND append to logs buffer, handling errors gracefully.""" + # Send via WebSocket try: await self._ws_sender(message) except Exception as e: # Log but don't fail - feedback is non-critical logger.debug(f"Failed to send real-time feedback message: {e}") + + # Also append to logs buffer + if self._logs_buffer: + try: + await self._logs_buffer.append(message) + except Exception as e: + logger.error(f"Failed to append to logs buffer: {e}") diff --git a/api/services/pipecat/run_pipeline.py b/api/services/pipecat/run_pipeline.py index b385a2c..101af1c 100644 --- a/api/services/pipecat/run_pipeline.py +++ b/api/services/pipecat/run_pipeline.py @@ -566,12 +566,6 @@ async def _run_pipeline( # Create pipeline task with audio configuration task = create_pipeline_task(pipeline, workflow_run_id, audio_config) - # Add real-time feedback observer if WebSocket sender is available - ws_sender = get_ws_sender(workflow_run_id) - if ws_sender: - feedback_observer = RealtimeFeedbackObserver(ws_sender=ws_sender) - task.add_observer(feedback_observer) - # Now set the task on the engine engine.set_task(task) @@ -579,7 +573,7 @@ async def _run_pipeline( await engine.initialize() # Register event handlers - in_memory_audio_buffer, in_memory_transcript_buffer = ( + in_memory_audio_buffer, in_memory_transcript_buffer, in_memory_logs_buffer = ( register_transport_event_handlers( task, transport, @@ -590,6 +584,14 @@ async def _run_pipeline( ) ) + # Add real-time feedback observer if WebSocket sender is available + ws_sender = get_ws_sender(workflow_run_id) + if ws_sender: + feedback_observer = RealtimeFeedbackObserver( + ws_sender=ws_sender, logs_buffer=in_memory_logs_buffer + ) + task.add_observer(feedback_observer) + register_task_event_handler( workflow_run_id, engine, @@ -598,6 +600,7 @@ async def _run_pipeline( audio_buffer, in_memory_audio_buffer, in_memory_transcript_buffer, + in_memory_logs_buffer, pipeline_metrics_aggregator, ) diff --git a/ui/src/app/workflow/[workflowId]/run/[runId]/components/RealtimeFeedbackLogs.tsx b/ui/src/app/workflow/[workflowId]/run/[runId]/components/RealtimeFeedbackLogs.tsx new file mode 100644 index 0000000..60508e1 --- /dev/null +++ b/ui/src/app/workflow/[workflowId]/run/[runId]/components/RealtimeFeedbackLogs.tsx @@ -0,0 +1,166 @@ +'use client'; + +import { CheckCircle, MessageSquare, MicOff,Wrench } from 'lucide-react'; + +import { cn } from '@/lib/utils'; + +interface RealtimeFeedbackEvent { + type: string; + payload: { + text?: string; + final?: boolean; + user_id?: string; + timestamp?: string; + function_name?: string; + tool_call_id?: string; + result?: string; + }; + timestamp: string; + turn: number; +} + +export interface WorkflowRunLogs { + realtime_feedback_events?: RealtimeFeedbackEvent[]; +} + +interface RealtimeFeedbackLogsProps { + logs: WorkflowRunLogs | null; +} + +const EventItem = ({ event }: { event: RealtimeFeedbackEvent }) => { + // Function call message - centered + if (event.type === 'rtf-function-call-start' || event.type === 'rtf-function-call-end') { + return ( +
No conversation recorded
++ Real-time feedback events were not captured for this call +
+