diff --git a/api/routes/workflow_recording.py b/api/routes/workflow_recording.py index e54ea72..d1f62b4 100644 --- a/api/routes/workflow_recording.py +++ b/api/routes/workflow_recording.py @@ -2,9 +2,10 @@ from typing import Annotated, Optional -from fastapi import APIRouter, Depends, HTTPException, Query +from fastapi import APIRouter, Depends, File, Form, HTTPException, Query, UploadFile from loguru import logger +from api.constants import DEPLOYMENT_MODE from api.db import db_client from api.db.workflow_recording_client import generate_short_id from api.enums import StorageBackend @@ -16,6 +17,7 @@ from api.schemas.workflow_recording import ( RecordingUploadResponseSchema, ) from api.services.auth.depends import get_user +from api.services.mps_service_key_client import mps_service_key_client from api.services.storage import storage_fs router = APIRouter(prefix="/workflow-recordings", tags=["workflow-recordings"]) @@ -216,3 +218,42 @@ async def delete_recording( raise HTTPException( status_code=500, detail="Failed to delete recording" ) from exc + + +@router.post( + "/transcribe", + summary="Transcribe an audio file", +) +async def transcribe_audio( + file: UploadFile = File(...), + language: str = Form("en"), + user=Depends(get_user), +): + """Transcribe an uploaded audio file using MPS STT.""" + try: + audio_data = await file.read() + + if DEPLOYMENT_MODE == "oss": + result = await mps_service_key_client.transcribe_audio( + audio_data=audio_data, + filename=file.filename or "audio.wav", + content_type=file.content_type or "audio/wav", + language=language, + created_by=str(user.provider_id), + ) + else: + result = await mps_service_key_client.transcribe_audio( + audio_data=audio_data, + filename=file.filename or "audio.wav", + content_type=file.content_type or "audio/wav", + language=language, + organization_id=user.selected_organization_id, + ) + + return result + + except Exception as exc: + logger.error(f"Error transcribing audio: {exc}") + raise HTTPException( + status_code=500, detail="Failed to transcribe audio" + ) from exc diff --git a/api/services/mps_service_key_client.py b/api/services/mps_service_key_client.py index 1281a7c..62452ba 100644 --- a/api/services/mps_service_key_client.py +++ b/api/services/mps_service_key_client.py @@ -351,6 +351,71 @@ class MPSServiceKeyClient: response=response, ) + async def transcribe_audio( + self, + audio_data: bytes, + filename: str = "audio.wav", + content_type: str = "audio/wav", + language: str = "en", + model: str = "default", + correlation_id: Optional[str] = None, + organization_id: Optional[int] = None, + created_by: Optional[str] = None, + ) -> dict: + """ + Transcribe an audio file via MPS STT API. + + Args: + audio_data: Raw audio bytes + filename: Name of the audio file + content_type: MIME type of the audio (e.g., audio/wav, audio/mp3) + language: Language code for transcription (default: "en") + model: Model tier name (default: "default") + correlation_id: Optional correlation ID for tracking + organization_id: Organization ID (for authenticated mode) + created_by: User provider ID (for OSS mode) + + Returns: + Dictionary containing transcription result with keys like + 'transcript', 'duration_seconds', etc. + + Raises: + httpx.HTTPStatusError: If the API call fails + """ + async with httpx.AsyncClient(timeout=httpx.Timeout(60.0)) as client: + files = { + "file": (filename, audio_data, content_type), + } + data = { + "language": language, + "model": model, + } + if correlation_id: + data["correlation_id"] = correlation_id + + headers = self._get_headers(organization_id, created_by) + # Remove Content-Type so httpx sets the correct multipart boundary + headers.pop("Content-Type", None) + + response = await client.post( + f"{self.base_url}/api/v1/stt/transcribe", + files=files, + data=data, + headers=headers, + ) + + if response.status_code == 200: + return response.json() + else: + logger.error( + f"Failed to transcribe audio: {response.status_code} - {response.text}" + ) + raise httpx.HTTPStatusError( + f"Failed to transcribe audio: {response.text}", + request=response.request, + response=response, + ) + def validate_service_key(self, service_key: str) -> bool: """ Synchronously validate a Dograh service key by checking usage via MPS. diff --git a/api/services/pipecat/realtime_feedback_observer.py b/api/services/pipecat/realtime_feedback_observer.py index 2229069..511c938 100644 --- a/api/services/pipecat/realtime_feedback_observer.py +++ b/api/services/pipecat/realtime_feedback_observer.py @@ -165,49 +165,39 @@ class RealtimeFeedbackObserver(BaseObserver): frame = data.frame frame_direction = data.direction - logger.trace(f"{self} Received Frame: {frame} Direction: {frame_direction}") - - # Handle pipeline termination - stop clock task - if isinstance(frame, (EndFrame, CancelFrame, StopFrame)): - await self._cancel_clock_task() - return - - # Handle interruptions - clear any queued bot text - if isinstance(frame, InterruptionFrame): - await self._handle_interruption() - return - - # Bot speaking state - WS only (ephemeral state signals, not persisted) - if isinstance(frame, BotStartedSpeakingFrame): - await self._send_ws( - {"type": RealtimeFeedbackType.BOT_STARTED_SPEAKING.value, "payload": {}} - ) - return - if isinstance(frame, BotStoppedSpeakingFrame): - await self._send_ws( - {"type": RealtimeFeedbackType.BOT_STOPPED_SPEAKING.value, "payload": {}} - ) - return - - # User mute state - WS only (ephemeral state signals, not persisted) - if isinstance(frame, UserMuteStartedFrame): - await self._send_ws( - {"type": RealtimeFeedbackType.USER_MUTE_STARTED.value, "payload": {}} - ) - return - if isinstance(frame, UserMuteStoppedFrame): - await self._send_ws( - {"type": RealtimeFeedbackType.USER_MUTE_STOPPED.value, "payload": {}} - ) - return - # Skip already processed frames (frames can be observed multiple times) if frame.id in self._frames_seen: return self._frames_seen.add(frame.id) + logger.trace(f"{self} Received Frame: {frame} Direction: {frame_direction}") + + # Handle pipeline termination - stop clock task + if isinstance(frame, (EndFrame, CancelFrame, StopFrame)): + await self._cancel_clock_task() + # Handle interruptions - clear any queued bot text + elif isinstance(frame, InterruptionFrame): + await self._handle_interruption() + # Bot speaking state - WS only (ephemeral state signals, not persisted) + elif isinstance(frame, BotStartedSpeakingFrame): + await self._send_ws( + {"type": RealtimeFeedbackType.BOT_STARTED_SPEAKING.value, "payload": {}} + ) + elif isinstance(frame, BotStoppedSpeakingFrame): + await self._send_ws( + {"type": RealtimeFeedbackType.BOT_STOPPED_SPEAKING.value, "payload": {}} + ) + # User mute state - WS only (ephemeral state signals, not persisted) + elif isinstance(frame, UserMuteStartedFrame): + await self._send_ws( + {"type": RealtimeFeedbackType.USER_MUTE_STARTED.value, "payload": {}} + ) + elif isinstance(frame, UserMuteStoppedFrame): + await self._send_ws( + {"type": RealtimeFeedbackType.USER_MUTE_STOPPED.value, "payload": {}} + ) # Handle user transcriptions (interim) - WebSocket only - if isinstance(frame, InterimTranscriptionFrame): + elif isinstance(frame, InterimTranscriptionFrame): await self._send_ws( { "type": RealtimeFeedbackType.USER_TRANSCRIPTION.value, diff --git a/api/services/workflow/pipecat_engine_context_composer.py b/api/services/workflow/pipecat_engine_context_composer.py index 96b7236..0613cd7 100644 --- a/api/services/workflow/pipecat_engine_context_composer.py +++ b/api/services/workflow/pipecat_engine_context_composer.py @@ -77,11 +77,8 @@ def compose_system_prompt_for_node( parts = [p for p in (global_prompt, formatted_node_prompt) if p] - if has_recordings: + if has_recordings and "RECORDING_ID:" in formatted_node_prompt: parts.append(RECORDING_RESPONSE_MODE_INSTRUCTIONS) - # TODO: Append per-node available recordings list here once - # Node.recording_ids is populated. The list should include - # recording_id and a short description so the LLM can choose. return "\n\n".join(parts) diff --git a/pipecat b/pipecat index 3f566a4..2e2171e 160000 --- a/pipecat +++ b/pipecat @@ -1 +1 @@ -Subproject commit 3f566a4ba1e112255cc7459735bdb4b716948d59 +Subproject commit 2e2171e2a64ec87b3964fbc2440b5291489912a8 diff --git a/ui/src/app/workflow/[workflowId]/RenderWorkflow.tsx b/ui/src/app/workflow/[workflowId]/RenderWorkflow.tsx index aef3a79..0e87e2e 100644 --- a/ui/src/app/workflow/[workflowId]/RenderWorkflow.tsx +++ b/ui/src/app/workflow/[workflowId]/RenderWorkflow.tsx @@ -14,6 +14,7 @@ import type { DocumentResponseSchema, RecordingResponseSchema, ToolResponse } fr import { FlowEdge, FlowNode, NodeType } from "@/components/flow/types"; import { Button } from '@/components/ui/button'; import { Tooltip, TooltipContent, TooltipProvider, TooltipTrigger } from '@/components/ui/tooltip'; +import { useUserConfig } from '@/context/UserConfigContext'; import { WorkflowConfigurations } from '@/types/workflow-configurations'; import AddNodePanel from "../../../components/flow/AddNodePanel"; @@ -64,6 +65,11 @@ interface RenderWorkflowProps { } function RenderWorkflow({ initialWorkflowName, workflowId, initialFlow, initialTemplateContextVariables, initialWorkflowConfigurations, user }: RenderWorkflowProps) { + const { userConfig } = useUserConfig(); + const ttsProvider = (userConfig?.tts?.provider as string) ?? ""; + const ttsModel = (userConfig?.tts?.model as string) ?? ""; + const ttsVoiceId = (userConfig?.tts?.voice as string) ?? ""; + const [isContextVarsDialogOpen, setIsContextVarsDialogOpen] = useState(false); const [isConfigurationsDialogOpen, setIsConfigurationsDialogOpen] = useState(false); const [isDictionaryDialogOpen, setIsDictionaryDialogOpen] = useState(false); @@ -125,10 +131,15 @@ function RenderWorkflow({ initialWorkflowName, workflowId, initialFlow, initialT setTools(toolsResponse.data); } - // Fetch recordings for this workflow + // Fetch recordings for this workflow filtered by active TTS config try { const recordingsResponse = await listRecordingsApiV1WorkflowRecordingsGet({ - query: { workflow_id: workflowId }, + query: { + workflow_id: workflowId, + tts_provider: ttsProvider || undefined, + tts_model: ttsModel || undefined, + tts_voice_id: ttsVoiceId || undefined, + }, }); if (recordingsResponse.data) { setRecordings(recordingsResponse.data.recordings); @@ -142,7 +153,7 @@ function RenderWorkflow({ initialWorkflowName, workflowId, initialFlow, initialT }; fetchData(); - }, [workflowId]); + }, [workflowId, ttsProvider, ttsModel, ttsVoiceId]); // Memoize defaultEdgeOptions to prevent unnecessary re-renders const defaultEdgeOptions = useMemo(() => ({ diff --git a/ui/src/app/workflow/[workflowId]/components/RecordingsDialog.tsx b/ui/src/app/workflow/[workflowId]/components/RecordingsDialog.tsx index 5ee48d9..b917f2b 100644 --- a/ui/src/app/workflow/[workflowId]/components/RecordingsDialog.tsx +++ b/ui/src/app/workflow/[workflowId]/components/RecordingsDialog.tsx @@ -1,4 +1,4 @@ -import { Loader2, Trash2Icon, Upload } from "lucide-react"; +import { Loader2, Mic, Square, Trash2Icon, Upload } from "lucide-react"; import { useCallback, useEffect, useRef, useState } from "react"; import { @@ -6,6 +6,7 @@ import { deleteRecordingApiV1WorkflowRecordingsRecordingIdDelete, getUploadUrlApiV1WorkflowRecordingsUploadUrlPost, listRecordingsApiV1WorkflowRecordingsGet, + transcribeAudioApiV1WorkflowRecordingsTranscribePost, } from "@/client"; import type { RecordingResponseSchema } from "@/client/types.gen"; import { Button } from "@/components/ui/button"; @@ -18,6 +19,15 @@ import { } from "@/components/ui/dialog"; import { Input } from "@/components/ui/input"; import { Label } from "@/components/ui/label"; +import { + Select, + SelectContent, + SelectItem, + SelectTrigger, + SelectValue, +} from "@/components/ui/select"; +import { Textarea } from "@/components/ui/textarea"; +import { LANGUAGE_DISPLAY_NAMES } from "@/constants/languages"; import { useUserConfig } from "@/context/UserConfigContext"; interface RecordingsDialogProps { @@ -29,6 +39,8 @@ interface RecordingsDialogProps { const MAX_FILE_SIZE = 5 * 1024 * 1024; // 5MB +type RecordingStep = "idle" | "naming" | "recording" | "transcribing"; + export const RecordingsDialog = ({ open, onOpenChange, @@ -42,7 +54,16 @@ export const RecordingsDialog = ({ const [transcript, setTranscript] = useState(""); const [selectedFile, setSelectedFile] = useState(null); const [error, setError] = useState(null); + const [language, setLanguage] = useState("multi"); + const [recordingStep, setRecordingStep] = useState("idle"); + const [recordingFilename, setRecordingFilename] = useState(""); + const [recordingDuration, setRecordingDuration] = useState(0); + const mediaRecorderRef = useRef(null); + const audioChunksRef = useRef([]); + const recordingTimerRef = useRef | null>(null); const fileInputRef = useRef(null); + const languageRef = useRef(language); + languageRef.current = language; const ttsProvider = (userConfig?.tts?.provider as string) ?? ""; const ttsModel = (userConfig?.tts?.model as string) ?? ""; @@ -70,14 +91,119 @@ export const RecordingsDialog = ({ } }, [workflowId, ttsProvider, ttsModel, ttsVoiceId, onRecordingsChange]); + const stopRecordingTimer = useCallback(() => { + if (recordingTimerRef.current) { + clearInterval(recordingTimerRef.current); + recordingTimerRef.current = null; + } + }, []); + + const stopRecording = useCallback(() => { + if (mediaRecorderRef.current && mediaRecorderRef.current.state !== "inactive") { + mediaRecorderRef.current.stop(); + } + }, []); + + const resetRecordingState = useCallback(() => { + setRecordingStep("idle"); + setRecordingFilename(""); + setRecordingDuration(0); + }, []); + useEffect(() => { if (open) { fetchRecordings(); setError(null); setTranscript(""); setSelectedFile(null); + setLanguage("multi"); + resetRecordingState(); } - }, [open, fetchRecordings]); + }, [open, fetchRecordings, resetRecordingState]); + + useEffect(() => { + if (!open) { + stopRecording(); + stopRecordingTimer(); + } + }, [open, stopRecording, stopRecordingTimer]); + + const transcribeFile = async (file: File) => { + setRecordingStep("transcribing"); + try { + const currentLang = languageRef.current; + const result = await transcribeAudioApiV1WorkflowRecordingsTranscribePost({ + body: { file, language: currentLang }, + }); + const data = result.data as Record | undefined; + if (data?.transcript) { + setTranscript(data.transcript as string); + } + } catch { + // Transcription failed — user can still type manually + setError("Auto-transcription failed. You can type the transcript manually."); + } finally { + setRecordingStep("idle"); + } + }; + + const startRecording = async () => { + try { + const stream = await navigator.mediaDevices.getUserMedia({ audio: true }); + const mediaRecorder = new MediaRecorder(stream); + mediaRecorderRef.current = mediaRecorder; + audioChunksRef.current = []; + + mediaRecorder.ondataavailable = (e) => { + if (e.data.size > 0) audioChunksRef.current.push(e.data); + }; + + const filename = recordingFilename.trim() || "recording"; + mediaRecorder.onstop = () => { + stream.getTracks().forEach((t) => t.stop()); + stopRecordingTimer(); + + const blob = new Blob(audioChunksRef.current, { type: mediaRecorder.mimeType }); + if (blob.size > MAX_FILE_SIZE) { + setError(`Recording (${(blob.size / (1024 * 1024)).toFixed(1)}MB) exceeds the maximum allowed size of 5MB.`); + resetRecordingState(); + return; + } + const ext = mediaRecorder.mimeType.includes("webm") ? "webm" : "mp4"; + const file = new File([blob], `${filename}.${ext}`, { type: mediaRecorder.mimeType }); + setSelectedFile(file); + setError(null); + transcribeFile(file); + }; + + mediaRecorder.start(); + setRecordingStep("recording"); + setRecordingDuration(0); + setError(null); + recordingTimerRef.current = setInterval(() => { + setRecordingDuration((d) => d + 1); + }, 1000); + } catch { + setError("Microphone access denied. Please allow microphone permissions."); + resetRecordingState(); + } + }; + + const handleStopRecording = () => { + stopRecording(); + }; + + const handleFileSelect = (file: File | null) => { + if (file && file.size > MAX_FILE_SIZE) { + setError(`File size (${(file.size / (1024 * 1024)).toFixed(1)}MB) exceeds the maximum allowed size of 5MB.`); + setSelectedFile(null); + if (fileInputRef.current) fileInputRef.current.value = ""; + return; + } + setError(null); + setSelectedFile(file); + if (file) transcribeFile(file); + }; const handleUpload = async () => { if (!selectedFile || !transcript.trim()) return; @@ -137,6 +263,7 @@ export const RecordingsDialog = ({ original_filename: selectedFile.name, file_size_bytes: selectedFile.size, mime_type: selectedFile.type, + language, }, }, }); @@ -144,6 +271,8 @@ export const RecordingsDialog = ({ // Reset form and refresh list setTranscript(""); setSelectedFile(null); + setLanguage("multi"); + resetRecordingState(); if (fileInputRef.current) fileInputRef.current.value = ""; await fetchRecordings(); } catch (err) { @@ -166,13 +295,17 @@ export const RecordingsDialog = ({ } }; + const isRecording = recordingStep === "recording"; + const isTranscribing = recordingStep === "transcribing"; + const isBusy = uploading || isRecording || isTranscribing; + return ( Workflow Recordings - Upload audio recordings for hybrid prompts. Recordings are + Upload or record audio for hybrid prompts. Recordings are scoped to your current TTS configuration. Use{" "} @ in prompt fields to insert them. @@ -211,61 +344,158 @@ export const RecordingsDialog = ({ {/* Upload Section */}
- + + + {/* Audio source: file picker or record */}
- { - const file = e.target.files?.[0] ?? null; - if (file && file.size > MAX_FILE_SIZE) { - setError( - `File size (${(file.size / (1024 * 1024)).toFixed(1)}MB) exceeds the maximum allowed size of 5MB.` - ); - setSelectedFile(null); - if (fileInputRef.current) fileInputRef.current.value = ""; - return; - } - setError(null); - setSelectedFile(file); - }} - className="hidden" - /> - + {recordingStep === "idle" && ( + )} - +
+ + {/* Recording: filename + start/stop */} + {(recordingStep === "naming" || isRecording) && ( +
+ {recordingStep === "naming" && ( + <> +
+ + setRecordingFilename(e.target.value)} + autoFocus + /> +
+
+ + +
+ + )} + {isRecording && ( +
+ + + + + + {Math.floor(recordingDuration / 60)}:{(recordingDuration % 60).toString().padStart(2, "0")} + + {recordingFilename} + +
+ )} +
+ )} + + {/* Transcribing progress */} + {isTranscribing && ( +
+ + Transcribing audio... +
+ )} + + {/* Language */} +
+ + +
+ + {/* Transcript */}
- setTranscript(e.target.value)} + disabled={isTranscribing} + rows={3} + className="resize-none text-sm" />
+ + + + + + + + +
+
Use Agent Builder
+
AI generates a workflow from your description
+
+
+ + +
+
Blank Canvas
+
Start from scratch with an empty workflow
+
+
+
+
); } diff --git a/ui/src/constants/languages.ts b/ui/src/constants/languages.ts new file mode 100644 index 0000000..4967914 --- /dev/null +++ b/ui/src/constants/languages.ts @@ -0,0 +1,98 @@ +// Display names for language codes (Deepgram + Sarvam) +export const LANGUAGE_DISPLAY_NAMES: Record = { + "multi": "Multilingual (Auto-detect)", + // Arabic + "ar": "Arabic", + "ar-AE": "Arabic (UAE)", + "ar-SA": "Arabic (Saudi Arabia)", + "ar-QA": "Arabic (Qatar)", + "ar-KW": "Arabic (Kuwait)", + "ar-SY": "Arabic (Syria)", + "ar-LB": "Arabic (Lebanon)", + "ar-PS": "Arabic (Palestine)", + "ar-JO": "Arabic (Jordan)", + "ar-EG": "Arabic (Egypt)", + "ar-SD": "Arabic (Sudan)", + "ar-TD": "Arabic (Chad)", + "ar-MA": "Arabic (Morocco)", + "ar-DZ": "Arabic (Algeria)", + "ar-TN": "Arabic (Tunisia)", + "ar-IQ": "Arabic (Iraq)", + "ar-IR": "Arabic (Iran)", + // Other languages + "be": "Belarusian", + "bn": "Bengali", + "bs": "Bosnian", + "bg": "Bulgarian", + "ca": "Catalan", + "cs": "Czech", + "da": "Danish", + "da-DK": "Danish (Denmark)", + "de": "German", + "de-CH": "German (Switzerland)", + "el": "Greek", + "en": "English", + "en-US": "English (US)", + "en-AU": "English (Australia)", + "en-GB": "English (UK)", + "en-IN": "English (India)", + "en-NZ": "English (New Zealand)", + "es": "Spanish", + "es-419": "Spanish (Latin America)", + "et": "Estonian", + "fa": "Persian", + "fi": "Finnish", + "fr": "French", + "fr-CA": "French (Canada)", + "he": "Hebrew", + "hi": "Hindi", + "hr": "Croatian", + "hu": "Hungarian", + "id": "Indonesian", + "it": "Italian", + "ja": "Japanese", + "kn": "Kannada", + "ko": "Korean", + "ko-KR": "Korean (South Korea)", + "lt": "Lithuanian", + "lv": "Latvian", + "mk": "Macedonian", + "mr": "Marathi", + "ms": "Malay", + "nl": "Dutch", + "nl-BE": "Flemish", + "no": "Norwegian", + "pl": "Polish", + "pt": "Portuguese", + "pt-BR": "Portuguese (Brazil)", + "pt-PT": "Portuguese (Portugal)", + "ro": "Romanian", + "ru": "Russian", + "sk": "Slovak", + "sl": "Slovenian", + "sr": "Serbian", + "sv": "Swedish", + "sv-SE": "Swedish (Sweden)", + "ta": "Tamil", + "te": "Telugu", + "th": "Thai", + "tl": "Tagalog", + "tr": "Turkish", + "uk": "Ukrainian", + "ur": "Urdu", + "vi": "Vietnamese", + "zh-CN": "Chinese (Simplified)", + "zh-TW": "Chinese (Traditional)", + // Sarvam Indian languages + "bn-IN": "Bengali", + "gu-IN": "Gujarati", + "hi-IN": "Hindi", + "kn-IN": "Kannada", + "ml-IN": "Malayalam", + "mr-IN": "Marathi", + "od-IN": "Odia", + "pa-IN": "Punjabi", + "ta-IN": "Tamil", + "te-IN": "Telugu", + "as-IN": "Assamese", +};