feat: add rtf in logs

This commit is contained in:
Abhishek Kumar 2026-01-14 18:35:18 +05:30
parent a172db8022
commit d25f898a8f
10 changed files with 284 additions and 23 deletions

View file

@ -611,6 +611,7 @@ async def get_workflow_run(
"initial_context": run.initial_context,
"gathered_context": run.gathered_context,
"call_type": run.call_type,
"logs": run.logs,
}

View file

@ -20,3 +20,4 @@ class WorkflowRunResponseSchema(BaseModel):
initial_context: dict | None = None
gathered_context: dict | None = None
call_type: CallType
logs: Dict[str, Any] | None = None

View file

@ -4,8 +4,9 @@ from api.db import db_client
from api.enums import WorkflowRunState
from api.services.campaign.call_dispatcher import campaign_call_dispatcher
from api.services.pipecat.audio_config import AudioConfig
from api.services.pipecat.audio_transcript_buffers import (
from api.services.pipecat.in_memory_buffers import (
InMemoryAudioBuffer,
InMemoryLogsBuffer,
InMemoryTranscriptBuffer,
)
from api.services.pipecat.pipeline_metrics_aggregator import PipelineMetricsAggregator
@ -46,6 +47,7 @@ def register_transport_event_handlers(
num_channels=num_channels,
)
in_memory_transcript_buffer = InMemoryTranscriptBuffer(workflow_run_id)
in_memory_logs_buffer = InMemoryLogsBuffer(workflow_run_id)
@transport.event_handler("on_client_connected")
async def on_client_connected(transport, participant):
@ -69,7 +71,7 @@ def register_transport_event_handlers(
await task.cancel()
# Return the buffers so they can be passed to other handlers
return in_memory_audio_buffer, in_memory_transcript_buffer
return in_memory_audio_buffer, in_memory_transcript_buffer, in_memory_logs_buffer
def register_task_event_handler(
@ -80,6 +82,7 @@ def register_task_event_handler(
audio_buffer: AudioBufferProcessor,
in_memory_audio_buffer: InMemoryAudioBuffer,
in_memory_transcript_buffer: InMemoryTranscriptBuffer,
in_memory_logs_buffer: InMemoryLogsBuffer,
pipeline_metrics_aggregator: PipelineMetricsAggregator,
):
@task.event_handler("on_pipeline_started")
@ -185,6 +188,22 @@ def register_task_event_handler(
state=WorkflowRunState.COMPLETED.value,
)
# Save real-time feedback logs to workflow run
if not in_memory_logs_buffer.is_empty:
try:
feedback_events = in_memory_logs_buffer.get_events()
await db_client.update_workflow_run(
run_id=workflow_run_id,
logs={"realtime_feedback_events": feedback_events},
)
logger.debug(
f"Saved {len(feedback_events)} feedback events to workflow run logs"
)
except Exception as e:
logger.error(f"Error saving realtime feedback logs: {e}", exc_info=True)
else:
logger.debug("Logs buffer is empty, skipping save")
# Release concurrent slot for campaign calls
if workflow_run and workflow_run.campaign_id:
await campaign_call_dispatcher.release_call_slot(workflow_run_id)

View file

@ -2,6 +2,7 @@ import asyncio
import re
import tempfile
import wave
from datetime import UTC, datetime
from typing import List
from loguru import logger
@ -120,3 +121,41 @@ class InMemoryTranscriptBuffer:
if self._USER_SPEECH_RE.match(line):
return True
return False
class InMemoryLogsBuffer:
"""Buffer real-time feedback events in memory during a call, then save to workflow run logs."""
def __init__(self, workflow_run_id: int):
self._workflow_run_id = workflow_run_id
self._events: List[dict] = []
self._turn_counter = 0
async def append(self, event: dict):
"""Append a feedback event to the buffer with timestamp."""
# Add timestamp and turn tracking
timestamped_event = {
**event,
"timestamp": datetime.now(UTC).isoformat(),
"turn": self._turn_counter,
}
self._events.append(timestamped_event)
logger.trace(
f"Appended event {event.get('type')} to logs buffer for workflow {self._workflow_run_id}"
)
def increment_turn(self):
"""Increment turn counter (called on user transcription completion)."""
self._turn_counter += 1
logger.trace(
f"Incremented turn counter to {self._turn_counter} for workflow {self._workflow_run_id}"
)
def get_events(self) -> List[dict]:
"""Get all events for final storage."""
return self._events
@property
def is_empty(self) -> bool:
"""Check if the buffer is empty."""
return len(self._events) == 0

View file

@ -10,10 +10,13 @@ how base_output.py handles timed frames.
import asyncio
import time
from typing import Awaitable, Callable, Optional, Set
from typing import TYPE_CHECKING, Awaitable, Callable, Optional, Set
from loguru import logger
if TYPE_CHECKING:
from api.services.pipecat.in_memory_buffers import InMemoryLogsBuffer
from pipecat.frames.frames import (
CancelFrame,
EndFrame,
@ -40,14 +43,17 @@ class RealtimeFeedbackObserver(BaseObserver):
def __init__(
self,
ws_sender: Callable[[dict], Awaitable[None]],
logs_buffer: Optional["InMemoryLogsBuffer"] = None,
):
"""
Args:
ws_sender: Async function to send messages over WebSocket.
Expected signature: async def send(message: dict) -> None
logs_buffer: Optional InMemoryLogsBuffer to persist events for post-call analysis.
"""
super().__init__()
self._ws_sender = ws_sender
self._logs_buffer = logs_buffer
self._frames_seen: Set[str] = set()
# Clock/timing for pts-based frames (similar to base_output.py)
@ -167,6 +173,9 @@ class RealtimeFeedbackObserver(BaseObserver):
},
}
)
# Increment turn counter on final user transcription
if self._logs_buffer:
self._logs_buffer.increment_turn()
# Handle bot TTS text - respect pts timing
elif isinstance(frame, TTSTextFrame):
message = {
@ -219,9 +228,17 @@ class RealtimeFeedbackObserver(BaseObserver):
)
async def _send_message(self, message: dict):
"""Send message via WebSocket, handling errors gracefully."""
"""Send message via WebSocket AND append to logs buffer, handling errors gracefully."""
# Send via WebSocket
try:
await self._ws_sender(message)
except Exception as e:
# Log but don't fail - feedback is non-critical
logger.debug(f"Failed to send real-time feedback message: {e}")
# Also append to logs buffer
if self._logs_buffer:
try:
await self._logs_buffer.append(message)
except Exception as e:
logger.error(f"Failed to append to logs buffer: {e}")

View file

@ -566,12 +566,6 @@ async def _run_pipeline(
# Create pipeline task with audio configuration
task = create_pipeline_task(pipeline, workflow_run_id, audio_config)
# Add real-time feedback observer if WebSocket sender is available
ws_sender = get_ws_sender(workflow_run_id)
if ws_sender:
feedback_observer = RealtimeFeedbackObserver(ws_sender=ws_sender)
task.add_observer(feedback_observer)
# Now set the task on the engine
engine.set_task(task)
@ -579,7 +573,7 @@ async def _run_pipeline(
await engine.initialize()
# Register event handlers
in_memory_audio_buffer, in_memory_transcript_buffer = (
in_memory_audio_buffer, in_memory_transcript_buffer, in_memory_logs_buffer = (
register_transport_event_handlers(
task,
transport,
@ -590,6 +584,14 @@ async def _run_pipeline(
)
)
# Add real-time feedback observer if WebSocket sender is available
ws_sender = get_ws_sender(workflow_run_id)
if ws_sender:
feedback_observer = RealtimeFeedbackObserver(
ws_sender=ws_sender, logs_buffer=in_memory_logs_buffer
)
task.add_observer(feedback_observer)
register_task_event_handler(
workflow_run_id,
engine,
@ -598,6 +600,7 @@ async def _run_pipeline(
audio_buffer,
in_memory_audio_buffer,
in_memory_transcript_buffer,
in_memory_logs_buffer,
pipeline_metrics_aggregator,
)