fix: fix tests

This commit is contained in:
Abhishek Kumar 2026-05-29 15:25:58 +05:30
parent dfb9fd80d2
commit a0e9b31b20
20 changed files with 94 additions and 74 deletions

View file

@ -51,6 +51,7 @@ from api.services.pipecat.tracing_config import (
ensure_tracing,
)
from api.services.pipecat.transport_setup import create_webrtc_transport
from api.services.pipecat.worker_runner import run_pipeline_worker
from api.services.pipecat.ws_sender_registry import get_ws_sender
from api.services.telephony import registry as telephony_registry
from api.services.workflow.dto import ReactFlowDTO
@ -87,7 +88,6 @@ from pipecat.turns.user_stop import (
from pipecat.turns.user_turn_strategies import UserTurnStrategies
from pipecat.utils.enums import EndTaskReason, RealtimeFeedbackType
from pipecat.utils.run_context import set_current_org_id, set_current_run_id
from pipecat.workers.base_worker import WorkerParams
# Setup tracing if enabled
ensure_tracing()
@ -821,9 +821,7 @@ async def _run_pipeline(
try:
# Run the pipeline
loop = asyncio.get_running_loop()
params = WorkerParams(loop=loop)
await task.run(params)
await run_pipeline_worker(task)
logger.info(f"Task completed for run {workflow_run_id}")
except asyncio.CancelledError:
logger.warning("Received CancelledError in _run_pipeline")

View file

@ -0,0 +1,36 @@
import asyncio
from pipecat.pipeline.worker import PipelineWorker
from pipecat.workers.runner import WorkerRunner
async def run_pipeline_worker(
worker: PipelineWorker,
*,
handle_sigint: bool = False,
handle_sigterm: bool = False,
auto_end: bool = True,
) -> None:
"""Run a pipeline worker through the v1.3 worker runner lifecycle."""
runner = WorkerRunner(handle_sigint=handle_sigint, handle_sigterm=handle_sigterm)
await runner.add_workers(worker)
await runner.run(auto_end=auto_end)
async def wait_for_pipeline_worker_started(
worker: PipelineWorker,
*,
timeout: float = 3.0,
run_task: asyncio.Task | None = None,
) -> None:
"""Wait until a pipeline worker has fired its stable start lifecycle."""
async def _wait_until_started():
while worker.started_at is None:
if run_task and run_task.done():
await run_task
if worker.has_finished():
raise RuntimeError("PipelineWorker finished before starting")
await asyncio.sleep(0.01)
await asyncio.wait_for(_wait_until_started(), timeout=timeout)

View file

@ -22,7 +22,6 @@ from pipecat.frames.frames import (
TTSStoppedFrame,
)
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
from pipecat.processors.aggregators.llm_context import LLMContext
from pipecat.processors.aggregators.llm_response_universal import (
LLMAssistantAggregatorParams,
@ -45,6 +44,10 @@ from api.services.pipecat.tracing_config import (
build_remote_parent_context,
get_trace_url,
)
from api.services.pipecat.worker_runner import (
run_pipeline_worker,
wait_for_pipeline_worker_started,
)
from api.services.workflow.dto import ReactFlowDTO
from api.services.workflow.pipecat_engine import PipecatEngine
from api.services.workflow.workflow_graph import WorkflowGraph
@ -534,8 +537,7 @@ async def execute_text_chat_pending_turn(
conversation_type="text",
additional_span_attributes=trace_span_attributes,
)
runner = PipelineRunner(handle_sigint=False, handle_sigterm=False)
runner_task = asyncio.create_task(runner.run(task))
runner_task = asyncio.create_task(run_pipeline_worker(task))
engine.set_task(task)
engine.set_audio_config(audio_config)
@ -548,7 +550,7 @@ async def execute_text_chat_pending_turn(
)
try:
await asyncio.wait_for(task._pipeline_start_event.wait(), timeout=5.0)
await wait_for_pipeline_worker_started(task, timeout=5.0, run_task=runner_task)
await engine.initialize()

View file

@ -23,6 +23,7 @@ from pipecat.transports.base_transport import TransportParams
from api.enums import WorkflowRunMode, WorkflowRunState
from api.services.pipecat.audio_config import create_audio_config
from api.services.pipecat.run_pipeline import _run_pipeline
from api.services.pipecat.worker_runner import wait_for_pipeline_worker_started
from api.tests.integrations._run_pipeline_helpers import (
create_workflow_run_rows,
patch_run_pipeline_externals,
@ -116,7 +117,9 @@ async def test_run_pipeline_fires_initial_response_and_completes_run(
run_task.result() # re-raise the failure
assert captured_task, "create_pipeline_task was never invoked"
pipeline_task = captured_task[0]
await asyncio.wait_for(pipeline_task._pipeline_start_event.wait(), timeout=3.0)
await wait_for_pipeline_worker_started(
pipeline_task, timeout=3.0, run_task=run_task
)
# Let the initial response handler (set_node, queue LLMContextFrame)
# complete before tearing things down.
await asyncio.sleep(0.1)

View file

@ -36,6 +36,7 @@ from pipecat.utils.time import time_now_iso8601
from api.enums import WorkflowRunMode, WorkflowRunState
from api.services.pipecat.audio_config import create_audio_config
from api.services.pipecat.run_pipeline import _run_pipeline
from api.services.pipecat.worker_runner import wait_for_pipeline_worker_started
from api.tests.integrations._run_pipeline_helpers import (
create_workflow_run_rows,
patch_run_pipeline_externals,
@ -186,8 +187,8 @@ async def _run_test_body(workflow_run_setup, db_session) -> None:
assert captured_task, "create_pipeline_task was never invoked"
pipeline_task = captured_task[0]
await asyncio.wait_for(
pipeline_task._pipeline_start_event.wait(), timeout=3.0
await wait_for_pipeline_worker_started(
pipeline_task, timeout=3.0, run_task=run_task
)
# Locate the assistant aggregator's LLM context (downstream of TTS).

View file

@ -21,6 +21,7 @@ from pipecat.frames.frames import (
LLMContextFrame,
LLMFullResponseEndFrame,
LLMFullResponseStartFrame,
UserTurnInferenceCompletedFrame,
)
from pipecat.pipeline.pipeline import Pipeline
from pipecat.processors.aggregators.llm_context import LLMContext
@ -468,7 +469,7 @@ class TestExecuteHttpTool:
mock_client.request.return_value = mock_response
mock_client_class.return_value.__aenter__.return_value = mock_client
result = await execute_http_tool(tool, arguments)
await execute_http_tool(tool, arguments)
call_kwargs = mock_client.request.call_args.kwargs
assert call_kwargs["method"] == "DELETE"
@ -793,6 +794,7 @@ class TestCustomToolManagerIntegration:
expected_down_frames=[
LLMFullResponseStartFrame,
FunctionCallsFromLLMInfoFrame,
UserTurnInferenceCompletedFrame,
FunctionCallsStartedFrame,
LLMFullResponseEndFrame,
FunctionCallInProgressFrame,

View file

@ -20,7 +20,6 @@ from unittest.mock import AsyncMock, patch
import pytest
from pipecat.frames.frames import LLMContextFrame
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.worker import PipelineParams, PipelineWorker
from pipecat.processors.aggregators.llm_context import LLMContext
from pipecat.processors.aggregators.llm_response_universal import (
@ -30,6 +29,7 @@ from pipecat.processors.aggregators.llm_response_universal import (
from pipecat.tests.mock_transport import MockTransport
from pipecat.transports.base_transport import TransportParams
from api.services.pipecat.worker_runner import run_pipeline_worker
from api.services.workflow.pipecat_engine import PipecatEngine
from api.services.workflow.workflow_graph import WorkflowGraph
from api.tests.conftest import (
@ -131,10 +131,9 @@ async def run_pipeline_and_capture_context(
new_callable=AsyncMock,
return_value="completed",
):
runner = PipelineRunner()
async def run_pipeline():
await runner.run(task)
await run_pipeline_worker(task)
async def initialize_engine():
await asyncio.sleep(0.01)

View file

@ -25,7 +25,6 @@ from unittest.mock import AsyncMock, patch
import pytest
from pipecat.frames.frames import Frame, LLMContextFrame
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.worker import PipelineParams, PipelineWorker
from pipecat.processors.aggregators.llm_context import LLMContext
from pipecat.processors.aggregators.llm_response_universal import (
@ -42,6 +41,7 @@ from pipecat.turns.user_mute import (
from pipecat.utils.enums import EndTaskReason
from api.enums import ToolCategory
from api.services.pipecat.worker_runner import run_pipeline_worker
from api.services.workflow.dto import (
EdgeDataDTO,
EndCallNodeData,
@ -279,10 +279,9 @@ class TestEndCallViaNodeTransition:
new_callable=AsyncMock,
return_value={"user_intent": "end call"},
):
runner = PipelineRunner()
async def run_pipeline():
await runner.run(task)
await run_pipeline_worker(task)
async def initialize_engine():
await asyncio.sleep(0.01)
@ -383,10 +382,9 @@ class TestEndCallViaNodeTransition:
new_callable=AsyncMock,
return_value={"greeting_type": "formal", "user_name": "John"},
):
runner = PipelineRunner()
async def run_pipeline():
await runner.run(task)
await run_pipeline_worker(task)
async def initialize_engine():
await asyncio.sleep(0.01)
@ -482,10 +480,9 @@ class TestEndCallViaCustomTool:
new_callable=AsyncMock,
return_value={"user_intent": "end"},
):
runner = PipelineRunner()
async def run_pipeline():
await runner.run(task)
await run_pipeline_worker(task)
async def initialize_engine():
await asyncio.sleep(0.01)
@ -574,10 +571,9 @@ class TestEndCallViaCustomTool:
new_callable=AsyncMock,
return_value={"user_intent": "end"},
):
runner = PipelineRunner()
async def run_pipeline():
await runner.run(task)
await run_pipeline_worker(task)
async def initialize_engine():
await asyncio.sleep(0.01)
@ -652,10 +648,9 @@ class TestEndCallViaClientDisconnect:
new_callable=AsyncMock,
return_value={"user_intent": "disconnected"},
):
runner = PipelineRunner()
async def run_pipeline():
await runner.run(task)
await run_pipeline_worker(task)
async def initialize_and_disconnect():
await asyncio.sleep(0.01)
@ -743,10 +738,9 @@ class TestEndCallRaceConditions:
new_callable=AsyncMock,
return_value={"user_intent": "end"},
):
runner = PipelineRunner()
async def run_pipeline():
await runner.run(task)
await run_pipeline_worker(task)
async def initialize_and_race():
await asyncio.sleep(0.01)
@ -855,10 +849,9 @@ class TestEndCallRaceConditions:
new_callable=AsyncMock,
return_value={"user_intent": "end"},
):
runner = PipelineRunner()
async def run_pipeline():
await runner.run(task)
await run_pipeline_worker(task)
async def initialize_and_race_disconnect():
nonlocal disconnect_called
@ -950,10 +943,9 @@ class TestEndCallExtractionBehavior:
"_perform_extraction",
side_effect=mock_extraction,
):
runner = PipelineRunner()
async def run_pipeline():
await runner.run(task)
await run_pipeline_worker(task)
async def initialize_and_end():
await asyncio.sleep(0.01)
@ -1076,10 +1068,9 @@ class TestEndCallExtractionBehavior:
"_perform_extraction",
extraction_mock,
):
runner = PipelineRunner()
async def run_pipeline():
await runner.run(task)
await run_pipeline_worker(task)
async def initialize_and_end():
await asyncio.sleep(0.01)

View file

@ -24,7 +24,6 @@ from pipecat.frames.frames import (
UserStoppedSpeakingFrame,
)
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.worker import PipelineParams, PipelineWorker
from pipecat.processors.aggregators.llm_context import LLMContext
from pipecat.processors.aggregators.llm_response_universal import (
@ -48,6 +47,7 @@ from pipecat.turns.user_stop import (
from pipecat.turns.user_turn_strategies import UserTurnStrategies
from pipecat.utils.time import time_now_iso8601
from api.services.pipecat.worker_runner import run_pipeline_worker
from api.services.workflow.pipecat_engine import PipecatEngine
from api.services.workflow.workflow_graph import WorkflowGraph
from pipecat.tests import MockLLMService, MockTTSService
@ -286,10 +286,9 @@ class TestNodeSwitchWithUserSpeech:
new_callable=AsyncMock,
return_value="completed",
):
runner = PipelineRunner()
async def run_pipeline():
await runner.run(task)
await run_pipeline_worker(task)
async def initialize_engine():
await asyncio.sleep(0.01)

View file

@ -11,7 +11,6 @@ from unittest.mock import AsyncMock, patch
import pytest
from pipecat.frames.frames import LLMContextFrame
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.worker import PipelineParams, PipelineWorker
from pipecat.processors.aggregators.llm_context import LLMContext
from pipecat.processors.aggregators.llm_response_universal import (
@ -21,6 +20,7 @@ from pipecat.processors.aggregators.llm_response_universal import (
from pipecat.tests.mock_transport import MockTransport
from pipecat.transports.base_transport import TransportParams
from api.services.pipecat.worker_runner import run_pipeline_worker
from api.services.workflow.pipecat_engine import PipecatEngine
from api.services.workflow.workflow_graph import WorkflowGraph
from api.tests.conftest import END_CALL_SYSTEM_PROMPT
@ -122,10 +122,9 @@ async def run_pipeline_with_tool_calls(
new_callable=AsyncMock,
return_value="completed",
):
runner = PipelineRunner()
async def run_pipeline():
await runner.run(task)
await run_pipeline_worker(task)
async def initialize_engine():
# Small delay to let runner start

View file

@ -15,7 +15,6 @@ from unittest.mock import AsyncMock, patch
import pytest
from pipecat.frames.frames import LLMContextFrame
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.worker import PipelineParams, PipelineWorker
from pipecat.processors.aggregators.llm_context import LLMContext
from pipecat.processors.aggregators.llm_response_universal import (
@ -31,6 +30,7 @@ from pipecat.turns.user_mute import (
MuteUntilFirstBotCompleteUserMuteStrategy,
)
from api.services.pipecat.worker_runner import run_pipeline_worker
from api.services.workflow.pipecat_engine import PipecatEngine
from api.services.workflow.pipecat_engine_variable_extractor import (
VariableExtractionManager,
@ -182,10 +182,9 @@ class TestTransitionFunctionMutesUser:
new_callable=AsyncMock,
return_value={"user_intent": "end call"},
):
runner = PipelineRunner()
async def run_pipeline():
await runner.run(task)
await run_pipeline_worker(task)
async def initialize_engine():
await asyncio.sleep(0.01)
@ -257,10 +256,9 @@ class TestTransitionFunctionMutesUser:
new_callable=AsyncMock,
return_value={"user_intent": "end call"},
):
runner = PipelineRunner()
async def run_pipeline():
await runner.run(task)
await run_pipeline_worker(task)
async def initialize_engine():
await asyncio.sleep(0.01)

View file

@ -18,7 +18,6 @@ from unittest.mock import AsyncMock, patch
import pytest
from pipecat.frames.frames import LLMContextFrame
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.worker import PipelineParams, PipelineWorker
from pipecat.processors.aggregators.llm_context import LLMContext
from pipecat.processors.aggregators.llm_response_universal import (
@ -28,6 +27,7 @@ from pipecat.processors.aggregators.llm_response_universal import (
from pipecat.tests.mock_transport import MockTransport
from pipecat.transports.base_transport import TransportParams
from api.services.pipecat.worker_runner import run_pipeline_worker
from api.services.workflow.pipecat_engine import PipecatEngine
from api.services.workflow.pipecat_engine_variable_extractor import (
VariableExtractionManager,
@ -168,10 +168,9 @@ class TestVariableExtractionDuringTransitions:
new_callable=AsyncMock,
return_value={"user_name": "John Doe"},
):
runner = PipelineRunner()
async def run_pipeline():
await runner.run(task)
await run_pipeline_worker(task)
async def initialize_engine():
await asyncio.sleep(0.01)

View file

@ -11,7 +11,8 @@ from pipecat.frames.frames import (
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.worker import PipelineWorker
from pipecat.processors.frame_processor import FrameDirection, FrameProcessor
from pipecat.workers.base_worker import WorkerParams
from api.services.pipecat.worker_runner import run_pipeline_worker
class MockTransport(FrameProcessor):
@ -54,9 +55,7 @@ async def test_interruption_with_blocked_end_frame():
task = PipelineWorker(pipeline, enable_rtvi=False)
async def run_pipeline():
loop = asyncio.get_running_loop()
params = WorkerParams(loop=loop)
await task.run(params=params)
await run_pipeline_worker(task)
async def queue_frame():
await task.queue_frames([LLMRunFrame()])

View file

@ -20,7 +20,6 @@ from pipecat.frames.frames import (
TTSStoppedFrame,
)
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.worker import PipelineParams, PipelineWorker
from pipecat.processors.aggregators.llm_context import LLMContext
from pipecat.processors.aggregators.llm_response_universal import (
@ -31,6 +30,7 @@ from pipecat.tests.mock_transport import MockTransport
from pipecat.transports.base_transport import TransportParams
from api.services.pipecat.recording_audio_cache import RecordingAudio
from api.services.pipecat.worker_runner import run_pipeline_worker
from api.services.workflow.dto import (
EdgeDataDTO,
EndCallNodeData,
@ -247,10 +247,9 @@ async def run_pipeline_and_capture_frames(
return_value="completed",
),
):
runner = PipelineRunner()
async def run():
await runner.run(task)
await run_pipeline_worker(task)
async def initialize():
await asyncio.sleep(0.01)

View file

@ -34,7 +34,6 @@ from unittest.mock import AsyncMock, patch
import pytest
from pipecat.frames.frames import LLMContextFrame
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.worker import PipelineParams, PipelineWorker
from pipecat.processors.aggregators.llm_context import LLMContext
from pipecat.processors.aggregators.llm_response_universal import (
@ -50,6 +49,7 @@ from pipecat.turns.user_mute import (
)
from pipecat.utils.enums import EndTaskReason
from api.services.pipecat.worker_runner import run_pipeline_worker
from api.services.workflow.pipecat_engine import PipecatEngine
from api.services.workflow.pipecat_engine_variable_extractor import (
VariableExtractionManager,
@ -219,10 +219,9 @@ class TestTTSPauseWithAudioWriteFailure:
new_callable=AsyncMock,
return_value={},
):
runner = PipelineRunner()
async def run_pipeline():
await runner.run(task)
await run_pipeline_worker(task)
async def initialize_and_end_call():
await asyncio.sleep(0.01)
@ -339,10 +338,9 @@ class TestTTSPauseWithAudioWriteFailure:
new_callable=AsyncMock,
return_value={},
):
runner = PipelineRunner()
async def run_pipeline():
await runner.run(task)
await run_pipeline_worker(task)
async def initialize_and_observe():
await asyncio.sleep(0.01)

View file

@ -9,6 +9,7 @@ from pipecat.frames.frames import (
LLMContextFrame,
LLMFullResponseEndFrame,
LLMFullResponseStartFrame,
UserTurnInferenceCompletedFrame,
)
from pipecat.pipeline.pipeline import Pipeline
from pipecat.processors.aggregators.llm_context import LLMContext
@ -45,6 +46,7 @@ class TestUnregisteredFunctionCall:
expected_down_frames=[
LLMFullResponseStartFrame,
FunctionCallsFromLLMInfoFrame,
UserTurnInferenceCompletedFrame,
FunctionCallsStartedFrame,
LLMFullResponseEndFrame,
FunctionCallInProgressFrame,

View file

@ -23,7 +23,6 @@ from pipecat.frames.frames import (
UserStoppedSpeakingFrame,
)
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.worker import PipelineParams, PipelineWorker
from pipecat.processors.aggregators.llm_context import LLMContext
from pipecat.processors.aggregators.llm_response_universal import (
@ -43,6 +42,7 @@ from pipecat.turns.user_stop import ExternalUserTurnStopStrategy
from pipecat.turns.user_turn_strategies import UserTurnStrategies
from pipecat.utils.time import time_now_iso8601
from api.services.pipecat.worker_runner import run_pipeline_worker
from api.services.workflow.pipecat_engine import PipecatEngine
from api.services.workflow.workflow_graph import WorkflowGraph
from pipecat.tests import MockLLMService, MockTTSService
@ -266,10 +266,9 @@ class TestUserIdleHandler:
new_callable=AsyncMock,
return_value="completed",
):
runner = PipelineRunner()
async def run_pipeline():
await runner.run(task)
await run_pipeline_worker(task)
async def initialize_engine():
await asyncio.sleep(0.01)

View file

@ -25,7 +25,6 @@ from pipecat.frames.frames import (
UserStoppedSpeakingFrame,
)
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.worker import PipelineParams, PipelineWorker
from pipecat.processors.aggregators.llm_context import LLMContext
from pipecat.processors.aggregators.llm_response_universal import (
@ -44,6 +43,7 @@ from pipecat.turns.user_mute import (
from pipecat.turns.user_turn_strategies import ExternalUserTurnStrategies
from pipecat.utils.time import time_now_iso8601
from api.services.pipecat.worker_runner import run_pipeline_worker
from api.services.workflow.pipecat_engine import PipecatEngine
from api.services.workflow.pipecat_engine_variable_extractor import (
VariableExtractionManager,
@ -258,10 +258,9 @@ class TestUserMutingDuringBotSpeech:
new_callable=AsyncMock,
return_value={},
):
runner = PipelineRunner()
async def run_pipeline():
await runner.run(task)
await run_pipeline_worker(task)
async def run_test():
await asyncio.sleep(0.01)
@ -349,10 +348,9 @@ class TestUserMutingDuringBotSpeech:
new_callable=AsyncMock,
return_value={},
):
runner = PipelineRunner()
async def run_pipeline():
await runner.run(task)
await run_pipeline_worker(task)
async def run_test():
await asyncio.sleep(0.01)
@ -445,10 +443,9 @@ class TestUserMutingDuringBotSpeech:
new_callable=AsyncMock,
return_value={},
):
runner = PipelineRunner()
async def run_pipeline():
await runner.run(task)
await run_pipeline_worker(task)
async def run_test():
await asyncio.sleep(0.01)

View file

@ -17,7 +17,6 @@ from pipecat.frames.frames import (
UserStoppedSpeakingFrame,
)
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.worker import PipelineParams, PipelineWorker
from pipecat.processors.aggregators.llm_context import LLMContext
from pipecat.processors.aggregators.llm_response_universal import (
@ -36,6 +35,7 @@ from pipecat.turns.user_stop import (
from pipecat.turns.user_turn_strategies import UserTurnStrategies
from pipecat.utils.time import time_now_iso8601
from api.services.pipecat.worker_runner import run_pipeline_worker
from pipecat.tests import MockLLMService
@ -162,10 +162,9 @@ class TestVoicemailDetectorWithUserAggregator:
)
task = PipelineWorker(pipeline, params=PipelineParams(), enable_rtvi=False)
runner = PipelineRunner()
async def run_pipeline():
await runner.run(task)
await run_pipeline_worker(task)
async def inject_frames():
await asyncio.sleep(0.05)

@ -1 +1 @@
Subproject commit 1e145e4267df535b64547ed229d6234a99bd8843
Subproject commit 54c83d862a418082d9b33d90233a6d1b78ebcdf8