feat: add transcript panel during live call for better visibility (#116)

* chore: remove old signaling route

* Show real time feedbacks
This commit is contained in:
Abhishek 2026-01-13 22:48:18 +05:30 committed by GitHub
parent ad4cff73c8
commit e7712474c1
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
15 changed files with 599 additions and 469 deletions

View file

@ -20,7 +20,6 @@ if SENTRY_DSN and (
print(f"Sentry initialized in environment: {ENVIRONMENT}")
import asyncio
from contextlib import asynccontextmanager
from typing import Optional
@ -30,7 +29,6 @@ from fastapi.middleware.cors import CORSMiddleware
from loguru import logger
from api.routes.main import router as main_router
from api.routes.rtc_offer import pcs_map
from api.services.telephony.worker_event_subscriber import (
WorkerEventSubscriber,
setup_worker_subscriber,
@ -77,11 +75,6 @@ async def lifespan(app: FastAPI):
# Fall back to immediate stop
await worker_subscriber.stop()
# close all dangling pipecat connections
coros = [pc.close() for pc in pcs_map.values()]
await asyncio.gather(*coros)
pcs_map.clear()
await redis.aclose()

View file

@ -10,7 +10,6 @@ from api.routes.organization_usage import router as organization_usage_router
from api.routes.public_agent import router as public_agent_router
from api.routes.public_embed import router as public_embed_router
from api.routes.reports import router as reports_router
from api.routes.rtc_offer import router as rtc_offer_router
from api.routes.s3_signed_url import router as s3_router
from api.routes.service_keys import router as service_keys_router
from api.routes.superuser import router as superuser_router
@ -27,7 +26,6 @@ router = APIRouter(
)
router.include_router(telephony_router)
router.include_router(rtc_offer_router)
router.include_router(superuser_router)
router.include_router(workflow_router)
router.include_router(user_router)

View file

@ -1,77 +0,0 @@
from typing import Dict
from fastapi import APIRouter, BackgroundTasks, Depends
from loguru import logger
from pydantic import BaseModel
from api.db.models import UserModel
from api.services.auth.depends import get_user
from api.services.pipecat.run_pipeline import run_pipeline_smallwebrtc
from pipecat.transports.smallwebrtc.connection import SmallWebRTCConnection
from pipecat.utils.context import set_current_run_id
router = APIRouter(prefix="/pipecat")
pcs_map: Dict[str, SmallWebRTCConnection] = {}
ice_servers = ["stun:stun.l.google.com:19302"]
class RTCOfferRequest(BaseModel):
pc_id: str | None
sdp: str
type: str
workflow_id: int
workflow_run_id: int
restart_pc: bool = False
call_context_vars: dict | None = None
@router.post("/rtc-offer")
async def offer(
request: RTCOfferRequest,
background_tasks: BackgroundTasks,
user: UserModel = Depends(get_user),
):
pc_id = request.pc_id
if pc_id and pc_id in pcs_map:
# Ensure run_id context is available for logs even when reusing an existing PC.
set_current_run_id(request.workflow_run_id)
pipecat_connection = pcs_map[pc_id]
logger.info(f"Reusing existing connection for pc_id: {pc_id}")
await pipecat_connection.renegotiate(
sdp=request.sdp,
type=request.type,
restart_pc=request.restart_pc,
)
else:
# Set the run_id *before* creating the SmallWebRTCConnection so that all
# async tasks and event-handler coroutines spawned inside the
# constructor inherit the correct context variable value. Otherwise the
# default ("NA") leaks into the log output produced by those tasks.
set_current_run_id(request.workflow_run_id)
pipecat_connection = SmallWebRTCConnection(ice_servers)
await pipecat_connection.initialize(sdp=request.sdp, type=request.type)
@pipecat_connection.event_handler("closed")
async def handle_disconnected(webrtc_connection: SmallWebRTCConnection):
logger.info(
f"In pipecat connection closed handler. Popping peer connection pc_id: {webrtc_connection.pc_id} from pcs_map"
)
pcs_map.pop(webrtc_connection.pc_id, None)
background_tasks.add_task(
run_pipeline_smallwebrtc,
pipecat_connection,
request.workflow_id,
request.workflow_run_id,
user.id,
request.call_context_vars or {},
)
answer = pipecat_connection.get_answer()
pcs_map[answer["pc_id"]] = pipecat_connection
return answer

View file

@ -18,11 +18,16 @@ from aiortc import RTCIceServer
from aiortc.sdp import candidate_from_sdp
from fastapi import APIRouter, Depends, WebSocket, WebSocketDisconnect
from loguru import logger
from starlette.websockets import WebSocketState
from api.db import db_client
from api.db.models import UserModel
from api.services.auth.depends import get_user_ws
from api.services.pipecat.run_pipeline import run_pipeline_smallwebrtc
from api.services.pipecat.ws_sender_registry import (
register_ws_sender,
unregister_ws_sender,
)
from api.services.quota_service import check_dograh_quota
from pipecat.transports.smallwebrtc.connection import SmallWebRTCConnection
from pipecat.utils.context import set_current_run_id
@ -92,6 +97,9 @@ class SignalingManager:
# Cleanup
self._connections.pop(connection_id, None)
# Unregister WebSocket sender for real-time feedback
unregister_ws_sender(workflow_run_id)
# Clean up all peer connections for this workflow run
# Note: In a WebSocket-based signaling approach (vs HTTP PATCH),
# we maintain our own connection map instead of relying on
@ -182,6 +190,13 @@ class SignalingManager:
# Store peer connection using client's pc_id
self._peer_connections[pc_id] = pc
# Register WebSocket sender for real-time feedback
async def ws_sender(message: dict):
if ws.application_state == WebSocketState.CONNECTED:
await ws.send_json(message)
register_ws_sender(workflow_run_id, ws_sender)
# Setup closed handler
@pc.event_handler("closed")
async def handle_disconnected(webrtc_connection: SmallWebRTCConnection):

View file

@ -0,0 +1,227 @@
"""Real-time feedback observer for sending pipeline events to the frontend.
This observer watches pipeline frames and sends relevant events (transcriptions,
bot text) over WebSocket to provide real-time feedback in the UI.
For frames with presentation timestamps (pts), like TTSTextFrame, we respect
the timing by queuing them and sending at the appropriate time, similar to
how base_output.py handles timed frames.
"""
import asyncio
import time
from typing import Awaitable, Callable, Optional, Set
from loguru import logger
from pipecat.frames.frames import (
CancelFrame,
EndFrame,
FunctionCallInProgressFrame,
FunctionCallResultFrame,
InterimTranscriptionFrame,
InterruptionFrame,
StopFrame,
TranscriptionFrame,
TTSTextFrame,
)
from pipecat.observers.base_observer import BaseObserver, FramePushed
from pipecat.processors.frame_processor import FrameDirection
from pipecat.utils.time import nanoseconds_to_seconds
class RealtimeFeedbackObserver(BaseObserver):
"""Observer that sends real-time transcription and bot response events via WebSocket.
For frames with pts (presentation timestamp), we queue them and send at the
appropriate time to sync with audio playback.
"""
def __init__(
self,
ws_sender: Callable[[dict], Awaitable[None]],
):
"""
Args:
ws_sender: Async function to send messages over WebSocket.
Expected signature: async def send(message: dict) -> None
"""
super().__init__()
self._ws_sender = ws_sender
self._frames_seen: Set[str] = set()
# Clock/timing for pts-based frames (similar to base_output.py)
self._clock_queue: Optional[asyncio.PriorityQueue] = None
self._clock_task: Optional[asyncio.Task] = None
self._clock_start_time: Optional[float] = (
None # Wall clock time when we started
)
self._pts_start_time: Optional[int] = None # First pts value we saw
async def _ensure_clock_task(self):
"""Create the clock task if it doesn't exist."""
if self._clock_queue is None:
self._clock_queue = asyncio.PriorityQueue()
self._clock_task = asyncio.create_task(self._clock_task_handler())
async def _cancel_clock_task(self):
"""Cancel the clock task and clear the queue.
Called on interruption to discard any pending bot text that
hasn't been sent yet.
"""
if self._clock_task:
self._clock_task.cancel()
try:
await self._clock_task
except asyncio.CancelledError:
pass
self._clock_task = None
self._clock_queue = None
# Reset timing references so next bot response starts fresh
self._clock_start_time = None
self._pts_start_time = None
async def _handle_interruption(self):
"""Handle interruption by clearing queued bot text.
Similar to base_output.py's handle_interruptions, we cancel the
clock task and recreate it to discard pending frames.
"""
await self._cancel_clock_task()
async def _clock_task_handler(self):
"""Process timed frames from the queue, respecting their presentation timestamps.
Similar to base_output.py's _clock_task_handler, we wait until the
frame's pts time has arrived before sending.
"""
while True:
try:
pts, _frame_id, message = await self._clock_queue.get()
# Calculate when to send based on pts relative to our start time
if (
self._clock_start_time is not None
and self._pts_start_time is not None
):
# Target time = start wall time + (frame pts - start pts) in seconds
target_time = self._clock_start_time + nanoseconds_to_seconds(
pts - self._pts_start_time
)
current_time = time.time()
if target_time > current_time:
await asyncio.sleep(target_time - current_time)
# Send the message
await self._send_message(message)
self._clock_queue.task_done()
except asyncio.CancelledError:
break
except Exception as e:
logger.debug(f"Clock task error: {e}")
async def on_push_frame(self, data: FramePushed):
"""Process frames and send relevant ones to the client."""
frame = data.frame
frame_direction = data.direction
# Handle pipeline termination - stop clock task
if isinstance(frame, (EndFrame, CancelFrame, StopFrame)):
await self._cancel_clock_task()
return
# Handle interruptions - clear any queued bot text
if isinstance(frame, InterruptionFrame):
await self._handle_interruption()
return
# Skip already processed frames (frames can be observed multiple times)
if frame.id in self._frames_seen:
return
self._frames_seen.add(frame.id)
# Handle user transcriptions (interim)
if isinstance(frame, InterimTranscriptionFrame):
await self._send_message(
{
"type": "rtf-user-transcription",
"payload": {
"text": frame.text,
"final": False,
"user_id": frame.user_id,
"timestamp": frame.timestamp,
},
}
)
# Handle user transcriptions (final)
elif isinstance(frame, TranscriptionFrame):
await self._send_message(
{
"type": "rtf-user-transcription",
"payload": {
"text": frame.text,
"final": True,
"user_id": frame.user_id,
"timestamp": frame.timestamp,
},
}
)
# Handle bot TTS text - respect pts timing
elif isinstance(frame, TTSTextFrame):
message = {
"type": "rtf-bot-text",
"payload": {
"text": frame.text,
},
}
# If frame has pts, queue it for timed delivery
if frame.pts:
# Initialize timing reference on first pts frame
if self._pts_start_time is None:
self._pts_start_time = frame.pts
self._clock_start_time = time.time()
await self._ensure_clock_task()
await self._clock_queue.put((frame.pts, frame.id, message))
else:
# No pts, send immediately
await self._send_message(message)
# Handle function call in progress
elif (
isinstance(frame, FunctionCallInProgressFrame)
and frame_direction == FrameDirection.DOWNSTREAM
):
await self._send_message(
{
"type": "rtf-function-call-start",
"payload": {
"function_name": frame.function_name,
"tool_call_id": frame.tool_call_id,
},
}
)
# Handle function call result
elif (
isinstance(frame, FunctionCallResultFrame)
and frame_direction == FrameDirection.DOWNSTREAM
):
await self._send_message(
{
"type": "rtf-function-call-end",
"payload": {
"function_name": frame.function_name,
"tool_call_id": frame.tool_call_id,
"result": str(frame.result) if frame.result else None,
},
}
)
async def _send_message(self, message: dict):
"""Send message via WebSocket, handling errors gracefully."""
try:
await self._ws_sender(message)
except Exception as e:
# Log but don't fail - feedback is non-critical
logger.debug(f"Failed to send real-time feedback message: {e}")

View file

@ -23,6 +23,7 @@ from api.services.pipecat.pipeline_engine_callbacks_processor import (
PipelineEngineCallbacksProcessor,
)
from api.services.pipecat.pipeline_metrics_aggregator import PipelineMetricsAggregator
from api.services.pipecat.realtime_feedback_observer import RealtimeFeedbackObserver
from api.services.pipecat.service_factory import (
create_llm_service,
create_stt_service,
@ -38,6 +39,7 @@ from api.services.pipecat.transport_setup import (
create_vonage_transport,
create_webrtc_transport,
)
from api.services.pipecat.ws_sender_registry import get_ws_sender
from api.services.telephony.stasis_rtp_connection import StasisRTPConnection
from api.services.workflow.dto import ReactFlowDTO
from api.services.workflow.pipecat_engine import PipecatEngine
@ -564,6 +566,12 @@ async def _run_pipeline(
# Create pipeline task with audio configuration
task = create_pipeline_task(pipeline, workflow_run_id, audio_config)
# Add real-time feedback observer if WebSocket sender is available
ws_sender = get_ws_sender(workflow_run_id)
if ws_sender:
feedback_observer = RealtimeFeedbackObserver(ws_sender=ws_sender)
task.add_observer(feedback_observer)
# Now set the task on the engine
engine.set_task(task)

View file

@ -0,0 +1,28 @@
"""Registry to store WebSocket senders by workflow_run_id.
This allows the pipeline observer to send messages back through
the signaling WebSocket without passing the WebSocket directly.
"""
from typing import Awaitable, Callable, Dict, Optional
_ws_senders: Dict[int, Callable[[dict], Awaitable[None]]] = {}
def register_ws_sender(
workflow_run_id: int, sender: Callable[[dict], Awaitable[None]]
) -> None:
"""Register a WebSocket sender for a workflow run."""
_ws_senders[workflow_run_id] = sender
def unregister_ws_sender(workflow_run_id: int) -> None:
"""Unregister a WebSocket sender for a workflow run."""
_ws_senders.pop(workflow_run_id, None)
def get_ws_sender(
workflow_run_id: int,
) -> Optional[Callable[[dict], Awaitable[None]]]:
"""Get the WebSocket sender for a workflow run."""
return _ws_senders.get(workflow_run_id)