From 344c8220de103d89a4a81dad8c33daecf396e910 Mon Sep 17 00:00:00 2001 From: Abhishek Kumar Date: Thu, 18 Jun 2026 12:17:36 +0530 Subject: [PATCH] chore: bump pipecat submodule to 1.4.0 --- api/services/workflow/text_chat_runner.py | 22 +++++++----- api/services/workflow_run_billing.py | 6 ++++ api/tests/test_dograh_managed_correlation.py | 36 ++++++++++++++++++++ pipecat | 2 +- 4 files changed, 57 insertions(+), 9 deletions(-) diff --git a/api/services/workflow/text_chat_runner.py b/api/services/workflow/text_chat_runner.py index 6cc4615a..10b5329d 100644 --- a/api/services/workflow/text_chat_runner.py +++ b/api/services/workflow/text_chat_runner.py @@ -17,7 +17,6 @@ from pipecat.frames.frames import ( LLMContextFrame, LLMFullResponseEndFrame, LLMFullResponseStartFrame, - TextFrame, TTSSpeakFrame, TTSStoppedFrame, ) @@ -167,12 +166,17 @@ class _TaskQueueProxy: class _TextChatCaptureProcessor(FrameProcessor): - def __init__(self, response_window: _ResponseWindowState) -> None: + def __init__( + self, + response_window: _ResponseWindowState, + context: LLMContext, + ) -> None: super().__init__() self.last_activity_at = time.monotonic() self.activity_count = 0 self.events: list[dict[str, Any]] = [] self._response_window = response_window + self._context = context def _touch(self) -> None: self.last_activity_at = time.monotonic() @@ -192,12 +196,14 @@ class _TextChatCaptureProcessor(FrameProcessor): self._touch() if isinstance(frame, TTSSpeakFrame): - text_frame = TextFrame(frame.text) - text_frame.append_to_context = ( + append_to_context = ( frame.append_to_context if frame.append_to_context is not None else True ) - await self.push_frame(text_frame, direction) - await self.push_frame(LLMAssistantPushAggregationFrame(), direction) + text = frame.text.strip() + if text: + self._response_window.outputs.append(text) + if append_to_context: + self._context.add_message({"role": "assistant", "content": text}) return if isinstance(frame, LLMContextFrame) and direction == FrameDirection.UPSTREAM: @@ -456,10 +462,10 @@ async def execute_text_chat_pending_turn( ) base_checkpoint = _resolve_checkpoint_for_pending_turn(session_data, checkpoint) - response_window = _ResponseWindowState() - capture_processor = _TextChatCaptureProcessor(response_window) context = LLMContext() context.set_messages(base_checkpoint["messages"]) + response_window = _ResponseWindowState() + capture_processor = _TextChatCaptureProcessor(response_window, context) node_transition_events = capture_processor.events diff --git a/api/services/workflow_run_billing.py b/api/services/workflow_run_billing.py index 2c61dc8b..18ef328d 100644 --- a/api/services/workflow_run_billing.py +++ b/api/services/workflow_run_billing.py @@ -52,6 +52,9 @@ async def report_workflow_run_platform_usage(workflow_run) -> None: return if not getattr(workflow_run, "is_completed", False): + logger.warning( + "Workflow run is not completed in report_workflow_run_platform_usage" + ) return organization_id = _workflow_run_organization_id(workflow_run) @@ -77,6 +80,9 @@ async def report_workflow_run_platform_usage(workflow_run) -> None: try: if not await _organization_uses_mps_billing_v2(organization_id): + logger.debug( + "Not reporting platform usage since org not using mps billing v2" + ) return result = await mps_service_key_client.report_platform_usage( diff --git a/api/tests/test_dograh_managed_correlation.py b/api/tests/test_dograh_managed_correlation.py index b0cb52c0..e182c495 100644 --- a/api/tests/test_dograh_managed_correlation.py +++ b/api/tests/test_dograh_managed_correlation.py @@ -22,6 +22,20 @@ class _FakeWebSocket: self.state = State.CLOSED +class _IterableFakeWebSocket(_FakeWebSocket): + def __init__(self, incoming_messages: list[dict]): + super().__init__() + self.incoming_messages = [json.dumps(message) for message in incoming_messages] + + def __aiter__(self): + return self + + async def __anext__(self) -> str: + if not self.incoming_messages: + raise StopAsyncIteration + return self.incoming_messages.pop(0) + + def test_dograh_llm_uses_explicit_mps_correlation_id(): service = DograhLLMService( api_key="mps-secret", @@ -108,3 +122,25 @@ async def test_dograh_tts_messages_use_explicit_mps_correlation_id(monkeypatch): assert fake_ws.messages[1]["type"] == "create_context" assert fake_ws.messages[1]["correlation_id"] == "mps-corr-123" assert fake_ws.messages[1]["mps_billing_version"] == "2" + + +@pytest.mark.asyncio +async def test_dograh_tts_final_for_missing_context_is_ignored(): + service = DograhTTSService(api_key="mps-secret") + service._websocket = _IterableFakeWebSocket( + [{"type": "final", "context_id": "ctx-already-removed"}] + ) + service._remote_initialized_context_ids.add("ctx-already-removed") + + remove_calls = [] + + async def fake_remove_audio_context(context_id: str): + remove_calls.append(context_id) + + service.audio_context_available = lambda context_id: False + service.remove_audio_context = fake_remove_audio_context + + await service._receive_messages() + + assert remove_calls == [] + assert "ctx-already-removed" not in service._remote_initialized_context_ids diff --git a/pipecat b/pipecat index 7992b834..85a48a37 160000 --- a/pipecat +++ b/pipecat @@ -1 +1 @@ -Subproject commit 7992b83484da402f45816bb3555cd50ceeb0ec1a +Subproject commit 85a48a37bf51e5b8854a85a2ff6319c67937b6fa