feat: allow recording audio in workflow builder

This commit is contained in:
Abhishek Kumar 2026-03-25 15:01:39 +05:30
parent ac0731a374
commit 2fa4191d9b
22 changed files with 700 additions and 246 deletions

View file

@ -2,9 +2,10 @@
from typing import Annotated, Optional
from fastapi import APIRouter, Depends, HTTPException, Query
from fastapi import APIRouter, Depends, File, Form, HTTPException, Query, UploadFile
from loguru import logger
from api.constants import DEPLOYMENT_MODE
from api.db import db_client
from api.db.workflow_recording_client import generate_short_id
from api.enums import StorageBackend
@ -16,6 +17,7 @@ from api.schemas.workflow_recording import (
RecordingUploadResponseSchema,
)
from api.services.auth.depends import get_user
from api.services.mps_service_key_client import mps_service_key_client
from api.services.storage import storage_fs
router = APIRouter(prefix="/workflow-recordings", tags=["workflow-recordings"])
@ -216,3 +218,42 @@ async def delete_recording(
raise HTTPException(
status_code=500, detail="Failed to delete recording"
) from exc
@router.post(
"/transcribe",
summary="Transcribe an audio file",
)
async def transcribe_audio(
file: UploadFile = File(...),
language: str = Form("en"),
user=Depends(get_user),
):
"""Transcribe an uploaded audio file using MPS STT."""
try:
audio_data = await file.read()
if DEPLOYMENT_MODE == "oss":
result = await mps_service_key_client.transcribe_audio(
audio_data=audio_data,
filename=file.filename or "audio.wav",
content_type=file.content_type or "audio/wav",
language=language,
created_by=str(user.provider_id),
)
else:
result = await mps_service_key_client.transcribe_audio(
audio_data=audio_data,
filename=file.filename or "audio.wav",
content_type=file.content_type or "audio/wav",
language=language,
organization_id=user.selected_organization_id,
)
return result
except Exception as exc:
logger.error(f"Error transcribing audio: {exc}")
raise HTTPException(
status_code=500, detail="Failed to transcribe audio"
) from exc

View file

@ -351,6 +351,71 @@ class MPSServiceKeyClient:
response=response,
)
async def transcribe_audio(
self,
audio_data: bytes,
filename: str = "audio.wav",
content_type: str = "audio/wav",
language: str = "en",
model: str = "default",
correlation_id: Optional[str] = None,
organization_id: Optional[int] = None,
created_by: Optional[str] = None,
) -> dict:
"""
Transcribe an audio file via MPS STT API.
Args:
audio_data: Raw audio bytes
filename: Name of the audio file
content_type: MIME type of the audio (e.g., audio/wav, audio/mp3)
language: Language code for transcription (default: "en")
model: Model tier name (default: "default")
correlation_id: Optional correlation ID for tracking
organization_id: Organization ID (for authenticated mode)
created_by: User provider ID (for OSS mode)
Returns:
Dictionary containing transcription result with keys like
'transcript', 'duration_seconds', etc.
Raises:
httpx.HTTPStatusError: If the API call fails
"""
async with httpx.AsyncClient(timeout=httpx.Timeout(60.0)) as client:
files = {
"file": (filename, audio_data, content_type),
}
data = {
"language": language,
"model": model,
}
if correlation_id:
data["correlation_id"] = correlation_id
headers = self._get_headers(organization_id, created_by)
# Remove Content-Type so httpx sets the correct multipart boundary
headers.pop("Content-Type", None)
response = await client.post(
f"{self.base_url}/api/v1/stt/transcribe",
files=files,
data=data,
headers=headers,
)
if response.status_code == 200:
return response.json()
else:
logger.error(
f"Failed to transcribe audio: {response.status_code} - {response.text}"
)
raise httpx.HTTPStatusError(
f"Failed to transcribe audio: {response.text}",
request=response.request,
response=response,
)
def validate_service_key(self, service_key: str) -> bool:
"""
Synchronously validate a Dograh service key by checking usage via MPS.

View file

@ -165,49 +165,39 @@ class RealtimeFeedbackObserver(BaseObserver):
frame = data.frame
frame_direction = data.direction
logger.trace(f"{self} Received Frame: {frame} Direction: {frame_direction}")
# Handle pipeline termination - stop clock task
if isinstance(frame, (EndFrame, CancelFrame, StopFrame)):
await self._cancel_clock_task()
return
# Handle interruptions - clear any queued bot text
if isinstance(frame, InterruptionFrame):
await self._handle_interruption()
return
# Bot speaking state - WS only (ephemeral state signals, not persisted)
if isinstance(frame, BotStartedSpeakingFrame):
await self._send_ws(
{"type": RealtimeFeedbackType.BOT_STARTED_SPEAKING.value, "payload": {}}
)
return
if isinstance(frame, BotStoppedSpeakingFrame):
await self._send_ws(
{"type": RealtimeFeedbackType.BOT_STOPPED_SPEAKING.value, "payload": {}}
)
return
# User mute state - WS only (ephemeral state signals, not persisted)
if isinstance(frame, UserMuteStartedFrame):
await self._send_ws(
{"type": RealtimeFeedbackType.USER_MUTE_STARTED.value, "payload": {}}
)
return
if isinstance(frame, UserMuteStoppedFrame):
await self._send_ws(
{"type": RealtimeFeedbackType.USER_MUTE_STOPPED.value, "payload": {}}
)
return
# Skip already processed frames (frames can be observed multiple times)
if frame.id in self._frames_seen:
return
self._frames_seen.add(frame.id)
logger.trace(f"{self} Received Frame: {frame} Direction: {frame_direction}")
# Handle pipeline termination - stop clock task
if isinstance(frame, (EndFrame, CancelFrame, StopFrame)):
await self._cancel_clock_task()
# Handle interruptions - clear any queued bot text
elif isinstance(frame, InterruptionFrame):
await self._handle_interruption()
# Bot speaking state - WS only (ephemeral state signals, not persisted)
elif isinstance(frame, BotStartedSpeakingFrame):
await self._send_ws(
{"type": RealtimeFeedbackType.BOT_STARTED_SPEAKING.value, "payload": {}}
)
elif isinstance(frame, BotStoppedSpeakingFrame):
await self._send_ws(
{"type": RealtimeFeedbackType.BOT_STOPPED_SPEAKING.value, "payload": {}}
)
# User mute state - WS only (ephemeral state signals, not persisted)
elif isinstance(frame, UserMuteStartedFrame):
await self._send_ws(
{"type": RealtimeFeedbackType.USER_MUTE_STARTED.value, "payload": {}}
)
elif isinstance(frame, UserMuteStoppedFrame):
await self._send_ws(
{"type": RealtimeFeedbackType.USER_MUTE_STOPPED.value, "payload": {}}
)
# Handle user transcriptions (interim) - WebSocket only
if isinstance(frame, InterimTranscriptionFrame):
elif isinstance(frame, InterimTranscriptionFrame):
await self._send_ws(
{
"type": RealtimeFeedbackType.USER_TRANSCRIPTION.value,

View file

@ -77,11 +77,8 @@ def compose_system_prompt_for_node(
parts = [p for p in (global_prompt, formatted_node_prompt) if p]
if has_recordings:
if has_recordings and "RECORDING_ID:" in formatted_node_prompt:
parts.append(RECORDING_RESPONSE_MODE_INSTRUCTIONS)
# TODO: Append per-node available recordings list here once
# Node.recording_ids is populated. The list should include
# recording_id and a short description so the LLM can choose.
return "\n\n".join(parts)