feat: persist split user and bot audio

This commit is contained in:
Abhishek Kumar 2026-06-16 15:19:49 +05:30
parent dd3f2e7323
commit 3d1886c450
30 changed files with 1322 additions and 253 deletions

View file

@ -56,6 +56,7 @@ from api.services.configuration.masking import is_mask_of, mask_key, mask_user_c
from api.services.configuration.registry import (
DOGRAH_STT_LANGUAGES,
REGISTRY,
DograhTTSService,
ServiceProviders,
ServiceType,
)
@ -210,6 +211,13 @@ async def get_telephony_config_warnings(user: UserModel = Depends(get_user)):
# ---------------------------------------------------------------------------
def _dograh_allows_custom_voice() -> bool:
    """Report whether the Dograh TTS ``voice`` field allows custom input.

    Reads ``json_schema_extra`` from the ``voice`` entry of
    ``DograhTTSService.model_fields``. When that value is a dict, the
    truthiness of its ``"allow_custom_input"`` entry is returned (a missing
    entry counts as ``False``). Any non-dict value yields ``False``.
    """
    voice_extra = DograhTTSService.model_fields["voice"].json_schema_extra
    if not isinstance(voice_extra, dict):
        return False
    return bool(voice_extra.get("allow_custom_input", False))
def _byok_provider_schemas(service_type: ServiceType) -> dict[str, dict]:
return {
provider: model_cls.model_json_schema()
@ -251,6 +259,7 @@ async def get_model_configuration_v2_defaults(
return {
"dograh": {
"voices": [DOGRAH_DEFAULT_VOICE],
"allow_custom_input": _dograh_allows_custom_voice(),
"speeds": list(DOGRAH_SPEED_OPTIONS),
"languages": DOGRAH_STT_LANGUAGES,
"defaults": {

View file

@ -14,6 +14,7 @@ from api.services.auth.depends import get_user, get_user_with_selected_organizat
from api.services.mps_service_key_client import mps_service_key_client
from api.services.reports import generate_usage_runs_report_csv
from api.utils.artifacts import artifact_url
from api.utils.recording_artifacts import has_recording_track
router = APIRouter(prefix="/organizations")
@ -99,8 +100,12 @@ class WorkflowRunUsageResponse(BaseModel):
call_duration_seconds: int
recording_url: Optional[str] = None
transcript_url: Optional[str] = None
user_recording_url: Optional[str] = None
bot_recording_url: Optional[str] = None
recording_public_url: Optional[str] = None
transcript_public_url: Optional[str] = None
user_recording_public_url: Optional[str] = None
bot_recording_public_url: Optional[str] = None
public_access_token: Optional[str] = None
phone_number: Optional[str] = Field(
default=None,
@ -308,14 +313,18 @@ async def get_billing_credits(
aggregation_key=entry.get("aggregation_key"),
usage_event_id=_optional_int(entry.get("usage_event_id")),
workflow_run_id=_optional_int(entry.get("workflow_run_id")),
workflow_id=workflow_ids_by_run_id.get(
_optional_int(entry.get("workflow_run_id"))
)
if entry.get("workflow_run_id") is not None
else None,
billable_quantity=float(entry["billable_quantity"])
if entry.get("billable_quantity") is not None
else None,
workflow_id=(
workflow_ids_by_run_id.get(
_optional_int(entry.get("workflow_run_id"))
)
if entry.get("workflow_run_id") is not None
else None
),
billable_quantity=(
float(entry["billable_quantity"])
if entry.get("billable_quantity") is not None
else None
),
quantity_unit=entry.get("quantity_unit"),
metadata=entry.get("metadata") or {},
created_at=str(entry["created_at"]),
@ -478,6 +487,17 @@ async def get_usage_history(
public_access_token, "transcript"
)
run["recording_public_url"] = artifact_url(public_access_token, "recording")
run["user_recording_public_url"] = (
artifact_url(public_access_token, "user_recording")
if has_recording_track(run.get("extra"), "user")
else None
)
run["bot_recording_public_url"] = (
artifact_url(public_access_token, "bot_recording")
if has_recording_track(run.get("extra"), "bot")
else None
)
run.pop("extra", None)
return {
"runs": runs,

View file

@ -6,14 +6,16 @@ post-call processing for runs that execute integrations, QA, or campaign
reporting.
"""
from typing import Literal
from fastapi import APIRouter, HTTPException, Query
from fastapi.responses import RedirectResponse
from loguru import logger
from api.db import db_client
from api.services.storage import get_storage_for_backend
from api.utils.recording_artifacts import (
get_recording_storage_backend,
get_recording_storage_key,
)
router = APIRouter(prefix="/public/download")
@ -21,7 +23,7 @@ router = APIRouter(prefix="/public/download")
@router.get("/workflow/{token}/{artifact_type}")
async def download_workflow_artifact(
token: str,
artifact_type: Literal["recording", "transcript"],
artifact_type: str,
inline: bool = Query(
default=False, description="Display inline in browser instead of download"
),
@ -36,13 +38,15 @@ async def download_workflow_artifact(
Args:
token: The public access token (UUID format)
artifact_type: Type of artifact - "recording" or "transcript"
artifact_type: Type of artifact - "recording", "transcript",
"user_recording", or "bot_recording"
inline: If true, sets Content-Disposition to inline for browser preview
Returns:
RedirectResponse to the signed URL (302 redirect)
Raises:
HTTPException 400: If artifact type is unsupported
HTTPException 404: If token is invalid or artifact not found
"""
# 1. Lookup workflow run by token
@ -52,10 +56,26 @@ async def download_workflow_artifact(
raise HTTPException(status_code=404, detail="Invalid or expired token")
# 2. Get file path based on artifact type
artifact_storage_backend = None
if artifact_type == "recording":
file_path = workflow_run.recording_url
else: # transcript
elif artifact_type == "transcript":
file_path = workflow_run.transcript_url
elif artifact_type == "user_recording":
file_path = get_recording_storage_key(workflow_run.extra, "user")
artifact_storage_backend = get_recording_storage_backend(
workflow_run.extra, "user"
)
elif artifact_type == "bot_recording":
file_path = get_recording_storage_key(workflow_run.extra, "bot")
artifact_storage_backend = get_recording_storage_backend(
workflow_run.extra, "bot"
)
else:
logger.warning(
f"Unsupported artifact type: type={artifact_type}, workflow_run_id={workflow_run.id}"
)
raise HTTPException(status_code=400, detail="Unsupported artifact type")
if not file_path:
logger.warning(
@ -68,7 +88,9 @@ async def download_workflow_artifact(
# 3. Get storage backend for this workflow run
try:
storage = get_storage_for_backend(workflow_run.storage_backend)
storage = get_storage_for_backend(
artifact_storage_backend or workflow_run.storage_backend
)
except ValueError as e:
logger.error(f"Invalid storage backend: {workflow_run.storage_backend}")
raise HTTPException(status_code=500, detail="Storage configuration error")

View file

@ -40,14 +40,22 @@ class PresignedUploadUrlResponse(BaseModel):
router = APIRouter(prefix="/s3", tags=["s3"])
ORG_SCOPED_STORAGE_PREFIXES = ("campaigns", "knowledge_base")
def _extract_org_id_from_key(key: str) -> Optional[int]:
"""Try to extract an organization ID from a storage key.
Matches keys of the form ``{prefix}/{org_id}/...`` where *org_id* is a
positive integer. Returns ``None`` when the pattern does not match.
Matches known org-scoped keys of the form ``{prefix}/{org_id}/...`` where
*org_id* is a positive integer. Returns ``None`` when the pattern does not
match.
"""
parts = key.split("/")
if len(parts) >= 3 and parts[1].isdigit():
if (
len(parts) >= 3
and parts[0] in ORG_SCOPED_STORAGE_PREFIXES
and parts[1].isdigit()
):
return int(parts[1])
return None
@ -58,15 +66,20 @@ def _extract_legacy_workflow_run_id(key: str) -> Optional[int]:
Supports:
- ``transcripts/{run_id}.txt``
- ``recordings/{run_id}.wav``
- ``recordings/{run_id}/user.wav``
- ``recordings/{run_id}/bot.wav``
Returns ``None`` when the key does not match a legacy pattern.
"""
if key.startswith("transcripts/") and key.endswith(".txt"):
run_id_str = key[len("transcripts/") : -4]
elif key.startswith("recordings/") and key.endswith(".wav"):
run_id_str = key[len("recordings/") : -4]
else:
return None
recording_match = re.fullmatch(
r"recordings/(\d+)(?:\.wav|/(?:user|bot)\.wav)", key
)
if not recording_match:
return None
run_id_str = recording_match.group(1)
return int(run_id_str) if run_id_str.isdigit() else None
@ -89,8 +102,13 @@ async def _validate_and_extract_workflow_run_id(
"""
if key.startswith("transcripts/") and key.endswith(".txt"):
run_id_str = key[len("transcripts/") : -4] # strip prefix & suffix
elif key.startswith("recordings/") and key.endswith(".wav"):
run_id_str = key[len("recordings/") : -4]
elif key.startswith("recordings/"):
run_id = _extract_legacy_workflow_run_id(key)
if run_id is None:
raise HTTPException(
status_code=400, detail="Invalid workflow_run_id in key"
)
return run_id
elif allow_special_paths and key.startswith("voicemail_detections/"):
return None # Skip validation for these paths
else:
@ -159,9 +177,9 @@ async def get_signed_url(
"""Return a short-lived signed URL for a file stored on S3 / MinIO.
Access Control:
* Keys that embed an organization ID (``{prefix}/{org_id}/...``) are
authorized by matching the org_id against the requesting user's
organization.
* Known org-scoped keys (for example ``campaigns/{org_id}/...`` and
``knowledge_base/{org_id}/...``) are authorized by matching the org_id
against the requesting user's organization.
* Legacy keys (``recordings/{run_id}.wav``, ``transcripts/{run_id}.txt``)
are authorized via the workflow run they belong to.
* Superusers can request any key.

View file

@ -19,7 +19,7 @@ import ipaddress
import os
from datetime import UTC, datetime
from enum import Enum
from typing import Dict, List, Optional
from typing import Dict, List, Optional, Set
from aiortc import RTCIceServer
from aiortc.sdp import candidate_from_sdp
@ -246,6 +246,74 @@ class SignalingManager:
def __init__(self):
self._connections: Dict[str, WebSocket] = {}
self._peer_connections: Dict[str, SmallWebRTCConnection] = {}
self._connection_peer_ids: Dict[str, Set[str]] = {}
self._peer_connection_owners: Dict[str, str] = {}
def _track_peer_connection(
self, connection_id: str, pc_id: str, pc: SmallWebRTCConnection
) -> None:
self._peer_connections[pc_id] = pc
self._peer_connection_owners[pc_id] = connection_id
self._connection_peer_ids.setdefault(connection_id, set()).add(pc_id)
def _forget_peer_connection(self, pc_id: str) -> Optional[str]:
connection_id = self._peer_connection_owners.pop(pc_id, None)
self._peer_connections.pop(pc_id, None)
if connection_id:
peer_ids = self._connection_peer_ids.get(connection_id)
if peer_ids is not None:
peer_ids.discard(pc_id)
if not peer_ids:
self._connection_peer_ids.pop(connection_id, None)
return connection_id
async def _send_json_if_connected(
    self, websocket: WebSocket, message: dict
) -> bool:
    """Send *message* as JSON if the socket's application state is CONNECTED.

    Best-effort: any exception raised by ``send_json`` is logged at debug
    level and swallowed.

    Returns:
        ``True`` when ``send_json`` completed, ``False`` when the socket was
        not in the CONNECTED application state or the send raised.
    """
    if websocket.application_state != WebSocketState.CONNECTED:
        return False

    try:
        await websocket.send_json(message)
    except Exception as exc:
        logger.debug(f"Failed to send signaling WebSocket message: {exc}")
        return False
    else:
        return True
async def _close_websocket_if_connected(
    self, websocket: WebSocket, code: int = 1000, reason: str = ""
) -> None:
    """Close *websocket* if its application state is still CONNECTED.

    Best-effort: any exception raised by ``close`` is logged at debug level
    and swallowed. Does nothing when the socket is in any other application
    state.

    Args:
        websocket: The signaling socket to close.
        code: Close code forwarded to ``websocket.close``.
        reason: Close reason forwarded to ``websocket.close``.
    """
    still_open = websocket.application_state == WebSocketState.CONNECTED
    if still_open:
        try:
            await websocket.close(code=code, reason=reason)
        except Exception as exc:
            logger.debug(f"Failed to close signaling WebSocket: {exc}")
async def _notify_call_ended_and_close_websocket(
self,
websocket: WebSocket,
workflow_run_id: int,
pc_id: str,
reason: str,
) -> None:
await self._send_json_if_connected(
websocket,
{
"type": "call-ended",
"payload": {
"workflow_run_id": workflow_run_id,
"pc_id": pc_id,
"reason": reason,
},
},
)
await self._close_websocket_if_connected(
websocket, code=1000, reason="call ended"
)
async def handle_websocket(
self,
@ -257,35 +325,51 @@ class SignalingManager:
"""Handle WebSocket connection for signaling."""
await websocket.accept()
connection_id = f"{workflow_id}:{workflow_run_id}:{user.id}"
self._connections[connection_id] = websocket
connection_key = f"{connection_id}:{id(websocket)}"
self._connections[connection_key] = websocket
try:
while True:
message = await websocket.receive_json()
await self._handle_message(
websocket, message, workflow_id, workflow_run_id, user
websocket,
message,
workflow_id,
workflow_run_id,
user,
connection_key,
)
except WebSocketDisconnect:
logger.info(f"WebSocket disconnected for {connection_id}")
except Exception as e:
logger.error(f"WebSocket error for {connection_id}: {e}")
if websocket.application_state == WebSocketState.DISCONNECTED:
logger.info(f"WebSocket disconnected for {connection_id}")
else:
logger.error(f"WebSocket error for {connection_id}: {e}")
finally:
# Cleanup
self._connections.pop(connection_id, None)
self._connections.pop(connection_key, None)
peer_ids = list(self._connection_peer_ids.pop(connection_key, set()))
# Unregister WebSocket sender for real-time feedback
unregister_ws_sender(workflow_run_id)
# Clean up all peer connections for this workflow run
# Clean up peer connections owned by this WebSocket.
# Note: In a WebSocket-based signaling approach (vs HTTP PATCH),
# we maintain our own connection map instead of relying on
# SmallWebRTCRequestHandler's _pcs_map. This is suitable for
# multi-worker FastAPI deployments where state cannot be shared.
for pc_id in list(self._peer_connections.keys()):
for pc_id in peer_ids:
self._peer_connection_owners.pop(pc_id, None)
pc = self._peer_connections.pop(pc_id, None)
if pc:
await pc.disconnect()
logger.debug(f"Disconnected peer connection: {pc_id}")
try:
await pc.disconnect()
logger.debug(f"Disconnected peer connection: {pc_id}")
except Exception as e:
logger.debug(
f"Failed to disconnect peer connection {pc_id}: {e}"
)
async def _handle_message(
self,
@ -294,17 +378,20 @@ class SignalingManager:
workflow_id: int,
workflow_run_id: int,
user: UserModel,
connection_key: str,
):
"""Handle incoming WebSocket messages."""
msg_type = message.get("type")
payload = message.get("payload", {})
if msg_type == "offer":
await self._handle_offer(ws, payload, workflow_id, workflow_run_id, user)
await self._handle_offer(
ws, payload, workflow_id, workflow_run_id, user, connection_key
)
elif msg_type == "ice-candidate":
await self._handle_ice_candidate(ws, payload, workflow_run_id)
await self._handle_ice_candidate(payload, connection_key)
elif msg_type == "renegotiate":
await self._handle_renegotiation(ws, payload, workflow_id, workflow_run_id)
await self._handle_renegotiation(ws, payload, connection_key)
async def _handle_offer(
self,
@ -313,6 +400,7 @@ class SignalingManager:
workflow_id: int,
workflow_run_id: int,
user: UserModel,
connection_key: str,
):
"""Handle offer message and create answer with ICE trickling."""
pc_id = payload.get("pc_id")
@ -320,6 +408,15 @@ class SignalingManager:
type_ = payload.get("type")
call_context_vars = payload.get("call_context_vars", {})
if not pc_id or not sdp or not type_:
await ws.send_json(
{
"type": "error",
"payload": {"message": "Missing offer fields"},
}
)
return
# Set run context for logging and tracing. org_id must be set before
# pc.initialize() so that aiortc's internal tasks inherit it.
set_current_run_id(workflow_run_id)
@ -347,7 +444,16 @@ class SignalingManager:
)
return
if pc_id and pc_id in self._peer_connections:
if pc_id in self._peer_connections:
if self._peer_connection_owners.get(pc_id) != connection_key:
await ws.send_json(
{
"type": "error",
"payload": {"message": "Peer connection already owned"},
}
)
return
# Reuse existing connection
logger.info(f"Reusing existing connection for pc_id: {pc_id}")
pc = self._peer_connections[pc_id]
@ -379,7 +485,7 @@ class SignalingManager:
await pc.initialize(sdp=sdp, type=type_)
# Store peer connection using client's pc_id
self._peer_connections[pc_id] = pc
self._track_peer_connection(connection_key, pc_id, pc)
# Register WebSocket sender for real-time feedback
async def ws_sender(message: dict):
@ -392,7 +498,16 @@ class SignalingManager:
@pc.event_handler("closed")
async def handle_disconnected(webrtc_connection: SmallWebRTCConnection):
logger.info(f"PeerConnection closed: {webrtc_connection.pc_id}")
self._peer_connections.pop(webrtc_connection.pc_id, None)
owner_connection_id = self._forget_peer_connection(
webrtc_connection.pc_id
)
if owner_connection_id == connection_key:
await self._notify_call_ended_and_close_websocket(
ws,
workflow_run_id,
webrtc_connection.pc_id,
reason="peer_connection_closed",
)
# Start pipeline in background
asyncio.create_task(
@ -421,9 +536,7 @@ class SignalingManager:
}
)
async def _handle_ice_candidate(
self, ws: WebSocket, payload: dict, workflow_run_id: int
):
async def _handle_ice_candidate(self, payload: dict, connection_key: str):
"""Handle incoming ICE candidate from client.
Uses SmallWebRTC's native ICE trickling support via add_ice_candidate().
@ -442,6 +555,9 @@ class SignalingManager:
if not pc:
logger.warning(f"No peer connection found for pc_id: {pc_id}")
return
if self._peer_connection_owners.get(pc_id) != connection_key:
logger.warning(f"Ignoring ICE candidate for unowned pc_id: {pc_id}")
return
if candidate_data:
candidate_str = candidate_data.get("candidate", "")
@ -466,7 +582,7 @@ class SignalingManager:
logger.debug(f"End of ICE candidates for pc_id: {pc_id}")
async def _handle_renegotiation(
self, ws: WebSocket, payload: dict, workflow_id: int, workflow_run_id: int
self, ws: WebSocket, payload: dict, connection_key: str
):
"""Handle renegotiation request."""
pc_id = payload.get("pc_id")
@ -479,6 +595,11 @@ class SignalingManager:
{"type": "error", "payload": {"message": "Peer connection not found"}}
)
return
if self._peer_connection_owners.get(pc_id) != connection_key:
await ws.send_json(
{"type": "error", "payload": {"message": "Peer connection not found"}}
)
return
pc = self._peer_connections[pc_id]
await pc.renegotiate(sdp=sdp, type=type_, restart_pc=restart_pc)

View file

@ -60,6 +60,10 @@ from api.services.workflow.trigger_paths import (
)
from api.services.workflow.workflow_graph import WorkflowGraph
from api.utils.artifacts import artifact_url
from api.utils.recording_artifacts import (
get_recording_storage_key,
has_recording_track,
)
router = APIRouter(prefix="/workflow")
@ -1255,7 +1259,16 @@ async def get_workflow_run(
raise HTTPException(status_code=404, detail="Workflow run not found")
public_access_token = run.public_access_token
if (run.transcript_url or run.recording_url) and not public_access_token:
user_recording_url = get_recording_storage_key(run.extra, "user")
bot_recording_url = get_recording_storage_key(run.extra, "bot")
has_user_recording = has_recording_track(run.extra, "user")
has_bot_recording = has_recording_track(run.extra, "bot")
if (
run.transcript_url
or run.recording_url
or has_user_recording
or has_bot_recording
) and not public_access_token:
public_access_token = await db_client.ensure_public_access_token(run.id)
return {
@ -1266,8 +1279,20 @@ async def get_workflow_run(
"is_completed": run.is_completed,
"transcript_url": run.transcript_url,
"recording_url": run.recording_url,
"user_recording_url": user_recording_url,
"bot_recording_url": bot_recording_url,
"transcript_public_url": artifact_url(public_access_token, "transcript"),
"recording_public_url": artifact_url(public_access_token, "recording"),
"user_recording_public_url": (
artifact_url(public_access_token, "user_recording")
if has_user_recording
else None
),
"bot_recording_public_url": (
artifact_url(public_access_token, "bot_recording")
if has_bot_recording
else None
),
"public_access_token": public_access_token,
"cost_info": format_public_cost_info(run.cost_info, run.usage_info),
"usage_info": format_public_usage_info(run.usage_info),