chore: bump pipecat version and fix tests (#263)

* chore: bump pipecat version and fix tests

* chore: add github workflow to run tests

* fix: install requirements.dev.txt in test script

* fix: fix api-test action

* feat: add integration test

* test: add integration tests

* test: add test for function call mute strategy
This commit is contained in:
Abhishek 2026-05-04 21:35:37 +05:30 committed by GitHub
parent d256c6005c
commit 0e12c41fc7
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
76 changed files with 1776 additions and 670 deletions

View file

View file

@ -0,0 +1,249 @@
"""Shared scaffolding for ``_run_pipeline`` integration tests.
Both ``test_run_pipeline.py`` and ``test_run_pipeline_text_greeting.py``
drive the real ``_run_pipeline`` end-to-end with the same set of external
boundaries patched out (STT/LLM/TTS factories, S3 recording fetcher,
PostHog publisher, ARQ enqueuer, real-time feedback observer). This
module centralises that scaffolding so each test only declares the bits
that differ: its workflow definition and any preconfigured mocks.
Provided here:
- ``USER_CONFIGURATION``: a minimal user-configuration dict with valid
provider/model values; the keys themselves are dummy.
- ``PassthroughProcessor``: an STT stand-in that forwards frames as-is.
- ``NoopFeedbackObserver``: a ``RealtimeFeedbackObserver`` stand-in with
no WebSocket / clock-task side effects.
- ``patch_run_pipeline_externals``: ``contextmanager`` that applies the
full patch set and captures the constructed ``PipelineTask`` for the
caller. Optional ``llm`` / ``tts`` arguments inject preconfigured
mocks; otherwise blank ``MockLLMService`` / ``MockTTSService``
instances are constructed per-call.
- ``create_workflow_run_rows``: helper that creates the org / user /
user-configuration / workflow / workflow-run rows for an integration
test. Each test wires this through its own thin fixture so the
workflow definition stays local to the test.
"""
from contextlib import ExitStack, contextmanager
from typing import Any
from unittest.mock import AsyncMock, patch
from pipecat.frames.frames import Frame
from pipecat.observers.base_observer import BaseObserver
from pipecat.processors.frame_processor import FrameDirection, FrameProcessor
from api.db.models import OrganizationModel, UserModel
from api.enums import WorkflowRunMode
from pipecat.tests import MockLLMService, MockTTSService
# Minimal user configuration shared by the ``_run_pipeline`` integration
# tests. Provider and model values are valid identifiers; the API keys and
# voice id are dummy placeholders.
USER_CONFIGURATION: dict[str, Any] = {
    "is_realtime": False,
    # Speech-to-text settings.
    "stt": {"provider": "deepgram", "model": "nova-3", "api_key": "test-key"},
    # Text-to-speech settings (the only section that carries a voice id).
    "tts": {
        "provider": "cartesia",
        "model": "sonic-2",
        "api_key": "test-key",
        "voice_id": "test-voice",
    },
    # Language-model settings.
    "llm": {"provider": "openai", "model": "gpt-4.1", "api_key": "test-key"},
}
class PassthroughProcessor(FrameProcessor):
    """Stand-in for the STT processor: forwards every frame untouched.

    Installed in place of the object returned by ``create_stt_service`` so
    the pipeline has a processor in the STT slot without any real
    speech-to-text work being done.
    """

    async def process_frame(self, frame: Frame, direction: FrameDirection):
        """Run the base-class handling for ``frame``, then push it onward.

        Args:
            frame: The frame received by this processor.
            direction: The direction the frame is travelling; the frame is
                pushed on in this same direction.
        """
        # Base-class handling runs first; the frame itself is not modified.
        await super().process_frame(frame, direction)
        await self.push_frame(frame, direction)
class NoopFeedbackObserver(BaseObserver):
    """Inert replacement for ``RealtimeFeedbackObserver``.

    Swapped in during integration tests so no WebSocket or clock task is
    involved. It accepts any constructor arguments so it can be built with
    whatever call signature the production observer is given.
    """

    def __init__(self, *_ignored_args, **_ignored_kwargs):
        # Constructor arguments are deliberately discarded; only the base
        # observer initialisation runs.
        super().__init__()

    async def cleanup(self):
        """Do nothing: this observer holds nothing that needs releasing."""
        return None
@contextmanager
def patch_run_pipeline_externals(
    captured_task: list,
    *,
    llm: MockLLMService | None = None,
    tts: MockTTSService | None = None,
):
    """Apply the full set of ``_run_pipeline`` test patches for one test.

    While the context is active, the service factories, the recording
    fetcher, the fire-and-forget integrations, the real-time feedback
    observer and the disposition mapper are replaced with in-process
    doubles, and ``create_pipeline_task`` is wrapped so the task it builds
    is recorded for the caller.

    Args:
        captured_task: A list the constructed ``PipelineTask`` is appended
            to. Tests read ``captured_task[0]`` to get a handle on the task
            (to wait on its start event, queue frames, cancel it, etc.).
        llm: Optional pre-built ``MockLLMService``. When given, every call
            to ``create_llm_service`` returns this same instance (so the
            test can inspect its ``mock_steps`` / ``current_step``).
            When ``None``, a blank ``MockLLMService`` is constructed on
            each call.
        tts: Optional pre-built ``MockTTSService``. Same semantics as
            ``llm``: pass an instance to share state with the test, or
            ``None`` to get a fresh one on each call.

    Yields:
        Nothing; the patches are active for the duration of the ``with``
        body and are undone when it exits.
    """
    from api.services.pipecat import pipeline_builder as _pipeline_builder

    # Grab the genuine factory before any patch is applied so the wrapper
    # below still builds a real task.
    real_create_pipeline_task = _pipeline_builder.create_pipeline_task

    def _capture_task(*args, **kwargs):
        built = real_create_pipeline_task(*args, **kwargs)
        captured_task.append(built)
        return built

    def _llm_factory(*_args, **_kwargs):
        if llm is None:
            return MockLLMService(api_key="test")
        return llm

    def _tts_factory(*_args, **_kwargs):
        if tts is None:
            return MockTTSService()
        return tts

    # Listed in the order they are entered.
    patchers = (
        # Replace service factories with in-process test doubles.
        patch(
            "api.services.pipecat.run_pipeline.create_llm_service",
            _llm_factory,
        ),
        patch(
            "api.services.pipecat.run_pipeline.create_stt_service",
            lambda *_args, **_kwargs: PassthroughProcessor(),
        ),
        patch(
            "api.services.pipecat.run_pipeline.create_tts_service",
            _tts_factory,
        ),
        # S3 — the recording fetcher would otherwise resolve org-scoped
        # recordings.
        patch(
            "api.services.pipecat.run_pipeline.create_recording_audio_fetcher",
            lambda *_args, **_kwargs: AsyncMock(return_value=None),
        ),
        # External fire-and-forget integrations.
        patch(
            "api.services.pipecat.event_handlers._capture_call_event",
            new=AsyncMock(),
        ),
        patch(
            "api.services.pipecat.event_handlers.enqueue_job",
            new=AsyncMock(),
        ),
        # Skip the real-time feedback observer (WebSocket / log-buffer
        # streaming).
        patch(
            "api.services.pipecat.run_pipeline.RealtimeFeedbackObserver",
            NoopFeedbackObserver,
        ),
        # Disposition mapper would otherwise call out to the LLM.
        patch(
            "api.services.workflow.pipecat_engine.apply_disposition_mapping",
            new_callable=AsyncMock,
            return_value="completed",
        ),
        # Capture the PipelineTask so the test can drive it from outside.
        patch(
            "api.services.pipecat.run_pipeline.create_pipeline_task",
            side_effect=_capture_task,
        ),
    )

    with ExitStack() as stack:
        for patcher in patchers:
            stack.enter_context(patcher)
        yield
async def create_workflow_run_rows(
    db_session,
    async_session,
    *,
    workflow_definition: dict,
    name_prefix: str,
    provider_id_suffix: str,
):
    """Insert the rows a ``_run_pipeline`` integration test needs.

    Creates, in order: an organization, a user attached to it, that user's
    configuration (validated from ``USER_CONFIGURATION``), a workflow, and
    a workflow run in ``SMALLWEBRTC`` mode.

    Args:
        db_session: The patched ``DBClient`` from the ``db_session`` fixture.
        async_session: The raw ``AsyncSession`` from the ``async_session``
            fixture (used to add the org/user rows directly).
        workflow_definition: The dict passed to ``create_workflow`` as the
            workflow definition.
        name_prefix: Used to build human-readable workflow / run names.
        provider_id_suffix: Appended to the ``provider_id`` of the org and
            user rows so different tests use distinct values.

    Returns:
        Tuple of (workflow_run, user, workflow).
    """
    from api.schemas.user_configuration import UserConfiguration

    # Org and user are added straight to the session; each flush makes the
    # generated id available for the next step.
    organization = OrganizationModel(provider_id=f"test-org-{provider_id_suffix}")
    async_session.add(organization)
    await async_session.flush()

    owner = UserModel(
        provider_id=f"test-user-{provider_id_suffix}",
        selected_organization_id=organization.id,
    )
    async_session.add(owner)
    await async_session.flush()

    validated_configuration = UserConfiguration.model_validate(USER_CONFIGURATION)
    await db_session.update_user_configuration(
        user_id=owner.id,
        configuration=validated_configuration,
    )

    created_workflow = await db_session.create_workflow(
        name=f"{name_prefix} Workflow",
        workflow_definition=workflow_definition,
        user_id=owner.id,
        organization_id=organization.id,
    )
    created_run = await db_session.create_workflow_run(
        name=f"{name_prefix} Run",
        workflow_id=created_workflow.id,
        mode=WorkflowRunMode.SMALLWEBRTC.value,
        user_id=owner.id,
    )

    return created_run, owner, created_workflow
# Keep the module's public surface explicit so ``import *`` doesn't grab
# transitive imports (the pipecat / api / unittest.mock names imported at
# the top of this module).
__all__ = [
    "USER_CONFIGURATION",
    "PassthroughProcessor",
    "NoopFeedbackObserver",
    "patch_run_pipeline_externals",
    "create_workflow_run_rows",
]

View file

@ -0,0 +1,134 @@
"""Integration tests for ``api.services.pipecat.run_pipeline._run_pipeline``.
Drives the actual ``_run_pipeline`` against the test database with real
DB rows (organization, user, user configuration, workflow, workflow run)
and pipecat's real ``MockTransport`` / ``Pipeline`` / ``PipelineTask``.
The only patches are for things that talk to genuinely external systems;
those are applied via ``patch_run_pipeline_externals`` from the shared
helpers module.
Verifies that the wiring done by ``_run_pipeline`` (in particular
``register_event_handlers``) produces the right behaviour end-to-end:
``maybe_trigger_initial_response`` fires (``engine.set_node`` runs), and
on shutdown the workflow run is persisted with the expected state,
completion flag, and ``gathered_context`` entries.
"""
import asyncio
import pytest
from pipecat.tests.mock_transport import MockTransport
from pipecat.transports.base_transport import TransportParams
from api.enums import WorkflowRunMode, WorkflowRunState
from api.services.pipecat.audio_config import create_audio_config
from api.services.pipecat.run_pipeline import _run_pipeline
from api.tests.integrations._run_pipeline_helpers import (
create_workflow_run_rows,
patch_run_pipeline_externals,
)
# Two-node workflow used by this module's test: a start node connected to an
# end node by a single edge. No greeting is configured on the start node.
WORKFLOW_DEFINITION = {
    "nodes": [
        # Entry node.
        {
            "id": "start",
            "type": "startCall",
            "position": {"x": 0, "y": 0},
            "data": {
                "name": "Start",
                "prompt": "You are a helpful assistant. Greet the user briefly.",
                "is_start": True,
                "allow_interrupt": False,
                "add_global_prompt": False,
            },
        },
        # Terminal node.
        {
            "id": "end",
            "type": "endCall",
            "position": {"x": 0, "y": 200},
            "data": {
                "name": "End",
                "prompt": "End the call politely.",
                "is_end": True,
                "allow_interrupt": False,
                "add_global_prompt": False,
            },
        },
    ],
    "edges": [
        # The only transition: start -> end.
        {
            "id": "start-end",
            "source": "start",
            "target": "end",
            "data": {"label": "End", "condition": "When the user wants to end."},
        },
    ],
}
@pytest.fixture
async def workflow_run_setup(db_session, async_session):
    """Provide the database rows this module's test runs against.

    Delegates to ``create_workflow_run_rows`` with this module's
    ``WORKFLOW_DEFINITION`` and returns its result unchanged:
    (workflow_run, user, workflow).
    """
    created_rows = await create_workflow_run_rows(
        db_session,
        async_session,
        workflow_definition=WORKFLOW_DEFINITION,
        name_prefix="Event Handler Integration",
        provider_id_suffix="event-handlers",
    )
    return created_rows
@pytest.mark.asyncio
async def test_run_pipeline_fires_initial_response_and_completes_run(
    workflow_run_setup, db_session
):
    """End-to-end: _run_pipeline boots, register_event_handlers wires up,
    on_pipeline_started + on_client_connected both fire, the initial
    response is triggered (set_node), and on_pipeline_finished updates
    the workflow_run row to COMPLETED.

    The pipeline runs as a background task while the test waits for the
    ``PipelineTask`` to be captured and started, then cancels it and checks
    the persisted workflow run. If any step fails, the background task is
    cancelled before the patches are removed so it cannot outlive the test.
    """
    workflow_run, user, workflow = workflow_run_setup

    transport = MockTransport(
        TransportParams(audio_in_enabled=True, audio_out_enabled=True)
    )
    captured_task: list = []
    audio_config = create_audio_config(WorkflowRunMode.SMALLWEBRTC.value)

    with patch_run_pipeline_externals(captured_task):
        run_coro = _run_pipeline(
            transport=transport,
            workflow_id=workflow.id,
            workflow_run_id=workflow_run.id,
            user_id=user.id,
            audio_config=audio_config,
            user_provider_id=user.provider_id,
        )
        run_task = asyncio.create_task(run_coro)

        try:
            # Wait until create_pipeline_task is invoked. Surface any
            # exception from _run_pipeline immediately rather than swallowing
            # it during the wait loop.
            for _ in range(60):
                if captured_task or run_task.done():
                    break
                await asyncio.sleep(0.05)
            if run_task.done() and not captured_task:
                run_task.result()  # re-raise the failure
            assert captured_task, "create_pipeline_task was never invoked"

            pipeline_task = captured_task[0]
            await asyncio.wait_for(
                pipeline_task._pipeline_start_event.wait(), timeout=3.0
            )

            # Let the initial response handler (set_node, queue LLMContextFrame)
            # complete before tearing things down.
            await asyncio.sleep(0.1)

            await pipeline_task.cancel()
            await asyncio.wait_for(run_task, timeout=5.0)
        finally:
            # On the success path run_task is already done and this is a
            # no-op. On a failure path (assertion or timeout above), stop the
            # background pipeline while the patches are still active rather
            # than leaking it past the test boundary. ``asyncio.wait`` is
            # bounded and never re-raises the task's own exception, so it
            # cannot mask the original failure.
            if not run_task.done():
                run_task.cancel()
                await asyncio.wait([run_task], timeout=3.0)

    # Verify the run was completed end-to-end via the real on_pipeline_finished
    # handler — DB side effects, not mock assertions.
    refreshed = await db_session.get_workflow_run_by_id(workflow_run.id)
    assert refreshed.is_completed is True
    assert refreshed.state == WorkflowRunState.COMPLETED.value
    # set_node("start") populates "nodes_visited" via _gathered_context, and
    # on_pipeline_finished merges call_tags into gathered_context.
    assert "Start" in refreshed.gathered_context.get("nodes_visited", [])
    assert "call_tags" in refreshed.gathered_context

View file

@ -0,0 +1,289 @@
"""Integration test for the text-greeting flow through ``_run_pipeline``.
Drives the full pipeline produced by ``_run_pipeline`` against the test
database with a workflow whose start node has a text greeting configured.
The flow under test:
1. ``maybe_trigger_initial_response`` (in ``event_handlers.py``) sees a
text greeting and queues ``TTSSpeakFrame(greeting)``.
2. ``MockTTSService`` synthesises audio for the greeting; the real
``MediaSender`` machinery in ``MockOutputTransport`` emits
``BotStartedSpeakingFrame`` and ``BotStoppedSpeakingFrame``.
3. The TTS service emits an ``LLMAssistantPushAggregationFrame`` after
``TTSStoppedFrame``, so the greeting is appended to the assistant
context by ``LLMAssistantAggregator``.
4. We then push a ``TranscriptionFrame`` into the pipeline. After the
user-turn-stop timeout, ``LLMUserAggregator`` pushes a context frame
to the LLM, ``MockLLMService`` returns an ``end_call`` tool call, and
the engine's transition function moves to the end node and calls
``end_call_with_reason``.
5. ``on_pipeline_finished`` records the run as COMPLETED.
External boundaries are patched via ``patch_run_pipeline_externals``
from the shared helpers module. Preconfigured ``MockLLMService`` /
``MockTTSService`` instances are passed in so the end_call response is
deterministic and the synthesised audio length is short.
"""
import asyncio
import pytest
from pipecat.frames.frames import TranscriptionFrame
from pipecat.tests.mock_transport import MockTransport
from pipecat.transports.base_transport import TransportParams
from pipecat.utils.time import time_now_iso8601
from api.enums import WorkflowRunMode, WorkflowRunState
from api.services.pipecat.audio_config import create_audio_config
from api.services.pipecat.run_pipeline import _run_pipeline
from api.tests.integrations._run_pipeline_helpers import (
create_workflow_run_rows,
patch_run_pipeline_externals,
)
from pipecat.tests import MockLLMService, MockTTSService
# Text greeting configured on the start node; the test later looks for this
# exact string in the assistant context.
GREETING_TEXT = (
    "Thanks for calling Happy Feet, this is Sarah. How can I help you today?"
)

# Two-node workflow: a start node carrying the text greeting, connected to an
# end node by a single "End Call" edge.
WORKFLOW_DEFINITION = {
    "nodes": [
        # Entry node, with the text greeting attached.
        {
            "id": "start",
            "type": "startCall",
            "position": {"x": 0, "y": 0},
            "data": {
                "name": "Start",
                "prompt": "You are Sarah. Help the caller and end the call when they ask.",
                "is_start": True,
                "allow_interrupt": False,
                "add_global_prompt": False,
                "greeting": GREETING_TEXT,
                "greeting_type": "text",
            },
        },
        # Terminal node.
        {
            "id": "end",
            "type": "endCall",
            "position": {"x": 0, "y": 200},
            "data": {
                "name": "End",
                "prompt": "End the call politely.",
                "is_end": True,
                "allow_interrupt": False,
                "add_global_prompt": False,
            },
        },
    ],
    "edges": [
        # The only transition: start -> end.
        {
            "id": "start-end",
            "source": "start",
            "target": "end",
            "data": {"label": "End Call", "condition": "When the user wants to end."},
        },
    ],
}

# Hard cap on the entire test. Without this, a hung pipeline would keep the
# pytest worker alive indefinitely (the harness has no pytest-timeout plugin).
TEST_HARD_TIMEOUT_SECONDS = 25.0
@pytest.fixture
async def workflow_run_setup(db_session, async_session):
    """Provide the database rows for the text-greeting test.

    Delegates to ``create_workflow_run_rows`` with this module's
    ``WORKFLOW_DEFINITION`` (whose start node carries a text greeting) and
    returns its result unchanged.
    """
    created_rows = await create_workflow_run_rows(
        db_session,
        async_session,
        workflow_definition=WORKFLOW_DEFINITION,
        name_prefix="Text Greeting Integration",
        provider_id_suffix="text-greeting",
    )
    return created_rows
def _greeting_in_assistant_context(context) -> bool:
    """Report whether any assistant message in ``context`` contains the greeting.

    Only messages that are plain dicts with ``role == "assistant"`` are
    considered; a missing or empty ``content`` is treated as an empty string.
    """
    assistant_contents = (
        entry.get("content") or ""
        for entry in context.get_messages()
        if isinstance(entry, dict) and entry.get("role") == "assistant"
    )
    return any(GREETING_TEXT in text for text in assistant_contents)
def _find_processor_by_class_name(pipeline_task, class_name: str):
    """Search the task's pipeline tree for a processor of the named class.

    Starts at ``pipeline_task._pipeline`` and descends through each object's
    ``_processors`` attribute (when present and non-empty). Objects are
    tracked by identity so shared or cyclic references are visited once.

    Args:
        pipeline_task: Object exposing the root pipeline as ``_pipeline``.
        class_name: Exact ``__class__.__name__`` to look for.

    Returns:
        The first matching object encountered, or ``None`` if there is no
        match. Traversal is depth-first from a LIFO stack, so among siblings
        the last child is examined first.
    """
    seen_ids: set[int] = set()
    pending = [pipeline_task._pipeline]

    while pending:
        candidate = pending.pop()
        marker = id(candidate)
        if marker in seen_ids:
            continue
        seen_ids.add(marker)

        if candidate.__class__.__name__ == class_name:
            return candidate

        # Leaf processors have no ``_processors``; extending with an empty
        # tuple is a no-op in that case.
        pending.extend(getattr(candidate, "_processors", None) or ())

    return None
async def _wait_for(predicate, *, timeout: float, interval: float = 0.05) -> bool:
    """Poll ``predicate`` until it is truthy or ``timeout`` seconds elapse.

    Args:
        predicate: Synchronous zero-argument callable; its result is
            interpreted by truthiness.
        timeout: Maximum time to keep polling, in seconds, measured on the
            running event loop's clock.
        interval: Seconds to sleep between polls.

    Returns:
        ``True`` as soon as a poll succeeds. Once the deadline has passed the
        predicate is evaluated one last time and that result is returned as a
        ``bool``, so a condition that became true during the final sleep is
        still reported.
    """
    # This helper is always awaited from a coroutine, so the running loop is
    # the right clock source; look it up once rather than on every iteration.
    loop = asyncio.get_running_loop()
    deadline = loop.time() + timeout
    while loop.time() < deadline:
        if predicate():
            return True
        await asyncio.sleep(interval)
    # Coerce so the return value always matches the declared ``bool`` type.
    return bool(predicate())
async def _run_test_body(workflow_run_setup, db_session) -> None:
    """Run the text-greeting scenario against ``_run_pipeline``.

    Steps, in order:
      1. Build a ``MockLLMService`` whose single step is an ``end_call``
         function call, and a ``MockTTSService`` with a short audio duration.
      2. Start ``_run_pipeline`` as a background task with the external
         boundaries patched, and wait for the ``PipelineTask`` to be
         captured and started.
      3. Wait for the greeting to show up in the assistant aggregator's
         context, and check the LLM has not advanced past step 0.
      4. Queue a user ``TranscriptionFrame`` and wait for the run to finish.
      5. With the patches removed, assert on the LLM step count and on the
         persisted workflow run.

    Args:
        workflow_run_setup: The (workflow_run, user, workflow) tuple from the
            fixture of the same name.
        db_session: Used to re-read the workflow run for the final assertions.

    Raises:
        AssertionError: If any checkpoint above is not met.
        asyncio.TimeoutError: If one of the bounded ``asyncio.wait_for``
            calls (pipeline start, run completion) expires.
    """
    workflow_run, user, workflow = workflow_run_setup

    # Prepare the LLM with one step: the end_call function call.
    # Edge label "End Call" maps to function name "end_call".
    end_call_chunks = MockLLMService.create_function_call_chunks(
        function_name="end_call",
        arguments={},
        tool_call_id="call_end_1",
    )
    llm = MockLLMService(mock_steps=[end_call_chunks], chunk_delay=0.001)

    # Short audio greeting so the bot finishes speaking quickly in tests.
    tts = MockTTSService(mock_audio_duration_ms=50, frame_delay=0)

    transport = MockTransport(
        TransportParams(audio_in_enabled=True, audio_out_enabled=True)
    )
    captured_task: list = []
    audio_config = create_audio_config(WorkflowRunMode.SMALLWEBRTC.value)

    # Stays None until the task is captured, so the cleanup in ``finally``
    # can tell whether there is anything to cancel.
    pipeline_task = None
    try:
        with patch_run_pipeline_externals(captured_task, llm=llm, tts=tts):
            run_coro = _run_pipeline(
                transport=transport,
                workflow_id=workflow.id,
                workflow_run_id=workflow_run.id,
                user_id=user.id,
                audio_config=audio_config,
                user_provider_id=user.provider_id,
            )
            run_task = asyncio.create_task(run_coro)

            # Poll until the task is captured or the run exits early.
            for _ in range(60):
                if captured_task or run_task.done():
                    break
                await asyncio.sleep(0.05)
            if run_task.done() and not captured_task:
                # Re-raise whatever made _run_pipeline exit early.
                run_task.result()
            assert captured_task, "create_pipeline_task was never invoked"

            pipeline_task = captured_task[0]
            await asyncio.wait_for(
                pipeline_task._pipeline_start_event.wait(), timeout=3.0
            )

            # Locate the assistant aggregator's LLM context (downstream of TTS).
            # The PipelineTask wraps the user's pipeline inside another Pipeline,
            # so we walk the tree recursively.
            assistant_aggregator = _find_processor_by_class_name(
                pipeline_task, "LLMAssistantAggregator"
            )
            assert assistant_aggregator is not None, (
                "LLMAssistantAggregator not found in pipeline"
            )
            context = assistant_aggregator.context

            # Wait for the greeting to be appended to the assistant context. The
            # TTSSpeakFrame -> audio frames -> BotStoppedSpeaking -> assistant
            # aggregation push chain runs through the real pipeline.
            appeared = await _wait_for(
                lambda: _greeting_in_assistant_context(context), timeout=5.0
            )
            assert appeared, (
                "Greeting was not appended to the assistant context. "
                f"Messages: {context.get_messages()}"
            )

            # The LLM must not have been invoked yet — the greeting bypasses
            # the LLM entirely (goes straight to TTS via TTSSpeakFrame).
            assert llm.get_current_step() == 0, (
                f"LLM should not have run yet; current_step={llm.get_current_step()}"
            )

            # Now simulate the user replying. SpeechTimeoutUserTurnStopStrategy
            # (default 0.6s) ends the user turn, which triggers an LLM run;
            # the LLM returns end_call; the transition function moves to the
            # end node and ends the call.
            await pipeline_task.queue_frame(
                TranscriptionFrame(
                    text="I want to end the call now please.",
                    user_id="test-user",
                    timestamp=time_now_iso8601(),
                )
            )

            # Wait for the run to complete.
            await asyncio.wait_for(run_task, timeout=10.0)

        # Outside the patch ctx so the assertions exercise real DB state.
        # The first LLM run produces the end_call; the engine then transitions
        # to the End node and triggers a second generation (which is empty —
        # mock_steps[1] is unset). What matters is that at least one run
        # happened, i.e. the user transcript actually drove the LLM.
        assert llm.get_current_step() >= 1, (
            f"Expected at least one LLM generation; got step={llm.get_current_step()}"
        )

        refreshed = await db_session.get_workflow_run_by_id(workflow_run.id)
        assert refreshed.is_completed is True
        assert refreshed.state == WorkflowRunState.COMPLETED.value
        nodes_visited = refreshed.gathered_context.get("nodes_visited", [])
        assert "Start" in nodes_visited
        assert "End" in nodes_visited
    finally:
        # Best-effort cleanup so a partially-run pipeline doesn't leak tasks
        # past the test boundary.
        if pipeline_task is not None and not pipeline_task.has_finished():
            try:
                await asyncio.wait_for(pipeline_task.cancel(), timeout=3.0)
            except Exception:
                # Deliberately swallowed: a cleanup failure must not replace
                # the original test failure.
                pass
@pytest.mark.asyncio
async def test_text_greeting_speaks_then_user_transcript_triggers_end_call(
    workflow_run_setup, db_session
):
    """End-to-end:

    - ``maybe_trigger_initial_response`` queues ``TTSSpeakFrame`` for the
      start-node text greeting.
    - ``MockTTSService`` synthesises audio; ``MockOutputTransport`` emits
      bot speaking events; the assistant aggregator appends the greeting
      to the context after the TTS turn ends.
    - We push a ``TranscriptionFrame`` into the pipeline. After the user
      turn stop timeout, ``MockLLMService`` returns an ``end_call`` tool
      call which transitions to the end node and ends the run.

    The whole body is bounded by ``TEST_HARD_TIMEOUT_SECONDS`` so a hung
    pipeline fails the test rather than wedging the test runner. A timeout
    of one of the shorter, step-level waits inside the body is reported
    separately from the hard cap so the failure message points at the
    right cause.
    """

    async def _guarded_body() -> None:
        # Step-level ``asyncio.wait_for`` calls inside the body raise the
        # same exception type as the hard cap below. Convert them here so
        # the outer handler only ever sees its own timeout.
        try:
            await _run_test_body(workflow_run_setup, db_session)
        except asyncio.TimeoutError as inner:
            raise AssertionError(
                "A bounded wait inside the test body timed out before the "
                "hard cap was reached — see the chained traceback for the "
                "step that stalled."
            ) from inner

    try:
        await asyncio.wait_for(
            _guarded_body(),
            timeout=TEST_HARD_TIMEOUT_SECONDS,
        )
    except asyncio.TimeoutError as e:
        raise AssertionError(
            f"Test exceeded hard timeout of {TEST_HARD_TIMEOUT_SECONDS}s — "
            "pipeline likely hung. Check earlier debug logs for the last frame "
            "to reach the pipeline."
        ) from e