mirror of
https://github.com/dograh-hq/dograh.git
synced 2026-06-19 08:28:10 +02:00
Show real time feedbacks
This commit is contained in:
parent
e9bc5bd1cc
commit
a849031457
8 changed files with 598 additions and 37 deletions
|
|
@ -18,11 +18,16 @@ from aiortc import RTCIceServer
|
|||
from aiortc.sdp import candidate_from_sdp
|
||||
from fastapi import APIRouter, Depends, WebSocket, WebSocketDisconnect
|
||||
from loguru import logger
|
||||
from starlette.websockets import WebSocketState
|
||||
|
||||
from api.db import db_client
|
||||
from api.db.models import UserModel
|
||||
from api.services.auth.depends import get_user_ws
|
||||
from api.services.pipecat.run_pipeline import run_pipeline_smallwebrtc
|
||||
from api.services.pipecat.ws_sender_registry import (
|
||||
register_ws_sender,
|
||||
unregister_ws_sender,
|
||||
)
|
||||
from api.services.quota_service import check_dograh_quota
|
||||
from pipecat.transports.smallwebrtc.connection import SmallWebRTCConnection
|
||||
from pipecat.utils.context import set_current_run_id
|
||||
|
|
@ -92,6 +97,9 @@ class SignalingManager:
|
|||
# Cleanup
|
||||
self._connections.pop(connection_id, None)
|
||||
|
||||
# Unregister WebSocket sender for real-time feedback
|
||||
unregister_ws_sender(workflow_run_id)
|
||||
|
||||
# Clean up all peer connections for this workflow run
|
||||
# Note: In a WebSocket-based signaling approach (vs HTTP PATCH),
|
||||
# we maintain our own connection map instead of relying on
|
||||
|
|
@ -182,6 +190,13 @@ class SignalingManager:
|
|||
# Store peer connection using client's pc_id
|
||||
self._peer_connections[pc_id] = pc
|
||||
|
||||
# Register WebSocket sender for real-time feedback
|
||||
async def ws_sender(message: dict):
|
||||
if ws.application_state == WebSocketState.CONNECTED:
|
||||
await ws.send_json(message)
|
||||
|
||||
register_ws_sender(workflow_run_id, ws_sender)
|
||||
|
||||
# Setup closed handler
|
||||
@pc.event_handler("closed")
|
||||
async def handle_disconnected(webrtc_connection: SmallWebRTCConnection):
|
||||
|
|
|
|||
227
api/services/pipecat/realtime_feedback_observer.py
Normal file
227
api/services/pipecat/realtime_feedback_observer.py
Normal file
|
|
@ -0,0 +1,227 @@
|
|||
"""Real-time feedback observer for sending pipeline events to the frontend.
|
||||
|
||||
This observer watches pipeline frames and sends relevant events (transcriptions,
|
||||
bot text) over WebSocket to provide real-time feedback in the UI.
|
||||
|
||||
For frames with presentation timestamps (pts), like TTSTextFrame, we respect
|
||||
the timing by queuing them and sending at the appropriate time, similar to
|
||||
how base_output.py handles timed frames.
|
||||
"""
|
||||
|
||||
import asyncio
|
||||
import time
|
||||
from typing import Awaitable, Callable, Optional, Set
|
||||
|
||||
from loguru import logger
|
||||
|
||||
from pipecat.frames.frames import (
|
||||
CancelFrame,
|
||||
EndFrame,
|
||||
FunctionCallInProgressFrame,
|
||||
FunctionCallResultFrame,
|
||||
InterimTranscriptionFrame,
|
||||
InterruptionFrame,
|
||||
StopFrame,
|
||||
TranscriptionFrame,
|
||||
TTSTextFrame,
|
||||
)
|
||||
from pipecat.observers.base_observer import BaseObserver, FramePushed
|
||||
from pipecat.processors.frame_processor import FrameDirection
|
||||
from pipecat.utils.time import nanoseconds_to_seconds
|
||||
|
||||
|
||||
class RealtimeFeedbackObserver(BaseObserver):
|
||||
"""Observer that sends real-time transcription and bot response events via WebSocket.
|
||||
|
||||
For frames with pts (presentation timestamp), we queue them and send at the
|
||||
appropriate time to sync with audio playback.
|
||||
"""
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
ws_sender: Callable[[dict], Awaitable[None]],
|
||||
):
|
||||
"""
|
||||
Args:
|
||||
ws_sender: Async function to send messages over WebSocket.
|
||||
Expected signature: async def send(message: dict) -> None
|
||||
"""
|
||||
super().__init__()
|
||||
self._ws_sender = ws_sender
|
||||
self._frames_seen: Set[str] = set()
|
||||
|
||||
# Clock/timing for pts-based frames (similar to base_output.py)
|
||||
self._clock_queue: Optional[asyncio.PriorityQueue] = None
|
||||
self._clock_task: Optional[asyncio.Task] = None
|
||||
self._clock_start_time: Optional[float] = (
|
||||
None # Wall clock time when we started
|
||||
)
|
||||
self._pts_start_time: Optional[int] = None # First pts value we saw
|
||||
|
||||
async def _ensure_clock_task(self):
|
||||
"""Create the clock task if it doesn't exist."""
|
||||
if self._clock_queue is None:
|
||||
self._clock_queue = asyncio.PriorityQueue()
|
||||
self._clock_task = asyncio.create_task(self._clock_task_handler())
|
||||
|
||||
async def _cancel_clock_task(self):
|
||||
"""Cancel the clock task and clear the queue.
|
||||
|
||||
Called on interruption to discard any pending bot text that
|
||||
hasn't been sent yet.
|
||||
"""
|
||||
if self._clock_task:
|
||||
self._clock_task.cancel()
|
||||
try:
|
||||
await self._clock_task
|
||||
except asyncio.CancelledError:
|
||||
pass
|
||||
self._clock_task = None
|
||||
self._clock_queue = None
|
||||
# Reset timing references so next bot response starts fresh
|
||||
self._clock_start_time = None
|
||||
self._pts_start_time = None
|
||||
|
||||
async def _handle_interruption(self):
|
||||
"""Handle interruption by clearing queued bot text.
|
||||
|
||||
Similar to base_output.py's handle_interruptions, we cancel the
|
||||
clock task and recreate it to discard pending frames.
|
||||
"""
|
||||
await self._cancel_clock_task()
|
||||
|
||||
async def _clock_task_handler(self):
|
||||
"""Process timed frames from the queue, respecting their presentation timestamps.
|
||||
|
||||
Similar to base_output.py's _clock_task_handler, we wait until the
|
||||
frame's pts time has arrived before sending.
|
||||
"""
|
||||
while True:
|
||||
try:
|
||||
pts, _frame_id, message = await self._clock_queue.get()
|
||||
|
||||
# Calculate when to send based on pts relative to our start time
|
||||
if (
|
||||
self._clock_start_time is not None
|
||||
and self._pts_start_time is not None
|
||||
):
|
||||
# Target time = start wall time + (frame pts - start pts) in seconds
|
||||
target_time = self._clock_start_time + nanoseconds_to_seconds(
|
||||
pts - self._pts_start_time
|
||||
)
|
||||
current_time = time.time()
|
||||
if target_time > current_time:
|
||||
await asyncio.sleep(target_time - current_time)
|
||||
|
||||
# Send the message
|
||||
await self._send_message(message)
|
||||
self._clock_queue.task_done()
|
||||
except asyncio.CancelledError:
|
||||
break
|
||||
except Exception as e:
|
||||
logger.debug(f"Clock task error: {e}")
|
||||
|
||||
async def on_push_frame(self, data: FramePushed):
|
||||
"""Process frames and send relevant ones to the client."""
|
||||
frame = data.frame
|
||||
frame_direction = data.direction
|
||||
|
||||
# Handle pipeline termination - stop clock task
|
||||
if isinstance(frame, (EndFrame, CancelFrame, StopFrame)):
|
||||
await self._cancel_clock_task()
|
||||
return
|
||||
|
||||
# Handle interruptions - clear any queued bot text
|
||||
if isinstance(frame, InterruptionFrame):
|
||||
await self._handle_interruption()
|
||||
return
|
||||
|
||||
# Skip already processed frames (frames can be observed multiple times)
|
||||
if frame.id in self._frames_seen:
|
||||
return
|
||||
self._frames_seen.add(frame.id)
|
||||
|
||||
# Handle user transcriptions (interim)
|
||||
if isinstance(frame, InterimTranscriptionFrame):
|
||||
await self._send_message(
|
||||
{
|
||||
"type": "rtf-user-transcription",
|
||||
"payload": {
|
||||
"text": frame.text,
|
||||
"final": False,
|
||||
"user_id": frame.user_id,
|
||||
"timestamp": frame.timestamp,
|
||||
},
|
||||
}
|
||||
)
|
||||
# Handle user transcriptions (final)
|
||||
elif isinstance(frame, TranscriptionFrame):
|
||||
await self._send_message(
|
||||
{
|
||||
"type": "rtf-user-transcription",
|
||||
"payload": {
|
||||
"text": frame.text,
|
||||
"final": True,
|
||||
"user_id": frame.user_id,
|
||||
"timestamp": frame.timestamp,
|
||||
},
|
||||
}
|
||||
)
|
||||
# Handle bot TTS text - respect pts timing
|
||||
elif isinstance(frame, TTSTextFrame):
|
||||
message = {
|
||||
"type": "rtf-bot-text",
|
||||
"payload": {
|
||||
"text": frame.text,
|
||||
},
|
||||
}
|
||||
|
||||
# If frame has pts, queue it for timed delivery
|
||||
if frame.pts:
|
||||
# Initialize timing reference on first pts frame
|
||||
if self._pts_start_time is None:
|
||||
self._pts_start_time = frame.pts
|
||||
self._clock_start_time = time.time()
|
||||
|
||||
await self._ensure_clock_task()
|
||||
await self._clock_queue.put((frame.pts, frame.id, message))
|
||||
else:
|
||||
# No pts, send immediately
|
||||
await self._send_message(message)
|
||||
# Handle function call in progress
|
||||
elif (
|
||||
isinstance(frame, FunctionCallInProgressFrame)
|
||||
and frame_direction == FrameDirection.DOWNSTREAM
|
||||
):
|
||||
await self._send_message(
|
||||
{
|
||||
"type": "rtf-function-call-start",
|
||||
"payload": {
|
||||
"function_name": frame.function_name,
|
||||
"tool_call_id": frame.tool_call_id,
|
||||
},
|
||||
}
|
||||
)
|
||||
# Handle function call result
|
||||
elif (
|
||||
isinstance(frame, FunctionCallResultFrame)
|
||||
and frame_direction == FrameDirection.DOWNSTREAM
|
||||
):
|
||||
await self._send_message(
|
||||
{
|
||||
"type": "rtf-function-call-end",
|
||||
"payload": {
|
||||
"function_name": frame.function_name,
|
||||
"tool_call_id": frame.tool_call_id,
|
||||
"result": str(frame.result) if frame.result else None,
|
||||
},
|
||||
}
|
||||
)
|
||||
|
||||
async def _send_message(self, message: dict):
|
||||
"""Send message via WebSocket, handling errors gracefully."""
|
||||
try:
|
||||
await self._ws_sender(message)
|
||||
except Exception as e:
|
||||
# Log but don't fail - feedback is non-critical
|
||||
logger.debug(f"Failed to send real-time feedback message: {e}")
|
||||
|
|
@ -23,6 +23,7 @@ from api.services.pipecat.pipeline_engine_callbacks_processor import (
|
|||
PipelineEngineCallbacksProcessor,
|
||||
)
|
||||
from api.services.pipecat.pipeline_metrics_aggregator import PipelineMetricsAggregator
|
||||
from api.services.pipecat.realtime_feedback_observer import RealtimeFeedbackObserver
|
||||
from api.services.pipecat.service_factory import (
|
||||
create_llm_service,
|
||||
create_stt_service,
|
||||
|
|
@ -38,6 +39,7 @@ from api.services.pipecat.transport_setup import (
|
|||
create_vonage_transport,
|
||||
create_webrtc_transport,
|
||||
)
|
||||
from api.services.pipecat.ws_sender_registry import get_ws_sender
|
||||
from api.services.telephony.stasis_rtp_connection import StasisRTPConnection
|
||||
from api.services.workflow.dto import ReactFlowDTO
|
||||
from api.services.workflow.pipecat_engine import PipecatEngine
|
||||
|
|
@ -564,6 +566,12 @@ async def _run_pipeline(
|
|||
# Create pipeline task with audio configuration
|
||||
task = create_pipeline_task(pipeline, workflow_run_id, audio_config)
|
||||
|
||||
# Add real-time feedback observer if WebSocket sender is available
|
||||
ws_sender = get_ws_sender(workflow_run_id)
|
||||
if ws_sender:
|
||||
feedback_observer = RealtimeFeedbackObserver(ws_sender=ws_sender)
|
||||
task.add_observer(feedback_observer)
|
||||
|
||||
# Now set the task on the engine
|
||||
engine.set_task(task)
|
||||
|
||||
|
|
|
|||
28
api/services/pipecat/ws_sender_registry.py
Normal file
28
api/services/pipecat/ws_sender_registry.py
Normal file
|
|
@ -0,0 +1,28 @@
|
|||
"""Registry to store WebSocket senders by workflow_run_id.
|
||||
|
||||
This allows the pipeline observer to send messages back through
|
||||
the signaling WebSocket without passing the WebSocket directly.
|
||||
"""
|
||||
|
||||
from typing import Awaitable, Callable, Dict, Optional
|
||||
|
||||
_ws_senders: Dict[int, Callable[[dict], Awaitable[None]]] = {}
|
||||
|
||||
|
||||
def register_ws_sender(
|
||||
workflow_run_id: int, sender: Callable[[dict], Awaitable[None]]
|
||||
) -> None:
|
||||
"""Register a WebSocket sender for a workflow run."""
|
||||
_ws_senders[workflow_run_id] = sender
|
||||
|
||||
|
||||
def unregister_ws_sender(workflow_run_id: int) -> None:
|
||||
"""Unregister a WebSocket sender for a workflow run."""
|
||||
_ws_senders.pop(workflow_run_id, None)
|
||||
|
||||
|
||||
def get_ws_sender(
|
||||
workflow_run_id: int,
|
||||
) -> Optional[Callable[[dict], Awaitable[None]]]:
|
||||
"""Get the WebSocket sender for a workflow run."""
|
||||
return _ws_senders.get(workflow_run_id)
|
||||
Loading…
Add table
Add a link
Reference in a new issue