mirror of
https://github.com/dograh-hq/dograh.git
synced 2026-06-07 07:55:16 +02:00
77 lines
2.6 KiB
Python
77 lines
2.6 KiB
Python
from typing import Dict
|
|
|
|
from fastapi import APIRouter, BackgroundTasks, Depends
|
|
from loguru import logger
|
|
from pipecat.transports.smallwebrtc.connection import SmallWebRTCConnection
|
|
from pipecat.utils.context import set_current_run_id
|
|
from pydantic import BaseModel
|
|
|
|
from api.db.models import UserModel
|
|
from api.services.auth.depends import get_user
|
|
from api.services.pipecat.run_pipeline import run_pipeline_smallwebrtc
|
|
|
|
# Every route declared in this module is mounted under the "/pipecat" prefix.
router = APIRouter(prefix="/pipecat")

# ICE servers handed to each newly created SmallWebRTCConnection.
ice_servers = ["stun:stun.l.google.com:19302"]

# Live peer connections keyed by pc_id. This is a plain module-level dict, so
# it is in-memory state of the current process only. Entries are written by
# the rtc-offer handler and removed by each connection's "closed" handler.
pcs_map: Dict[str, SmallWebRTCConnection] = {}
|
|
|
|
|
|
class RTCOfferRequest(BaseModel):
    # Request body for POST /pipecat/rtc-offer.

    # Identifier of an existing peer connection to renegotiate. Defaults to
    # None so a first offer may simply omit it; the handler treats a
    # missing/falsy value (or one not present in pcs_map) as "create a new
    # connection".
    pc_id: str | None = None
    # SDP offer text and its type, forwarded unchanged to the connection's
    # initialize() / renegotiate() call.
    sdp: str
    type: str
    # Workflow to run and the specific run it belongs to. The run id is also
    # installed as the logging context via set_current_run_id().
    workflow_id: int
    workflow_run_id: int
    # Forwarded to renegotiate() when an existing connection is reused.
    restart_pc: bool = False
    # Optional variables passed to the pipeline; the handler substitutes {}
    # when this is None or empty.
    call_context_vars: dict | None = None
|
|
|
|
|
|
@router.post("/rtc-offer")
async def offer(
    request: RTCOfferRequest,
    background_tasks: BackgroundTasks,
    user: UserModel = Depends(get_user),
):
    # Answer a WebRTC SDP offer, creating or reusing a peer connection.
    #
    # * If request.pc_id names a connection held in pcs_map, that connection is
    #   renegotiated with the new offer.
    # * Otherwise a new SmallWebRTCConnection is initialised from the offer, a
    #   "closed" handler is attached that evicts it from pcs_map, and
    #   run_pipeline_smallwebrtc is queued on background_tasks for it.
    #
    # Returns whatever get_answer() yields for the connection. It is indexed
    # with "pc_id" below, so it is presumably a dict carrying the SDP answer.
    #
    # (Kept as comments rather than a docstring: FastAPI publishes a route
    # function's docstring as the operation description in the OpenAPI schema.)

    # Bind the run id before touching any connection. For a new connection it
    # must be set *before* the SmallWebRTCConnection is constructed so that
    # async tasks and event-handler coroutines spawned inside the constructor
    # inherit the context variable; otherwise the default ("NA") leaks into
    # their log output. For a reused connection it keeps this request's logs
    # tagged with the right run.
    set_current_run_id(request.workflow_run_id)

    pc_id = request.pc_id

    if not pc_id or pc_id not in pcs_map:
        connection = SmallWebRTCConnection(ice_servers)
        await connection.initialize(sdp=request.sdp, type=request.type)

        @connection.event_handler("closed")
        async def handle_disconnected(webrtc_connection: SmallWebRTCConnection):
            logger.info(
                f"In pipecat connection closed handler. Popping peer connection pc_id: {webrtc_connection.pc_id} from pcs_map"
            )
            # The None default makes removal a no-op if the entry is already gone.
            pcs_map.pop(webrtc_connection.pc_id, None)

        # Positional arguments for the pipeline runner, in the order it is
        # invoked with.
        pipeline_args = (
            connection,
            request.workflow_id,
            request.workflow_run_id,
            user.id,
            request.call_context_vars or {},
        )
        background_tasks.add_task(run_pipeline_smallwebrtc, *pipeline_args)
    else:
        connection = pcs_map[pc_id]
        logger.info(f"Reusing existing connection for pc_id: {pc_id}")
        await connection.renegotiate(
            sdp=request.sdp,
            type=request.type,
            restart_pc=request.restart_pc,
        )

    sdp_answer = connection.get_answer()
    # (Re)register the connection under the pc_id reported in its answer.
    pcs_map[sdp_answer["pc_id"]] = connection

    return sdp_answer
|