chore: rename PipelineTask to PipelineWorker

This commit is contained in:
Abhishek Kumar 2026-05-29 14:12:55 +05:30
parent e695436fb3
commit dfb9fd80d2
20 changed files with 50 additions and 50 deletions

View file

@ -15,7 +15,7 @@ Provided here:
- ``NoopFeedbackObserver``: a ``RealtimeFeedbackObserver`` stand-in with
no WebSocket / clock-task side effects.
- ``patch_run_pipeline_externals``: ``contextmanager`` that applies the
full patch set and captures the constructed ``PipelineTask`` for the
full patch set and captures the constructed ``PipelineWorker`` for the
caller. Optional ``llm`` / ``tts`` arguments inject preconfigured
mocks; otherwise blank ``MockLLMService`` / ``MockTTSService``
instances are constructed per-call.
@ -84,10 +84,10 @@ def patch_run_pipeline_externals(
tts: MockTTSService | None = None,
):
"""Patch the externally-talking pieces of ``_run_pipeline`` and capture
the constructed ``PipelineTask`` so tests can drive it from outside.
the constructed ``PipelineWorker`` so tests can drive it from outside.
Args:
captured_task: A list the constructed ``PipelineTask`` is appended
captured_task: A list the constructed ``PipelineWorker`` is appended
to. Tests read ``captured_task[0]`` to get a handle on the task
(to wait on its start event, queue frames, cancel it, etc.).
llm: Optional pre-built ``MockLLMService``. When given, every call
@ -168,7 +168,7 @@ def patch_run_pipeline_externals(
return_value="completed",
)
)
# Capture the PipelineTask so the test can drive it from outside.
# Capture the PipelineWorker so the test can drive it from outside.
stack.enter_context(
patch(
"api.services.pipecat.run_pipeline.create_pipeline_task",

View file

@ -2,7 +2,7 @@
Drives the actual ``_run_pipeline`` against the test database with real
DB rows (organization, user, user configuration, workflow, workflow run)
and pipecat's real ``MockTransport`` / ``Pipeline`` / ``PipelineTask``.
and pipecat's real ``MockTransport`` / ``Pipeline`` / ``PipelineWorker``.
The only patches are for things that talk to genuinely external systems;
those are applied via ``patch_run_pipeline_externals`` from the shared
helpers module.

View file

@ -191,7 +191,7 @@ async def _run_test_body(workflow_run_setup, db_session) -> None:
)
# Locate the assistant aggregator's LLM context (downstream of TTS).
# The PipelineTask wraps the user's pipeline inside another Pipeline,
# The PipelineWorker wraps the user's pipeline inside another Pipeline,
# so we walk the tree recursively.
assistant_aggregator = _find_processor_by_class_name(
pipeline_task, "LLMAssistantAggregator"

View file

@ -21,7 +21,7 @@ import pytest
from pipecat.frames.frames import LLMContextFrame
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.task import PipelineParams, PipelineTask
from pipecat.pipeline.worker import PipelineParams, PipelineWorker
from pipecat.processors.aggregators.llm_context import LLMContext
from pipecat.processors.aggregators.llm_response_universal import (
LLMAssistantAggregatorParams,
@ -116,7 +116,7 @@ async def run_pipeline_and_capture_context(
)
# Create pipeline task
task = PipelineTask(pipeline, params=PipelineParams(), enable_rtvi=False)
task = PipelineWorker(pipeline, params=PipelineParams(), enable_rtvi=False)
engine.set_task(task)

View file

@ -26,7 +26,7 @@ import pytest
from pipecat.frames.frames import Frame, LLMContextFrame
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.task import PipelineParams, PipelineTask
from pipecat.pipeline.worker import PipelineParams, PipelineWorker
from pipecat.processors.aggregators.llm_context import LLMContext
from pipecat.processors.aggregators.llm_response_universal import (
LLMAssistantAggregatorParams,
@ -112,7 +112,7 @@ async def create_engine_with_tracking(
mock_llm: MockLLMService,
test_helper: EndCallTestHelper,
generate_audio: bool = True,
) -> tuple[PipecatEngine, MockTTSService, MockTransport, PipelineTask]:
) -> tuple[PipecatEngine, MockTTSService, MockTransport, PipelineWorker]:
"""Create a PipecatEngine with tracking for end call behavior.
Args:
@ -222,7 +222,7 @@ async def create_engine_with_tracking(
)
# Create pipeline task
task = PipelineTask(pipeline, params=PipelineParams(), enable_rtvi=False)
task = PipelineWorker(pipeline, params=PipelineParams(), enable_rtvi=False)
engine.set_task(task)

View file

@ -25,7 +25,7 @@ from pipecat.frames.frames import (
)
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.task import PipelineParams, PipelineTask
from pipecat.pipeline.worker import PipelineParams, PipelineWorker
from pipecat.processors.aggregators.llm_context import LLMContext
from pipecat.processors.aggregators.llm_response_universal import (
LLMAssistantAggregatorParams,
@ -119,7 +119,7 @@ async def create_test_pipeline(
workflow: WorkflowGraph,
mock_llm: MockLLMService,
user_speech_initial_delay: float = 0.01,
) -> tuple[PipecatEngine, MockTransport, PipelineTask]:
) -> tuple[PipecatEngine, MockTransport, PipelineWorker]:
"""Create a PipecatEngine with full pipeline for testing node switch scenarios.
The pipeline includes a UserSpeechInjector processor that injects
@ -208,7 +208,7 @@ async def create_test_pipeline(
)
# Create pipeline task
task = PipelineTask(pipeline, params=PipelineParams(), enable_rtvi=False)
task = PipelineWorker(pipeline, params=PipelineParams(), enable_rtvi=False)
engine.set_task(task)

View file

@ -12,7 +12,7 @@ import pytest
from pipecat.frames.frames import LLMContextFrame
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.task import PipelineParams, PipelineTask
from pipecat.pipeline.worker import PipelineParams, PipelineWorker
from pipecat.processors.aggregators.llm_context import LLMContext
from pipecat.processors.aggregators.llm_response_universal import (
LLMAssistantAggregatorParams,
@ -107,7 +107,7 @@ async def run_pipeline_with_tool_calls(
)
# Create a real pipeline task
task = PipelineTask(pipeline, params=PipelineParams(), enable_rtvi=False)
task = PipelineWorker(pipeline, params=PipelineParams(), enable_rtvi=False)
engine.set_task(task)

View file

@ -16,7 +16,7 @@ import pytest
from pipecat.frames.frames import LLMContextFrame
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.task import PipelineParams, PipelineTask
from pipecat.pipeline.worker import PipelineParams, PipelineWorker
from pipecat.processors.aggregators.llm_context import LLMContext
from pipecat.processors.aggregators.llm_response_universal import (
LLMAssistantAggregatorParams,
@ -99,7 +99,7 @@ async def _build_engine_and_pipeline(
]
)
task = PipelineTask(pipeline, params=PipelineParams(), enable_rtvi=False)
task = PipelineWorker(pipeline, params=PipelineParams(), enable_rtvi=False)
engine.set_task(task)
return engine, task, function_call_mute_strategy, user_context_aggregator

View file

@ -19,7 +19,7 @@ import pytest
from pipecat.frames.frames import LLMContextFrame
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.task import PipelineParams, PipelineTask
from pipecat.pipeline.worker import PipelineParams, PipelineWorker
from pipecat.processors.aggregators.llm_context import LLMContext
from pipecat.processors.aggregators.llm_response_universal import (
LLMAssistantAggregatorParams,
@ -142,7 +142,7 @@ class TestVariableExtractionDuringTransitions:
)
# Create pipeline task
task = PipelineTask(
task = PipelineWorker(
pipeline,
params=PipelineParams(),
enable_rtvi=False,

View file

@ -8,10 +8,10 @@ from pipecat.frames.frames import (
InterruptionTaskFrame,
LLMRunFrame,
)
from pipecat.pipeline.base_task import PipelineTaskParams
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.task import PipelineTask
from pipecat.pipeline.worker import PipelineWorker
from pipecat.processors.frame_processor import FrameDirection, FrameProcessor
from pipecat.workers.base_worker import WorkerParams
class MockTransport(FrameProcessor):
@ -51,11 +51,11 @@ async def test_interruption_with_blocked_end_frame():
transport = MockTransport()
pipeline = Pipeline([transport, busy_wait_processor])
task = PipelineTask(pipeline, enable_rtvi=False)
task = PipelineWorker(pipeline, enable_rtvi=False)
async def run_pipeline():
loop = asyncio.get_running_loop()
params = PipelineTaskParams(loop=loop)
params = WorkerParams(loop=loop)
await task.run(params=params)
async def queue_frame():

View file

@ -21,7 +21,7 @@ from pipecat.frames.frames import (
)
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.task import PipelineParams, PipelineTask
from pipecat.pipeline.worker import PipelineParams, PipelineWorker
from pipecat.processors.aggregators.llm_context import LLMContext
from pipecat.processors.aggregators.llm_response_universal import (
LLMAssistantAggregatorParams,
@ -212,7 +212,7 @@ async def run_pipeline_and_capture_frames(
engine.set_transport_output(transport_output)
pipeline = Pipeline([llm, tts, transport_output, context_aggregator.assistant()])
task = PipelineTask(pipeline, params=PipelineParams(), enable_rtvi=False)
task = PipelineWorker(pipeline, params=PipelineParams(), enable_rtvi=False)
engine.set_task(task)
# Spy on task.queue_frame and transport_output.queue_frame to capture

View file

@ -35,7 +35,7 @@ import pytest
from pipecat.frames.frames import LLMContextFrame
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.task import PipelineParams, PipelineTask
from pipecat.pipeline.worker import PipelineParams, PipelineWorker
from pipecat.processors.aggregators.llm_context import LLMContext
from pipecat.processors.aggregators.llm_response_universal import (
LLMAssistantAggregatorParams,
@ -62,7 +62,7 @@ async def create_test_pipeline_with_failing_transport(
workflow: WorkflowGraph,
mock_llm: MockLLMService,
fail_after_n_frames: int = 0,
) -> tuple[PipecatEngine, MockTTSService, MockTransport, PipelineTask]:
) -> tuple[PipecatEngine, MockTTSService, MockTransport, PipelineWorker]:
"""Create a PipecatEngine with failing output transport for testing.
Uses the real MockTransport which now extends BaseOutputTransport and uses
@ -152,7 +152,7 @@ async def create_test_pipeline_with_failing_transport(
)
# Create pipeline task
task = PipelineTask(pipeline, params=PipelineParams(), enable_rtvi=False)
task = PipelineWorker(pipeline, params=PipelineParams(), enable_rtvi=False)
engine.set_task(task)

View file

@ -24,7 +24,7 @@ from pipecat.frames.frames import (
)
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.task import PipelineParams, PipelineTask
from pipecat.pipeline.worker import PipelineParams, PipelineWorker
from pipecat.processors.aggregators.llm_context import LLMContext
from pipecat.processors.aggregators.llm_response_universal import (
LLMAssistantAggregatorParams,
@ -100,7 +100,7 @@ async def create_pipeline_with_speech_injection(
speeches: list[str],
user_idle_timeout: float = 0.2,
mock_audio_duration_ms: int = 400,
) -> tuple[PipecatEngine, PipelineTask, object]:
) -> tuple[PipecatEngine, PipelineWorker, object]:
"""Create a pipeline with user speech injection and idle handling.
Sets up a realistic pipeline with:
@ -194,7 +194,7 @@ async def create_pipeline_with_speech_injection(
]
)
task = PipelineTask(pipeline, params=PipelineParams(), enable_rtvi=False)
task = PipelineWorker(pipeline, params=PipelineParams(), enable_rtvi=False)
engine.set_task(task)
return engine, task, user_idle_handler

View file

@ -26,7 +26,7 @@ from pipecat.frames.frames import (
)
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.task import PipelineParams, PipelineTask
from pipecat.pipeline.worker import PipelineParams, PipelineWorker
from pipecat.processors.aggregators.llm_context import LLMContext
from pipecat.processors.aggregators.llm_response_universal import (
LLMAssistantAggregatorParams,
@ -125,7 +125,7 @@ async def create_engine_for_mute_test(
PipecatEngine,
MockTTSService,
MockTransport,
PipelineTask,
PipelineWorker,
LLMUserAggregator,
BotSpeakingObserverProcessor,
]:
@ -196,7 +196,7 @@ async def create_engine_for_mute_test(
]
)
task = PipelineTask(pipeline, params=PipelineParams(), enable_rtvi=False)
task = PipelineWorker(pipeline, params=PipelineParams(), enable_rtvi=False)
engine.set_task(task)
return engine, tts, mock_transport, task, user_context_aggregator, observer

View file

@ -18,7 +18,7 @@ from pipecat.frames.frames import (
)
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.task import PipelineParams, PipelineTask
from pipecat.pipeline.worker import PipelineParams, PipelineWorker
from pipecat.processors.aggregators.llm_context import LLMContext
from pipecat.processors.aggregators.llm_response_universal import (
LLMAssistantAggregatorParams,
@ -161,7 +161,7 @@ class TestVoicemailDetectorWithUserAggregator:
]
)
task = PipelineTask(pipeline, params=PipelineParams(), enable_rtvi=False)
task = PipelineWorker(pipeline, params=PipelineParams(), enable_rtvi=False)
runner = PipelineRunner()
async def run_pipeline():