mirror of
https://github.com/dograh-hq/dograh.git
synced 2026-06-07 07:55:16 +02:00
chore: remove old signaling route
This commit is contained in:
parent
ad4cff73c8
commit
e9bc5bd1cc
7 changed files with 1 additions and 432 deletions
|
|
@ -20,7 +20,6 @@ if SENTRY_DSN and (
|
|||
print(f"Sentry initialized in environment: {ENVIRONMENT}")
|
||||
|
||||
|
||||
import asyncio
|
||||
from contextlib import asynccontextmanager
|
||||
from typing import Optional
|
||||
|
||||
|
|
@ -30,7 +29,6 @@ from fastapi.middleware.cors import CORSMiddleware
|
|||
from loguru import logger
|
||||
|
||||
from api.routes.main import router as main_router
|
||||
from api.routes.rtc_offer import pcs_map
|
||||
from api.services.telephony.worker_event_subscriber import (
|
||||
WorkerEventSubscriber,
|
||||
setup_worker_subscriber,
|
||||
|
|
@ -77,11 +75,6 @@ async def lifespan(app: FastAPI):
|
|||
# Fall back to immediate stop
|
||||
await worker_subscriber.stop()
|
||||
|
||||
# close all dangling pipecat connections
|
||||
coros = [pc.close() for pc in pcs_map.values()]
|
||||
await asyncio.gather(*coros)
|
||||
pcs_map.clear()
|
||||
|
||||
await redis.aclose()
|
||||
|
||||
|
||||
|
|
|
|||
|
|
@ -10,7 +10,6 @@ from api.routes.organization_usage import router as organization_usage_router
|
|||
from api.routes.public_agent import router as public_agent_router
|
||||
from api.routes.public_embed import router as public_embed_router
|
||||
from api.routes.reports import router as reports_router
|
||||
from api.routes.rtc_offer import router as rtc_offer_router
|
||||
from api.routes.s3_signed_url import router as s3_router
|
||||
from api.routes.service_keys import router as service_keys_router
|
||||
from api.routes.superuser import router as superuser_router
|
||||
|
|
@ -27,7 +26,6 @@ router = APIRouter(
|
|||
)
|
||||
|
||||
router.include_router(telephony_router)
|
||||
router.include_router(rtc_offer_router)
|
||||
router.include_router(superuser_router)
|
||||
router.include_router(workflow_router)
|
||||
router.include_router(user_router)
|
||||
|
|
|
|||
|
|
@ -1,77 +0,0 @@
|
|||
from typing import Dict
|
||||
|
||||
from fastapi import APIRouter, BackgroundTasks, Depends
|
||||
from loguru import logger
|
||||
from pydantic import BaseModel
|
||||
|
||||
from api.db.models import UserModel
|
||||
from api.services.auth.depends import get_user
|
||||
from api.services.pipecat.run_pipeline import run_pipeline_smallwebrtc
|
||||
from pipecat.transports.smallwebrtc.connection import SmallWebRTCConnection
|
||||
from pipecat.utils.context import set_current_run_id
|
||||
|
||||
router = APIRouter(prefix="/pipecat")
|
||||
|
||||
pcs_map: Dict[str, SmallWebRTCConnection] = {}
|
||||
ice_servers = ["stun:stun.l.google.com:19302"]
|
||||
|
||||
|
||||
class RTCOfferRequest(BaseModel):
|
||||
pc_id: str | None
|
||||
sdp: str
|
||||
type: str
|
||||
workflow_id: int
|
||||
workflow_run_id: int
|
||||
restart_pc: bool = False
|
||||
call_context_vars: dict | None = None
|
||||
|
||||
|
||||
@router.post("/rtc-offer")
|
||||
async def offer(
|
||||
request: RTCOfferRequest,
|
||||
background_tasks: BackgroundTasks,
|
||||
user: UserModel = Depends(get_user),
|
||||
):
|
||||
pc_id = request.pc_id
|
||||
|
||||
if pc_id and pc_id in pcs_map:
|
||||
# Ensure run_id context is available for logs even when reusing an existing PC.
|
||||
set_current_run_id(request.workflow_run_id)
|
||||
|
||||
pipecat_connection = pcs_map[pc_id]
|
||||
logger.info(f"Reusing existing connection for pc_id: {pc_id}")
|
||||
await pipecat_connection.renegotiate(
|
||||
sdp=request.sdp,
|
||||
type=request.type,
|
||||
restart_pc=request.restart_pc,
|
||||
)
|
||||
else:
|
||||
# Set the run_id *before* creating the SmallWebRTCConnection so that all
|
||||
# async tasks and event-handler coroutines spawned inside the
|
||||
# constructor inherit the correct context variable value. Otherwise the
|
||||
# default ("NA") leaks into the log output produced by those tasks.
|
||||
set_current_run_id(request.workflow_run_id)
|
||||
|
||||
pipecat_connection = SmallWebRTCConnection(ice_servers)
|
||||
await pipecat_connection.initialize(sdp=request.sdp, type=request.type)
|
||||
|
||||
@pipecat_connection.event_handler("closed")
|
||||
async def handle_disconnected(webrtc_connection: SmallWebRTCConnection):
|
||||
logger.info(
|
||||
f"In pipecat connection closed handler. Popping peer connection pc_id: {webrtc_connection.pc_id} from pcs_map"
|
||||
)
|
||||
pcs_map.pop(webrtc_connection.pc_id, None)
|
||||
|
||||
background_tasks.add_task(
|
||||
run_pipeline_smallwebrtc,
|
||||
pipecat_connection,
|
||||
request.workflow_id,
|
||||
request.workflow_run_id,
|
||||
user.id,
|
||||
request.call_context_vars or {},
|
||||
)
|
||||
|
||||
answer = pipecat_connection.get_answer()
|
||||
pcs_map[answer["pc_id"]] = pipecat_connection
|
||||
|
||||
return answer
|
||||
|
|
@ -1,3 +1,2 @@
|
|||
export * from './useDeviceInputs';
|
||||
export * from './useWebRTC';
|
||||
export * from './useWebSocketRTC';
|
||||
|
|
|
|||
|
|
@ -1,287 +0,0 @@
|
|||
import { useRef, useState } from "react";
|
||||
|
||||
import { offerApiV1PipecatRtcOfferPost, validateUserConfigurationsApiV1UserConfigurationsUserValidateGet, validateWorkflowApiV1WorkflowWorkflowIdValidatePost } from "@/client/sdk.gen";
|
||||
import { WorkflowValidationError } from "@/components/flow/types";
|
||||
import logger from '@/lib/logger';
|
||||
|
||||
import { sdpFilterCodec } from "../utils";
|
||||
import { useDeviceInputs } from "./useDeviceInputs";
|
||||
|
||||
interface UseWebRTCProps {
|
||||
workflowId: number;
|
||||
workflowRunId: number;
|
||||
accessToken: string | null;
|
||||
initialContextVariables?: Record<string, string> | null;
|
||||
}
|
||||
|
||||
export const useWebRTC = ({ workflowId, workflowRunId, accessToken, initialContextVariables }: UseWebRTCProps) => {
|
||||
const [connectionStatus, setConnectionStatus] = useState<'idle' | 'connecting' | 'connected' | 'failed'>('idle');
|
||||
const [connectionActive, setConnectionActive] = useState(false);
|
||||
const [isCompleted, setIsCompleted] = useState(false);
|
||||
const [apiKeyModalOpen, setApiKeyModalOpen] = useState(false);
|
||||
const [apiKeyError, setApiKeyError] = useState<string | null>(null);
|
||||
const [workflowConfigModalOpen, setWorkflowConfigModalOpen] = useState(false);
|
||||
const [workflowConfigError, setWorkflowConfigError] = useState<string | null>(null);
|
||||
const [isStarting, setIsStarting] = useState(false);
|
||||
// Use initial context variables directly, no UI for editing
|
||||
const initialContext = initialContextVariables || {};
|
||||
|
||||
const {
|
||||
audioInputs,
|
||||
selectedAudioInput,
|
||||
setSelectedAudioInput,
|
||||
permissionError,
|
||||
setPermissionError
|
||||
} = useDeviceInputs();
|
||||
|
||||
const useStun = true;
|
||||
const useAudio = true;
|
||||
const audioCodec = 'default';
|
||||
|
||||
const audioRef = useRef<HTMLAudioElement>(null);
|
||||
const pcRef = useRef<RTCPeerConnection | null>(null);
|
||||
const timeStartRef = useRef<number | null>(null);
|
||||
|
||||
// Generate a cryptographically secure unique ID
|
||||
const generateSecureId = () => {
|
||||
// Use Web Crypto API to generate random bytes
|
||||
const array = new Uint8Array(16);
|
||||
crypto.getRandomValues(array);
|
||||
// Convert to hex string
|
||||
return 'PC-' + Array.from(array)
|
||||
.map(b => b.toString(16).padStart(2, '0'))
|
||||
.join('');
|
||||
};
|
||||
|
||||
const pc_id = generateSecureId();
|
||||
|
||||
const createPeerConnection = () => {
|
||||
const config: RTCConfiguration = {
|
||||
iceServers: useStun ? [{ urls: ['stun:stun.l.google.com:19302'] }] : []
|
||||
};
|
||||
|
||||
const pc = new RTCPeerConnection(config);
|
||||
|
||||
pc.addEventListener('icegatheringstatechange', () => {
|
||||
logger.info(`ICE gathering state changed in createPeerConnection, ${pc.iceGatheringState}`);
|
||||
});
|
||||
|
||||
pc.addEventListener('iceconnectionstatechange', () => {
|
||||
logger.info(`ICE connection state changed: ${pc.iceConnectionState}`);
|
||||
if (pc.iceConnectionState === 'connected' || pc.iceConnectionState === 'completed') {
|
||||
setConnectionStatus('connected');
|
||||
} else if (pc.iceConnectionState === 'failed' || pc.iceConnectionState === 'disconnected') {
|
||||
setConnectionStatus('failed');
|
||||
}
|
||||
});
|
||||
|
||||
pc.addEventListener('track', (evt) => {
|
||||
if (evt.track.kind === 'audio' && audioRef.current) {
|
||||
audioRef.current.srcObject = evt.streams[0];
|
||||
}
|
||||
});
|
||||
|
||||
pcRef.current = pc;
|
||||
return pc;
|
||||
};
|
||||
|
||||
const negotiate = async () => {
|
||||
const pc = pcRef.current;
|
||||
if (!pc) return;
|
||||
|
||||
try {
|
||||
const offer = await pc.createOffer();
|
||||
await pc.setLocalDescription(offer);
|
||||
|
||||
await new Promise<void>((resolve) => {
|
||||
if (pc.iceGatheringState === 'complete') {
|
||||
resolve();
|
||||
} else {
|
||||
const checkState = () => {
|
||||
if (pc.iceGatheringState === 'complete') {
|
||||
logger.debug(`ICE gathering is complete in negotiate, ${pc.iceGatheringState}`);
|
||||
pc.removeEventListener('icegatheringstatechange', checkState);
|
||||
resolve();
|
||||
}
|
||||
};
|
||||
pc.addEventListener('icegatheringstatechange', checkState);
|
||||
}
|
||||
});
|
||||
|
||||
const localDescription = pc.localDescription;
|
||||
if (!localDescription) return;
|
||||
|
||||
let sdp = localDescription.sdp;
|
||||
|
||||
if (audioCodec !== 'default') {
|
||||
sdp = sdpFilterCodec('audio', audioCodec, sdp);
|
||||
}
|
||||
|
||||
if (!accessToken) return;
|
||||
|
||||
const response = await offerApiV1PipecatRtcOfferPost({
|
||||
headers: {
|
||||
'Authorization': `Bearer ${accessToken}`,
|
||||
},
|
||||
body: {
|
||||
sdp: sdp,
|
||||
type: 'offer',
|
||||
pc_id: pc_id,
|
||||
restart_pc: false,
|
||||
workflow_id: workflowId,
|
||||
workflow_run_id: workflowRunId,
|
||||
call_context_vars: initialContext
|
||||
}
|
||||
});
|
||||
|
||||
if (response && response.data) {
|
||||
const answerSdpText = typeof response.data === 'object' && 'sdp' in response.data
|
||||
? response.data.sdp as string
|
||||
: '';
|
||||
|
||||
await pc.setRemoteDescription({
|
||||
type: 'answer',
|
||||
sdp: answerSdpText
|
||||
});
|
||||
setConnectionActive(true);
|
||||
}
|
||||
} catch (e) {
|
||||
logger.error(`Negotiation failed: ${e}`);
|
||||
}
|
||||
};
|
||||
|
||||
const start = async () => {
|
||||
if (isStarting || !accessToken) return;
|
||||
setIsStarting(true);
|
||||
setConnectionStatus('connecting');
|
||||
try {
|
||||
const response = await validateUserConfigurationsApiV1UserConfigurationsUserValidateGet({
|
||||
headers: {
|
||||
'Authorization': `Bearer ${accessToken}`,
|
||||
},
|
||||
query: {
|
||||
validity_ttl_seconds: 86400
|
||||
},
|
||||
});
|
||||
if (response.error) {
|
||||
setApiKeyModalOpen(true);
|
||||
let msg = 'API Key Error';
|
||||
const detail = (response.error as unknown as { detail?: { errors: { model: string; message: string }[] } }).detail;
|
||||
if (Array.isArray(detail)) {
|
||||
msg = detail
|
||||
.map((e: { model: string; message: string }) => `${e.model}: ${e.message}`)
|
||||
.join('\n');
|
||||
}
|
||||
setApiKeyError(msg);
|
||||
return;
|
||||
}
|
||||
|
||||
// Then check workflow validation
|
||||
const workflowResponse = await validateWorkflowApiV1WorkflowWorkflowIdValidatePost({
|
||||
path: {
|
||||
workflow_id: workflowId,
|
||||
},
|
||||
headers: {
|
||||
'Authorization': `Bearer ${accessToken}`,
|
||||
},
|
||||
});
|
||||
|
||||
if (workflowResponse.error) {
|
||||
setWorkflowConfigModalOpen(true);
|
||||
let msg = 'Workflow validation failed';
|
||||
const errorDetail = workflowResponse.error as { detail?: { errors: WorkflowValidationError[] } };
|
||||
if (errorDetail?.detail?.errors) {
|
||||
msg = errorDetail.detail.errors
|
||||
.map(err => `${err.kind}: ${err.message}`)
|
||||
.join('\n');
|
||||
}
|
||||
setWorkflowConfigError(msg);
|
||||
return;
|
||||
}
|
||||
|
||||
timeStartRef.current = null;
|
||||
const pc = createPeerConnection();
|
||||
|
||||
const constraints: MediaStreamConstraints = {
|
||||
audio: false,
|
||||
};
|
||||
|
||||
if (useAudio) {
|
||||
const audioConstraints: MediaTrackConstraints = {};
|
||||
if (selectedAudioInput) {
|
||||
audioConstraints.deviceId = { exact: selectedAudioInput };
|
||||
}
|
||||
constraints.audio = Object.keys(audioConstraints).length ? audioConstraints : true;
|
||||
}
|
||||
|
||||
if (constraints.audio) {
|
||||
try {
|
||||
const stream = await navigator.mediaDevices.getUserMedia(constraints);
|
||||
stream.getTracks().forEach((track) => {
|
||||
pc.addTrack(track, stream);
|
||||
});
|
||||
await negotiate();
|
||||
} catch (err) {
|
||||
logger.error(`Could not acquire media: ${err}`);
|
||||
setPermissionError('Could not acquire media');
|
||||
setConnectionStatus('failed');
|
||||
}
|
||||
} else {
|
||||
await negotiate();
|
||||
}
|
||||
} finally {
|
||||
setIsStarting(false);
|
||||
}
|
||||
};
|
||||
|
||||
const stop = () => {
|
||||
setConnectionActive(false);
|
||||
setIsCompleted(true);
|
||||
setConnectionStatus('idle');
|
||||
|
||||
const pc = pcRef.current;
|
||||
if (!pc) return;
|
||||
|
||||
if (pc.getTransceivers) {
|
||||
pc.getTransceivers().forEach((transceiver) => {
|
||||
if (transceiver.stop) {
|
||||
transceiver.stop();
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
pc.getSenders().forEach((sender) => {
|
||||
if (sender.track) {
|
||||
sender.track.stop();
|
||||
}
|
||||
});
|
||||
|
||||
setTimeout(() => {
|
||||
if (pcRef.current) {
|
||||
pcRef.current.close();
|
||||
pcRef.current = null;
|
||||
}
|
||||
}, 500);
|
||||
};
|
||||
|
||||
return {
|
||||
audioRef,
|
||||
audioInputs,
|
||||
selectedAudioInput,
|
||||
setSelectedAudioInput,
|
||||
connectionActive,
|
||||
permissionError,
|
||||
isCompleted,
|
||||
apiKeyModalOpen,
|
||||
setApiKeyModalOpen,
|
||||
apiKeyError,
|
||||
workflowConfigError,
|
||||
workflowConfigModalOpen,
|
||||
setWorkflowConfigModalOpen,
|
||||
connectionStatus,
|
||||
start,
|
||||
stop,
|
||||
isStarting,
|
||||
initialContext
|
||||
};
|
||||
};
|
||||
File diff suppressed because one or more lines are too long
|
|
@ -483,18 +483,6 @@ export type PresignedUploadUrlResponse = {
|
|||
expires_in: number;
|
||||
};
|
||||
|
||||
export type RtcOfferRequest = {
|
||||
pc_id: string | null;
|
||||
sdp: string;
|
||||
type: string;
|
||||
workflow_id: number;
|
||||
workflow_run_id: number;
|
||||
restart_pc?: boolean;
|
||||
call_context_vars?: {
|
||||
[key: string]: unknown;
|
||||
} | null;
|
||||
};
|
||||
|
||||
export type S3SignedUrlResponse = {
|
||||
url: string;
|
||||
expires_in: number;
|
||||
|
|
@ -1260,37 +1248,6 @@ export type HandleInboundFallbackApiV1TelephonyInboundFallbackPostResponses = {
|
|||
200: unknown;
|
||||
};
|
||||
|
||||
export type OfferApiV1PipecatRtcOfferPostData = {
|
||||
body: RtcOfferRequest;
|
||||
headers?: {
|
||||
authorization?: string | null;
|
||||
'X-API-Key'?: string | null;
|
||||
};
|
||||
path?: never;
|
||||
query?: never;
|
||||
url: '/api/v1/pipecat/rtc-offer';
|
||||
};
|
||||
|
||||
export type OfferApiV1PipecatRtcOfferPostErrors = {
|
||||
/**
|
||||
* Not found
|
||||
*/
|
||||
404: unknown;
|
||||
/**
|
||||
* Validation Error
|
||||
*/
|
||||
422: HttpValidationError;
|
||||
};
|
||||
|
||||
export type OfferApiV1PipecatRtcOfferPostError = OfferApiV1PipecatRtcOfferPostErrors[keyof OfferApiV1PipecatRtcOfferPostErrors];
|
||||
|
||||
export type OfferApiV1PipecatRtcOfferPostResponses = {
|
||||
/**
|
||||
* Successful Response
|
||||
*/
|
||||
200: unknown;
|
||||
};
|
||||
|
||||
export type ImpersonateApiV1SuperuserImpersonatePostData = {
|
||||
body: ImpersonateRequest;
|
||||
headers?: {
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue