diff --git a/api/services/pipecat/in_memory_buffers.py b/api/services/pipecat/in_memory_buffers.py index 94b6288..5c55aa0 100644 --- a/api/services/pipecat/in_memory_buffers.py +++ b/api/services/pipecat/in_memory_buffers.py @@ -120,9 +120,21 @@ class InMemoryLogsBuffer: f"Incremented turn counter to {self._turn_counter} for workflow {self._workflow_run_id}" ) + @staticmethod + def _event_sort_key(event: dict) -> str: + payload_ts = event.get("payload", {}).get("timestamp") + return payload_ts or event.get("timestamp", "") + + def _sorted_events(self) -> List[dict]: + # Stable sort by the realtime (payload) timestamp when available, falling + # back to the buffer-append timestamp. Python's sort is stable, so events + # sharing a key retain their original insertion order — this keeps + # consecutive bot-text chunks of a single turn contiguous. + return sorted(self._events, key=self._event_sort_key) + def get_events(self) -> List[dict]: - """Get all events for final storage.""" - return self._events + """Get all events for final storage, ordered by realtime timestamp.""" + return self._sorted_events() def contains_user_speech(self) -> bool: """Return True if any final user transcription event has non-empty text.""" @@ -141,7 +153,7 @@ class InMemoryLogsBuffer: Filters for rtf-user-transcription (final) and rtf-bot-text events, formats them as '[timestamp] user/assistant: text\\n'. """ - return _generate_transcript_text(self._events) + return _generate_transcript_text(self._sorted_events()) def write_transcript_to_temp_file(self) -> Optional[str]: """Write transcript to a temporary text file and return the path. diff --git a/api/services/pipecat/run_pipeline.py b/api/services/pipecat/run_pipeline.py index b9ff9a3..6584433 100644 --- a/api/services/pipecat/run_pipeline.py +++ b/api/services/pipecat/run_pipeline.py @@ -616,10 +616,15 @@ async def _run_pipeline( llm = create_realtime_llm_service(user_config, audio_config) stt = None tts = None + # Realtime services don't implement run_inference, so create a + # separate text LLM for variable extraction and other out-of-band + # inference calls. + inference_llm = create_llm_service(user_config) else: stt = create_stt_service(user_config, audio_config, keyterms=keyterms) tts = create_tts_service(user_config, audio_config) llm = create_llm_service(user_config) + inference_llm = None workflow_graph = WorkflowGraph(ReactFlowDTO.model_validate(run_workflow_json)) @@ -703,9 +708,15 @@ async def _run_pipeline( context_compaction_enabled = (workflow.workflow_configurations or {}).get( "context_compaction_enabled", False ) + # Context compaction doesn't apply in realtime mode: the speech-to-speech + # service manages its own conversation state server-side. + if is_realtime and context_compaction_enabled: + logger.info("Disabling context_compaction_enabled for realtime workflow run") + context_compaction_enabled = False engine = PipecatEngine( llm=llm, + inference_llm=inference_llm, workflow=workflow_graph, call_context_vars=merged_call_context_vars, workflow_run_id=workflow_run_id, diff --git a/api/services/workflow/pipecat_engine.py b/api/services/workflow/pipecat_engine.py index c29f25b..d72270d 100644 --- a/api/services/workflow/pipecat_engine.py +++ b/api/services/workflow/pipecat_engine.py @@ -60,6 +60,7 @@ class PipecatEngine: *, task: Optional[PipelineTask] = None, llm: Optional["LLMService"] = None, + inference_llm: Optional["LLMService"] = None, context: Optional[LLMContext] = None, workflow: WorkflowGraph, call_context_vars: dict, @@ -75,6 +76,12 @@ class PipecatEngine: ): self.task = task self.llm = llm + # LLM used for out-of-band inference (variable extraction, context + # summarization). Falls back to the pipeline LLM when not provided. + # In realtime mode the pipeline LLM is a speech-to-speech service + # that does not implement run_inference, so a separate text LLM + # must be passed in. + self.inference_llm = inference_llm or llm self.context = context self.workflow = workflow self._call_context_vars = call_context_vars diff --git a/api/services/workflow/pipecat_engine_context_summarizer.py b/api/services/workflow/pipecat_engine_context_summarizer.py index 687fe11..1ea9f47 100644 --- a/api/services/workflow/pipecat_engine_context_summarizer.py +++ b/api/services/workflow/pipecat_engine_context_summarizer.py @@ -63,7 +63,7 @@ class ContextSummarizationManager: orphaned tool calls from previous nodes) with a concise summary. """ context = self._engine.context - llm = self._engine.llm + llm = self._engine.inference_llm current_node = self._engine._current_node try: diff --git a/api/services/workflow/pipecat_engine_variable_extractor.py b/api/services/workflow/pipecat_engine_variable_extractor.py index 7fe2e41..53996cd 100644 --- a/api/services/workflow/pipecat_engine_variable_extractor.py +++ b/api/services/workflow/pipecat_engine_variable_extractor.py @@ -203,12 +203,12 @@ class VariableExtractionManager: # current node's system prompt that build_chat_completion_params # would otherwise prepend. # ------------------------------------------------------------------ - llm_response = await self._engine.llm.run_inference( + llm_response = await self._engine.inference_llm.run_inference( extraction_context, system_instruction=system_prompt ) # Get model name for tracing - model_name = getattr(self._engine.llm, "model_name", "unknown") + model_name = getattr(self._engine.inference_llm, "model_name", "unknown") if ensure_tracing(): tracer = trace.get_tracer("pipecat") @@ -221,7 +221,7 @@ class VariableExtractionManager: ] add_llm_span_attributes( span, - service_name=self._engine.llm.__class__.__name__, + service_name=self._engine.inference_llm.__class__.__name__, model=model_name, operation_name="llm-variable-extraction", messages=tracing_messages, diff --git a/docs/configurations/inference-providers.mdx b/docs/configurations/inference-providers.mdx index 6ad2d4a..3fb62ba 100644 --- a/docs/configurations/inference-providers.mdx +++ b/docs/configurations/inference-providers.mdx @@ -73,7 +73,7 @@ For example, if you only want to change the voice for a specific agent: You can also switch an individual agent to use a **Realtime** provider (such as Gemini Live) even if the global configuration uses standard LLM + TTS + STT. Toggle the **Realtime** switch in the Model Overrides tab, then configure the realtime provider, model, and voice. -When an agent uses a Realtime provider, it replaces the separate LLM, TTS, and STT services with a single speech-to-speech model. The individual LLM/TTS/STT override tabs are hidden in this mode. +When an agent uses a Realtime provider, it replaces the separate TTS and STT services with a single speech-to-speech model. An **LLM** is still required alongside the Realtime model — it's used for out-of-band tasks like variable extraction and QA analysis, which the realtime service does not handle. Context compaction is not applicable in Realtime mode and is ignored if enabled. ## Gemini 3.1 Live @@ -119,5 +119,5 @@ To use Gemini 3.1 Live with Dograh, you need a Google Gemini API key. Follow the 6. Select the language (currently `en` is supported). - When using a Realtime provider like Gemini Live, you do not need to configure separate LLM, TTS, and STT services — the realtime model handles all three. + When using a Realtime provider like Gemini Live, you do not need to configure separate TTS and STT services — the realtime model handles speech in and out. However, you **must** still configure an **LLM** under the LLM tab: it powers variable extraction and QA analysis, which the realtime service does not perform. \ No newline at end of file diff --git a/pipecat b/pipecat index 49f1965..edefaad 160000 --- a/pipecat +++ b/pipecat @@ -1 +1 @@ -Subproject commit 49f1965d652fb5027968dcc24677bc83c5f905ab +Subproject commit edefaad42b97e52a3ad5eef8d15115a5c6ba3b11 diff --git a/ui/.env.example b/ui/.env.example index e6e901e..741790e 100644 --- a/ui/.env.example +++ b/ui/.env.example @@ -1,2 +1,3 @@ BACKEND_URL=http://localhost:8000 +NEXT_PUBLIC_BACKEND_URL=http://localhost:8000 NEXT_PUBLIC_NODE_ENV=development diff --git a/ui/src/app/workflow/[workflowId]/components/WorkflowEditorHeader.tsx b/ui/src/app/workflow/[workflowId]/components/WorkflowEditorHeader.tsx index 9066d57..12f0eea 100644 --- a/ui/src/app/workflow/[workflowId]/components/WorkflowEditorHeader.tsx +++ b/ui/src/app/workflow/[workflowId]/components/WorkflowEditorHeader.tsx @@ -385,7 +385,7 @@ export const WorkflowEditorHeader = ({ {/* GitHub star badge - desktop only */}
- +
diff --git a/ui/src/app/workflow/[workflowId]/run/[runId]/components/UnifiedTranscript.tsx b/ui/src/app/workflow/[workflowId]/run/[runId]/components/UnifiedTranscript.tsx index c3a0038..cf10081 100644 --- a/ui/src/app/workflow/[workflowId]/run/[runId]/components/UnifiedTranscript.tsx +++ b/ui/src/app/workflow/[workflowId]/run/[runId]/components/UnifiedTranscript.tsx @@ -84,7 +84,7 @@ export const UnifiedTranscript = ({ } return ( diff --git a/ui/src/app/workflow/[workflowId]/settings/page.tsx b/ui/src/app/workflow/[workflowId]/settings/page.tsx index b2d1f9c..3115a32 100644 --- a/ui/src/app/workflow/[workflowId]/settings/page.tsx +++ b/ui/src/app/workflow/[workflowId]/settings/page.tsx @@ -582,7 +582,7 @@ function GeneralSection({

Context Compaction

- Automatically summarize conversation context when transitioning between nodes. + Automatically summarize conversation context when transitioning between nodes. Not applicable in Realtime mode — the speech-to-speech service manages its own conversation state and this setting is ignored.

diff --git a/ui/src/components/ServiceConfigurationForm.tsx b/ui/src/components/ServiceConfigurationForm.tsx index 9e60d57..ddcf4c4 100644 --- a/ui/src/components/ServiceConfigurationForm.tsx +++ b/ui/src/components/ServiceConfigurationForm.tsx @@ -52,6 +52,7 @@ const STANDARD_TABS: { key: ServiceSegment; label: string }[] = [ const REALTIME_TABS: { key: ServiceSegment; label: string }[] = [ { key: "realtime", label: "Realtime Model" }, + { key: "llm", label: "LLM" }, { key: "embeddings", label: "Embedding" }, ]; @@ -63,6 +64,7 @@ const OVERRIDE_STANDARD_TABS: { key: ServiceSegment; label: string }[] = [ const OVERRIDE_REALTIME_TABS: { key: ServiceSegment; label: string }[] = [ { key: "realtime", label: "Realtime Model" }, + { key: "llm", label: "LLM" }, ]; // Display names for Sarvam voices @@ -407,7 +409,7 @@ export function ServiceConfigurationForm({ if (mode === 'override') { // Build model_overrides for enabled services only const modelOverrides: Record = {}; - const services = isRealtime ? ["realtime"] : ["llm", "tts", "stt"]; + const services = isRealtime ? ["realtime", "llm"] : ["llm", "tts", "stt"]; for (const svc of services) { if (enabledOverrides[svc]) { modelOverrides[svc] = buildServiceConfig(svc as ServiceSegment, data); @@ -758,7 +760,7 @@ export function ServiceConfigurationForm({ Realtime Mode

- Uses a single speech-to-speech model (no separate STT/TTS) + Uses a single speech-to-speech model (no separate STT/TTS). An LLM is still required for variable extraction and QA.