fix: changes to update pipecat version to 0.0.100 (#122)

* feat: add stt evals

* add smart turn as provider

* chore: remove deprecations

* chore: format files

* fix: remove deprecated UserIdleProcessor

* fix: remove deprecated TranscriptProcessor

* chore: update pipecat submodule

* feat: add evals visualisation

* fix: trigger llm generation on client connected and pipeline started

* chore: update pipecat

* chore: update pipecat submodule

* Add tests

* fix: slow loading of workflow page

* chore: update pipecat submodule

* Show version after release

* Fixes #99

* fix: provider check for websocket connection

* Fixes #107

* Fix #96

* chore: fix documentation

* fix: cloudonix campaign call error

---------

Co-authored-by: Sabiha Khan <sabihak89@gmail.com>
This commit is contained in:
Abhishek 2026-01-23 18:53:59 +05:30 committed by GitHub
parent a4367bd83b
commit 911c5ed416
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
104 changed files with 16919 additions and 597 deletions

View file

@ -0,0 +1,72 @@
"""add public_access_token
Revision ID: 181475b2a1a1
Revises: dc33eef8dabe
Create Date: 2026-01-23 17:37:54.449308
"""
from typing import Sequence, Union
import sqlalchemy as sa
from alembic import op
# revision identifiers, used by Alembic.
revision: str = "181475b2a1a1"
down_revision: Union[str, None] = "dc33eef8dabe"
branch_labels: Union[str, Sequence[str], None] = None
depends_on: Union[str, Sequence[str], None] = None
def upgrade() -> None:
# ### commands auto generated by Alembic - please adjust! ###
op.drop_index(op.f("ix_api_keys_key_hash"), table_name="api_keys")
op.create_index("ix_api_keys_key_hash", "api_keys", ["key_hash"], unique=False)
op.create_index(
"ix_kb_chunks_embedding_ivfflat",
"knowledge_base_chunks",
["embedding"],
unique=False,
postgresql_using="ivfflat",
postgresql_with={"lists": 100},
postgresql_ops={"embedding": "vector_cosine_ops"},
)
op.create_index(
"ix_kb_chunks_embedding_model",
"knowledge_base_chunks",
["embedding_model"],
unique=False,
)
op.add_column(
"workflow_runs",
sa.Column("public_access_token", sa.String(length=36), nullable=True),
)
op.create_index(
"idx_workflow_runs_public_access_token",
"workflow_runs",
["public_access_token"],
unique=True,
postgresql_where=sa.text("public_access_token IS NOT NULL"),
)
# ### end Alembic commands ###
def downgrade() -> None:
# ### commands auto generated by Alembic - please adjust! ###
op.drop_index(
"idx_workflow_runs_public_access_token",
table_name="workflow_runs",
postgresql_where=sa.text("public_access_token IS NOT NULL"),
)
op.drop_column("workflow_runs", "public_access_token")
op.drop_index("ix_kb_chunks_embedding_model", table_name="knowledge_base_chunks")
op.drop_index(
"ix_kb_chunks_embedding_ivfflat",
table_name="knowledge_base_chunks",
postgresql_using="ivfflat",
postgresql_with={"lists": 100},
postgresql_ops={"embedding": "vector_cosine_ops"},
)
op.drop_index("ix_api_keys_key_hash", table_name="api_keys")
op.create_index(op.f("ix_api_keys_key_hash"), "api_keys", ["key_hash"], unique=True)
# ### end Alembic commands ###

View file

@ -14,7 +14,6 @@ FILLER_SOUND_PROBABILITY = 0.0
VOICEMAIL_RECORDING_DURATION = 5.0
# Configuration constants
ENABLE_SMART_TURN = os.getenv("ENABLE_SMART_TURN", "false").lower() == "true"
ENABLE_TRACING = os.getenv("ENABLE_TRACING", "false").lower() == "true"
ENABLE_RNNOISE = os.getenv("ENABLE_RNNOISE", "false").lower() == "true"
@ -52,6 +51,23 @@ ENABLE_ARI_STASIS = os.getenv("ENABLE_ARI_STASIS", "false").lower() == "true"
SERIALIZE_LOG_OUTPUT = os.getenv("SERIALIZE_LOG_OUTPUT", "false").lower() == "true"
ENABLE_TELEMETRY = os.getenv("ENABLE_TELEMETRY", "false").lower() == "true"
def _get_version() -> str:
"""Read version from pyproject.toml."""
try:
import tomllib
pyproject_path = APP_ROOT_DIR / "pyproject.toml"
with open(pyproject_path, "rb") as f:
pyproject = tomllib.load(f)
return pyproject.get("project", {}).get("version", "dev")
except Exception:
return "dev"
# Application version (read from pyproject.toml)
APP_VERSION = _get_version()
# Country code mapping: ISO country code -> international dialing prefix
COUNTRY_CODES = {
"US": "1", # United States

View file

@ -360,6 +360,17 @@ class WorkflowRunModel(Base):
campaign = relationship("CampaignModel")
queued_run_id = Column(Integer, ForeignKey("queued_runs.id"), nullable=True)
queued_run = relationship("QueuedRunModel", foreign_keys=[queued_run_id])
public_access_token = Column(String(36), nullable=True)
# Indexes
__table_args__ = (
Index(
"idx_workflow_runs_public_access_token",
"public_access_token",
unique=True,
postgresql_where=text("public_access_token IS NOT NULL"),
),
)
# LoopTalk Testing Models

View file

@ -4,7 +4,7 @@ from typing import Optional
from sqlalchemy import func
from sqlalchemy.future import select
from sqlalchemy.orm import selectinload
from sqlalchemy.orm import load_only, selectinload
from api.db.base_client import BaseDBClient
from api.db.models import WorkflowDefinitionModel, WorkflowModel, WorkflowRunModel
@ -111,6 +111,70 @@ class WorkflowClient(BaseDBClient):
result = await session.execute(query)
return result.scalars().all()
async def get_all_workflows_for_listing(
self, organization_id: int = None, status: str = None
) -> list[WorkflowModel]:
"""Get workflows with only the columns needed for listing.
This is an optimized version that excludes large JSON columns like
workflow_definition, template_context_variables, etc.
Args:
organization_id: Filter by organization ID
status: Filter by status (active/archived)
Returns:
List of WorkflowModel with only id, name, status, created_at loaded
"""
async with self.async_session() as session:
query = select(WorkflowModel).options(
load_only(
WorkflowModel.id,
WorkflowModel.name,
WorkflowModel.status,
WorkflowModel.created_at,
)
)
if organization_id:
query = query.where(WorkflowModel.organization_id == organization_id)
if status:
query = query.where(WorkflowModel.status == status)
result = await session.execute(query)
return result.scalars().all()
async def get_workflow_counts(self, organization_id: int = None) -> dict[str, int]:
"""Get workflow counts by status.
Args:
organization_id: Filter by organization ID
Returns:
Dict with 'total', 'active', 'archived' counts
"""
async with self.async_session() as session:
query = select(
WorkflowModel.status,
func.count(WorkflowModel.id).label("count"),
)
if organization_id:
query = query.where(WorkflowModel.organization_id == organization_id)
query = query.group_by(WorkflowModel.status)
result = await session.execute(query)
rows = result.all()
counts = {"total": 0, "active": 0, "archived": 0}
for status, count in rows:
counts[status] = count
counts["total"] += count
return counts
async def get_workflow(
self, workflow_id: int, user_id: int = None, organization_id: int = None
) -> WorkflowModel | None:
@ -310,3 +374,33 @@ class WorkflowClient(BaseDBClient):
)
)
return result.scalar() or 0
async def get_workflow_run_counts(self, workflow_ids: list[int]) -> dict[int, int]:
"""Get run counts for multiple workflows in a single query.
Args:
workflow_ids: List of workflow IDs to get counts for
Returns:
Dict mapping workflow_id to run count
"""
if not workflow_ids:
return {}
async with self.async_session() as session:
result = await session.execute(
select(
WorkflowRunModel.workflow_id,
func.count(WorkflowRunModel.id).label("run_count"),
)
.where(WorkflowRunModel.workflow_id.in_(workflow_ids))
.group_by(WorkflowRunModel.workflow_id)
)
rows = result.all()
# Build dict with counts, defaulting to 0 for workflows with no runs
counts = {workflow_id: 0 for workflow_id in workflow_ids}
for workflow_id, run_count in rows:
counts[workflow_id] = run_count
return counts

View file

@ -1,3 +1,4 @@
import uuid
from datetime import datetime, timezone
from typing import Any, Dict, List, Optional, Tuple
@ -414,3 +415,56 @@ class WorkflowRunClient(BaseDBClient):
organization_id = workflow_run.workflow.user.selected_organization_id
return workflow_run, organization_id
async def ensure_public_access_token(self, workflow_run_id: int) -> Optional[str]:
"""Generate a public access token if not exists, return existing if present (idempotent).
Args:
workflow_run_id: The ID of the workflow run
Returns:
The public access token string, or None if workflow run not found
"""
async with self.async_session() as session:
result = await session.execute(
select(WorkflowRunModel).where(WorkflowRunModel.id == workflow_run_id)
)
run = result.scalars().first()
if not run:
return None
# Return existing token if present
if run.public_access_token:
return run.public_access_token
# Generate and persist new token
token = str(uuid.uuid4())
run.public_access_token = token
try:
await session.commit()
except Exception as e:
await session.rollback()
raise e
await session.refresh(run)
return run.public_access_token
async def get_workflow_run_by_public_token(
self, token: str
) -> Optional[WorkflowRunModel]:
"""Lookup workflow run by public access token.
Args:
token: The public access token
Returns:
The WorkflowRunModel if found, None otherwise
"""
async with self.async_session() as session:
result = await session.execute(
select(WorkflowRunModel).where(
WorkflowRunModel.public_access_token == token
)
)
return result.scalars().first()

5
api/pyproject.toml Normal file
View file

@ -0,0 +1,5 @@
[project]
name = "dograh-api"
version = "1.10.0"
description = "Backend API for Dograh voice AI platform"
requires-python = ">=3.12"

View file

@ -1,5 +1,6 @@
from fastapi import APIRouter
from loguru import logger
from pydantic import BaseModel
from api.routes.campaign import router as campaign_router
from api.routes.credentials import router as credentials_router
@ -9,6 +10,7 @@ from api.routes.looptalk import router as looptalk_router
from api.routes.organization import router as organization_router
from api.routes.organization_usage import router as organization_usage_router
from api.routes.public_agent import router as public_agent_router
from api.routes.public_download import router as public_download_router
from api.routes.public_embed import router as public_embed_router
from api.routes.reports import router as reports_router
from api.routes.s3_signed_url import router as s3_router
@ -43,11 +45,24 @@ router.include_router(reports_router)
router.include_router(webrtc_signaling_router)
router.include_router(public_embed_router)
router.include_router(public_agent_router)
router.include_router(public_download_router)
router.include_router(workflow_embed_router)
router.include_router(knowledge_base_router)
@router.get("/health")
async def health():
class HealthResponse(BaseModel):
status: str
version: str
backend_api_endpoint: str
@router.get("/health", response_model=HealthResponse)
async def health() -> HealthResponse:
from api.constants import APP_VERSION, BACKEND_API_ENDPOINT
logger.debug("Health endpoint called")
return {"message": "OK"}
return HealthResponse(
status="ok",
version=APP_VERSION,
backend_api_endpoint=BACKEND_API_ENDPOINT,
)

View file

@ -0,0 +1,95 @@
"""Public download endpoints for workflow recordings and transcripts.
These endpoints provide secure, token-based public access to workflow artifacts
without requiring authentication. Tokens are generated on-demand when webhooks
are executed and included in the webhook payload.
"""
from typing import Literal
from fastapi import APIRouter, HTTPException, Query
from fastapi.responses import RedirectResponse
from loguru import logger
from api.db import db_client
from api.services.storage import get_storage_for_backend
router = APIRouter(prefix="/public/download")
@router.get("/workflow/{token}/{artifact_type}")
async def download_workflow_artifact(
token: str,
artifact_type: Literal["recording", "transcript"],
inline: bool = Query(
default=False, description="Display inline in browser instead of download"
),
):
"""Download a workflow recording or transcript via public access token.
This endpoint:
1. Validates the public access token
2. Looks up the corresponding workflow run
3. Generates a signed URL for the requested artifact
4. Redirects to the signed URL
Args:
token: The public access token (UUID format)
artifact_type: Type of artifact - "recording" or "transcript"
inline: If true, sets Content-Disposition to inline for browser preview
Returns:
RedirectResponse to the signed URL (302 redirect)
Raises:
HTTPException 404: If token is invalid or artifact not found
"""
# 1. Lookup workflow run by token
workflow_run = await db_client.get_workflow_run_by_public_token(token)
if not workflow_run:
logger.warning(f"Invalid public access token: {token[:8]}...")
raise HTTPException(status_code=404, detail="Invalid or expired token")
# 2. Get file path based on artifact type
if artifact_type == "recording":
file_path = workflow_run.recording_url
else: # transcript
file_path = workflow_run.transcript_url
if not file_path:
logger.warning(
f"Artifact not found: type={artifact_type}, workflow_run_id={workflow_run.id}"
)
raise HTTPException(
status_code=404,
detail=f"No {artifact_type} available for this workflow run",
)
# 3. Get storage backend for this workflow run
try:
storage = get_storage_for_backend(workflow_run.storage_backend)
except ValueError as e:
logger.error(f"Invalid storage backend: {workflow_run.storage_backend}")
raise HTTPException(status_code=500, detail="Storage configuration error")
# 4. Generate signed URL (1 hour expiration)
try:
signed_url = await storage.aget_signed_url(
file_path=file_path,
expiration=3600, # 1 hour
force_inline=inline,
)
except Exception as e:
logger.error(f"Failed to generate signed URL: {e}")
raise HTTPException(status_code=500, detail="Failed to generate download URL")
if not signed_url:
logger.error(f"Storage returned None for signed URL: {file_path}")
raise HTTPException(status_code=500, detail="Failed to generate download URL")
logger.info(
f"Generated signed URL for {artifact_type}: workflow_run_id={workflow_run.id}, token={token[:8]}..."
)
# 5. Redirect to signed URL
return RedirectResponse(url=signed_url, status_code=302)

View file

@ -97,6 +97,24 @@ class WorkflowResponse(BaseModel):
workflow_configurations: dict | None = None
class WorkflowListResponse(BaseModel):
"""Lightweight response for workflow listings (excludes large fields)."""
id: int
name: str
status: str
created_at: datetime
total_runs: int
class WorkflowCountResponse(BaseModel):
"""Response for workflow count endpoint."""
total: int
active: int
archived: int
class WorkflowTemplateResponse(BaseModel):
id: int
template_name: str
@ -359,6 +377,26 @@ class WorkflowSummaryResponse(BaseModel):
name: str
@router.get("/count")
async def get_workflow_count(
user: UserModel = Depends(get_user),
) -> WorkflowCountResponse:
"""Get workflow counts for the authenticated user's organization.
This is a lightweight endpoint for checking if the user has workflows,
useful for redirect logic without fetching full workflow data.
"""
counts = await db_client.get_workflow_counts(
organization_id=user.selected_organization_id
)
return WorkflowCountResponse(
total=counts["total"],
active=counts["active"],
archived=counts["archived"],
)
@router.get("/fetch")
async def get_workflows(
user: UserModel = Depends(get_user),
@ -366,45 +404,43 @@ async def get_workflows(
None,
description="Filter by status - can be single value (active/archived) or comma-separated (active,archived)",
),
) -> List[WorkflowResponse]:
"""Get all workflows for the authenticated user's organization"""
) -> List[WorkflowListResponse]:
"""Get all workflows for the authenticated user's organization.
Returns a lightweight response with only essential fields for listing.
Use GET /workflow/fetch/{workflow_id} to get full workflow details.
"""
# Handle comma-separated status values
if status and "," in status:
# Split comma-separated values and fetch workflows for each status
status_list = [s.strip() for s in status.split(",")]
all_workflows = []
for status_value in status_list:
workflows = await db_client.get_all_workflows(
workflows = await db_client.get_all_workflows_for_listing(
organization_id=user.selected_organization_id, status=status_value
)
all_workflows.extend(workflows)
workflows = all_workflows
else:
# Single status or no status filter
workflows = await db_client.get_all_workflows(
workflows = await db_client.get_all_workflows_for_listing(
organization_id=user.selected_organization_id, status=status
)
# Get run counts for each workflow
workflow_responses = []
for workflow in workflows:
run_count = await db_client.get_workflow_run_count(workflow.id)
workflow_responses.append(
{
"id": workflow.id,
"name": workflow.name,
"status": workflow.status,
"created_at": workflow.created_at,
"workflow_definition": workflow.workflow_definition_with_fallback,
"current_definition_id": workflow.current_definition_id,
"template_context_variables": workflow.template_context_variables,
"call_disposition_codes": workflow.call_disposition_codes,
"workflow_configurations": workflow.workflow_configurations,
"total_runs": run_count,
}
)
# Get run counts for all workflows in a single query
workflow_ids = [workflow.id for workflow in workflows]
run_counts = await db_client.get_workflow_run_counts(workflow_ids)
return workflow_responses
return [
WorkflowListResponse(
id=workflow.id,
name=workflow.name,
status=workflow.status,
created_at=workflow.created_at,
total_runs=run_counts.get(workflow.id, 0),
)
for workflow in workflows
]
@router.get("/fetch/{workflow_id}")

View file

@ -170,13 +170,6 @@ class CampaignCallDispatcher:
)
raise ValueError(f"Workflow {campaign.workflow_id} not found")
# Merge context variables (queued_run context already includes retry info if applicable)
initial_context = {
**workflow.template_context_variables,
**queued_run.context_variables,
"campaign_id": campaign.id,
}
# Extract phone number
phone_number = queued_run.context_variables.get("phone_number")
if not phone_number:
@ -186,13 +179,25 @@ class CampaignCallDispatcher:
)
raise ValueError(f"No phone number in queued run {queued_run.id}")
# Create workflow run with queued_run_id tracking
workflow_run_name = f"WR-CAMPAIGN-{campaign.id}-{queued_run.id}"
# Get provider first to determine the mode
provider = await self.get_telephony_provider(campaign.organization_id)
workflow_run_mode = provider.PROVIDER_NAME
logger.info(f"Provider name: {provider.PROVIDER_NAME}")
logger.info(f"Queued run context: {queued_run.context_variables}")
# Merge context variables (queued_run context already includes retry info if applicable)
initial_context = {
**workflow.template_context_variables,
**queued_run.context_variables,
"campaign_id": campaign.id,
"provider": provider.PROVIDER_NAME,
}
logger.info(f"Final initial_context: {initial_context}")
# Create workflow run with queued_run_id tracking
workflow_run_name = f"WR-CAMPAIGN-{campaign.id}-{queued_run.id}"
try:
workflow_run = await db_client.create_workflow_run(
name=workflow_run_name,
@ -243,6 +248,8 @@ class CampaignCallDispatcher:
to_number=phone_number,
webhook_url=webhook_url,
workflow_run_id=workflow_run.id,
workflow_id=campaign.workflow_id,
user_id=campaign.created_by,
)
# Store provider type and metadata in gathered_context

View file

@ -300,7 +300,7 @@ TTSConfig = Annotated[
###################################################### STT ########################################################################
DEEPGRAM_STT_MODELS = ["nova-2", "nova-3-general"]
DEEPGRAM_STT_MODELS = ["nova-2", "nova-3-general", "flux-general-en"]
DEEPGRAM_LANGUAGES = [
"multi",
"en",

View file

@ -103,7 +103,6 @@ class LoopTalkPipelineBuilder:
# Set the context and audio_buffer after creation
engine.set_context(context)
engine.set_audio_buffer(audio_buffer)
context_aggregator = LLMContextAggregatorPair(context)

View file

@ -12,9 +12,8 @@ from pipecat.frames.frames import (
Frame,
InputAudioRawFrame,
OutputAudioRawFrame,
StartFrame,
)
from pipecat.serializers.base_serializer import FrameSerializer, FrameSerializerType
from pipecat.serializers.base_serializer import FrameSerializer
class InternalFrameSerializer(FrameSerializer):
@ -24,15 +23,6 @@ class InternalFrameSerializer(FrameSerializer):
preventing control frames from creating infinite loops.
"""
@property
def type(self) -> FrameSerializerType:
"""Internal transport uses binary frames."""
return FrameSerializerType.BINARY
async def setup(self, frame: StartFrame):
"""No setup required for internal transport."""
pass
async def serialize(self, frame: Frame) -> bytes | None:
"""Only serialize audio frames for transmission between agents."""
# Only pass audio frames between agents

View file

@ -22,16 +22,21 @@ from pipecat.pipeline.task import PipelineTask
from pipecat.processors.audio.audio_buffer_processor import AudioBufferProcessor
def register_transport_event_handlers(
def register_event_handlers(
task: PipelineTask,
transport,
workflow_run_id,
workflow_run_id: int,
engine: PipecatEngine,
audio_buffer: AudioBufferProcessor,
in_memory_logs_buffer: InMemoryLogsBuffer,
pipeline_metrics_aggregator: PipelineMetricsAggregator,
audio_config=AudioConfig,
):
"""Register event handlers for transport events"""
"""Register all event handlers for transport and task events.
Returns:
Tuple of (in_memory_audio_buffer, in_memory_transcript_buffer) for use by other handlers.
"""
# Initialize in-memory buffers with proper audio configuration
sample_rate = audio_config.pipeline_sample_rate if audio_config else 16000
num_channels = 1 # Pipeline audio is always mono
@ -48,13 +53,35 @@ def register_transport_event_handlers(
)
in_memory_transcript_buffer = InMemoryTranscriptBuffer(workflow_run_id)
# Track both events to ensure LLM is only triggered after both occur
ready_state = {
"pipeline_started": False,
"client_connected": False,
"llm_triggered": False,
}
async def maybe_trigger_llm():
"""Trigger LLM only after both pipeline_started and client_connected events."""
if (
ready_state["pipeline_started"]
and ready_state["client_connected"]
and not ready_state["llm_triggered"]
):
ready_state["llm_triggered"] = True
logger.debug(
"Both pipeline_started and client_connected received - triggering initial LLM generation"
)
await engine.llm.queue_frame(LLMContextFrame(engine.context))
@transport.event_handler("on_client_connected")
async def on_client_connected(transport, participant):
logger.debug("In on_client_connected callback handler - initializing workflow")
async def on_client_connected(_transport, _participant):
logger.debug("In on_client_connected callback handler")
await audio_buffer.start_recording()
ready_state["client_connected"] = True
await maybe_trigger_llm()
@transport.event_handler("on_client_disconnected")
async def on_client_disconnected(transport, participant):
async def on_client_disconnected(_transport, _participant):
call_disposed = engine.is_call_disposed()
logger.debug(
@ -69,33 +96,16 @@ def register_transport_event_handlers(
if not call_disposed:
await task.cancel()
# Return the buffers so they can be passed to other handlers
return in_memory_audio_buffer, in_memory_transcript_buffer
def register_task_event_handler(
workflow_run_id: int,
engine: PipecatEngine,
task: PipelineTask,
transport,
audio_buffer: AudioBufferProcessor,
in_memory_audio_buffer: InMemoryAudioBuffer,
in_memory_transcript_buffer: InMemoryTranscriptBuffer,
in_memory_logs_buffer: InMemoryLogsBuffer,
pipeline_metrics_aggregator: PipelineMetricsAggregator,
):
@task.event_handler("on_pipeline_started")
async def on_pipeline_started(task: PipelineTask, frame: Frame):
logger.debug(
"In on_pipeline_started callback handler - triggering initial LLM generation"
)
# Trigger initial LLM generation after pipeline has started
await engine.llm.queue_frame(LLMContextFrame(engine.context))
async def on_pipeline_started(_task: PipelineTask, _frame: Frame):
logger.debug("In on_pipeline_started callback handler")
ready_state["pipeline_started"] = True
await maybe_trigger_llm()
@task.event_handler("on_pipeline_finished")
async def on_pipeline_finished(
task: PipelineTask,
frame: Frame,
_frame: Frame,
):
logger.debug(f"In on_pipeline_finished callback handler")
@ -207,14 +217,13 @@ def register_task_event_handler(
if workflow_run and workflow_run.campaign_id:
await campaign_call_dispatcher.release_call_slot(workflow_run_id)
# Write buffers to temp files and enqueue S3 upload
# Write buffers to temp files and enqueue combined processing task
audio_temp_path = None
transcript_temp_path = None
try:
# Only upload if buffers have content
if not in_memory_audio_buffer.is_empty:
audio_temp_path = await in_memory_audio_buffer.write_to_temp_file()
await enqueue_job(
FunctionNames.UPLOAD_AUDIO_TO_S3, workflow_run_id, audio_temp_path
)
else:
logger.debug("Audio buffer is empty, skipping upload")
@ -222,11 +231,6 @@ def register_task_event_handler(
transcript_temp_path = (
await in_memory_transcript_buffer.write_to_temp_file()
)
await enqueue_job(
FunctionNames.UPLOAD_TRANSCRIPT_TO_S3,
workflow_run_id,
transcript_temp_path,
)
else:
logger.debug("Transcript buffer is empty, skipping upload")
@ -234,10 +238,18 @@ def register_task_event_handler(
logger.error(f"Error preparing buffers for S3 upload: {e}", exc_info=True)
await enqueue_job(FunctionNames.CALCULATE_WORKFLOW_RUN_COST, workflow_run_id)
# Combined task: uploads artifacts then runs integrations sequentially
await enqueue_job(
FunctionNames.RUN_INTEGRATIONS_POST_WORKFLOW_RUN, workflow_run_id
FunctionNames.PROCESS_WORKFLOW_COMPLETION,
workflow_run_id,
audio_temp_path,
transcript_temp_path,
)
# Return the buffers so they can be passed to other handlers
return in_memory_audio_buffer, in_memory_transcript_buffer
def register_audio_data_handler(
audio_buffer: AudioBufferProcessor,
@ -260,18 +272,26 @@ def register_audio_data_handler(
# Could implement overflow to disk here if needed
def register_transcript_handler(
transcript, workflow_run_id, in_memory_buffer: InMemoryTranscriptBuffer
def register_transcript_handlers(
user_aggregator,
assistant_aggregator,
workflow_run_id,
in_memory_buffer: InMemoryTranscriptBuffer,
):
"""Register event handler for transcript updates"""
"""Register event handlers for transcript updates on context aggregators.
@transcript.event_handler("on_transcript_update")
async def on_transcript_update(processor, frame):
transcript_text = ""
for msg in frame.messages:
timestamp = f"[{msg.timestamp}] " if msg.timestamp else ""
line = f"{timestamp}{msg.role}: {msg.content}\n"
transcript_text += line
Uses the on_user_turn_stopped and on_assistant_turn_stopped events to capture
transcripts as turns complete, following the event-based pattern.
"""
# Use in-memory buffer
await in_memory_buffer.append(transcript_text)
@user_aggregator.event_handler("on_user_turn_stopped")
async def on_user_turn_stopped(aggregator, strategy, message):
timestamp = f"[{message.timestamp}] " if message.timestamp else ""
line = f"{timestamp}user: {message.content}\n"
await in_memory_buffer.append(line)
@assistant_aggregator.event_handler("on_assistant_turn_stopped")
async def on_assistant_turn_stopped(aggregator, message):
timestamp = f"[{message.timestamp}] " if message.timestamp else ""
line = f"{timestamp}assistant: {message.content}\n"
await in_memory_buffer.append(line)

View file

@ -1,5 +1,4 @@
import os
from typing import TYPE_CHECKING
from loguru import logger
@ -11,14 +10,10 @@ from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.task import PipelineParams, PipelineTask
from pipecat.processors.aggregators.llm_context import LLMContext
from pipecat.processors.audio.audio_buffer_processor import AudioBufferProcessor
from pipecat.processors.transcript_processor import TranscriptProcessor
from pipecat.utils.context import turn_var
if TYPE_CHECKING:
from api.services.workflow.pipecat_engine import PipecatEngine
def create_pipeline_components(audio_config: AudioConfig, engine: "PipecatEngine"):
def create_pipeline_components(audio_config: AudioConfig):
"""Create and return the main pipeline components with proper audio configuration"""
logger.info(f"Creating pipeline components with audio config: {audio_config}")
@ -28,28 +23,21 @@ def create_pipeline_components(audio_config: AudioConfig, engine: "PipecatEngine
buffer_size=audio_config.buffer_size_bytes,
)
transcript = TranscriptProcessor(
assistant_correct_aggregation_callback=engine.create_aggregation_correction_callback()
)
context = LLMContext()
return audio_buffer, transcript, context
return audio_buffer, context
def build_pipeline(
transport,
stt,
transcript,
audio_buffer,
llm,
tts,
user_context_aggregator,
assistant_context_aggregator,
pipeline_engine_callback_processor,
stt_mute_filter,
pipeline_metrics_aggregator,
user_idle_disconnect,
voicemail_detector=None,
):
"""Build the main pipeline with all components.
@ -63,7 +51,7 @@ def build_pipeline(
# Build processors list with optional voicemail detection
processors = [
transport.input(), # Transport user input
stt, # STT (audio_passthrough=True by default, passes InputAudioRawFrame)
stt,
]
# Insert voicemail detector after STT if enabled
@ -76,16 +64,12 @@ def build_pipeline(
# Continue with the rest of the pipeline
processors.extend(
[
stt_mute_filter, # STTMuteFilters don't let VAD related events pass through if muted
user_idle_disconnect,
transcript.user(),
user_context_aggregator,
llm, # LLM
pipeline_engine_callback_processor,
tts, # TTS
transport.output(), # Transport bot output
audio_buffer, # AudioBufferProcessor - records both input and output audio
transcript.assistant(),
assistant_context_aggregator, # Assistant spoken responses
pipeline_metrics_aggregator,
]
@ -98,7 +82,6 @@ def create_pipeline_task(pipeline, workflow_run_id, audio_config: AudioConfig =
"""Create a pipeline task with appropriate parameters"""
# Set up pipeline params with audio configuration if provided
pipeline_params = PipelineParams(
allow_interruptions=True,
enable_metrics=True,
enable_usage_metrics=True,
send_initial_empty_metrics=False,
@ -119,6 +102,7 @@ def create_pipeline_task(pipeline, workflow_run_id, audio_config: AudioConfig =
pipeline,
params=pipeline_params,
enable_tracing=ENABLE_TRACING,
enable_rtvi=False,
conversation_id=f"{workflow_run_id}",
)

View file

@ -7,12 +7,12 @@ from loguru import logger
from api.db import db_client
from api.db.models import WorkflowModel
from api.enums import WorkflowRunMode
from api.services.configuration.registry import ServiceProviders
from api.services.pipecat.audio_config import AudioConfig, create_audio_config
from api.services.pipecat.event_handlers import (
register_audio_data_handler,
register_task_event_handler,
register_transcript_handler,
register_transport_event_handlers,
register_event_handlers,
register_transcript_handlers,
)
from api.services.pipecat.in_memory_buffers import InMemoryLogsBuffer
from api.services.pipecat.pipeline_builder import (
@ -46,20 +46,25 @@ from api.services.workflow.pipecat_engine import PipecatEngine
from api.services.workflow.workflow import WorkflowGraph
from pipecat.extensions.voicemail.voicemail_detector import VoicemailDetector
from pipecat.pipeline.base_task import PipelineTaskParams
from pipecat.processors.aggregators.llm_response import (
from pipecat.processors.aggregators.llm_response_universal import (
LLMAssistantAggregatorParams,
LLMContextAggregatorPair,
LLMUserAggregatorParams,
)
from pipecat.processors.aggregators.llm_response_universal import (
LLMContextAggregatorPair,
)
from pipecat.processors.filters.stt_mute_filter import (
STTMuteConfig,
STTMuteFilter,
STTMuteStrategy,
)
from pipecat.processors.user_idle_processor import UserIdleProcessor
from pipecat.transports.smallwebrtc.connection import SmallWebRTCConnection
from pipecat.turns.user_mute import MuteUntilFirstBotCompleteUserMuteStrategy
from pipecat.turns.user_start import (
ExternalUserTurnStartStrategy,
TranscriptionUserTurnStartStrategy,
)
from pipecat.turns.user_start.vad_user_turn_start_strategy import (
VADUserTurnStartStrategy,
)
from pipecat.turns.user_stop import (
ExternalUserTurnStopStrategy,
TranscriptionUserTurnStopStrategy,
)
from pipecat.turns.user_turn_strategies import UserTurnStrategies
from pipecat.utils.context import set_current_run_id
from pipecat.utils.enums import EndTaskReason
from pipecat.utils.tracing.context_registry import ContextProviderRegistry
@ -517,12 +522,11 @@ async def _run_pipeline(
embeddings_model=embeddings_model,
)
# Create pipeline components with audio configuration and engine
audio_buffer, transcript, context = create_pipeline_components(audio_config, engine)
# Create pipeline components with audio configuration
audio_buffer, context = create_pipeline_components(audio_config)
# Set the context and audio_buffer after creation
engine.set_context(context)
engine.set_audio_buffer(audio_buffer)
# Set Stasis connection for immediate transfers (if available)
if stasis_connection:
@ -532,7 +536,31 @@ async def _run_pipeline(
expect_stripped_words=True,
correct_aggregation_callback=engine.create_aggregation_correction_callback(),
)
user_params = LLMUserAggregatorParams(enable_emulated_vad_interruptions=True)
# Configure turn strategies based on STT provider and model
# Deepgram Flux uses external turn detection (VAD + External start/stop)
# Other models use transcription-based turn detection with smart turn analyzer
is_deepgram_flux = (
user_config.stt.provider == ServiceProviders.DEEPGRAM.value
and user_config.stt.model == "flux-general-en"
)
if is_deepgram_flux:
user_turn_strategies = UserTurnStrategies(
start=[VADUserTurnStartStrategy(), ExternalUserTurnStartStrategy()],
stop=[ExternalUserTurnStopStrategy()],
)
else:
user_turn_strategies = UserTurnStrategies(
start=[VADUserTurnStartStrategy(), TranscriptionUserTurnStartStrategy()],
stop=[TranscriptionUserTurnStopStrategy()],
)
user_params = LLMUserAggregatorParams(
user_turn_strategies=user_turn_strategies,
user_mute_strategies=[MuteUntilFirstBotCompleteUserMuteStrategy()],
user_idle_timeout=max_user_idle_timeout,
)
context_aggregator = LLMContextAggregatorPair(
context, assistant_params=assistant_params, user_params=user_params
)
@ -547,25 +575,20 @@ async def _run_pipeline(
pipeline_metrics_aggregator = PipelineMetricsAggregator()
# Create STT mute filter using the selected strategies and the engine's callback
stt_mute_filter = STTMuteFilter(
config=STTMuteConfig(
strategies={
STTMuteStrategy.MUTE_UNTIL_FIRST_BOT_COMPLETE,
STTMuteStrategy.CUSTOM,
},
should_mute_callback=engine.create_should_mute_callback(),
)
)
# Use engine's user idle callback with configured timeout
user_idle_disconnect = UserIdleProcessor(
callback=engine.create_user_idle_callback(), timeout=max_user_idle_timeout
)
user_context_aggregator = context_aggregator.user()
assistant_context_aggregator = context_aggregator.assistant()
# Register user idle event handlers
user_idle_handler = engine.create_user_idle_handler()
@user_context_aggregator.event_handler("on_user_turn_idle")
async def on_user_turn_idle(aggregator):
await user_idle_handler.handle_idle(aggregator)
@user_context_aggregator.event_handler("on_user_turn_started")
async def on_user_turn_started(aggregator, strategy):
user_idle_handler.reset()
# Create voicemail detector if enabled in the workflow's start node
voicemail_detector = None
start_node = workflow_graph.nodes.get(workflow_graph.start_node_id)
@ -592,16 +615,13 @@ async def _run_pipeline(
pipeline = build_pipeline(
transport,
stt,
transcript,
audio_buffer,
llm,
tts,
user_context_aggregator,
assistant_context_aggregator,
pipeline_engine_callback_processor,
stt_mute_filter,
pipeline_metrics_aggregator,
user_idle_disconnect,
voicemail_detector=voicemail_detector,
)
@ -614,18 +634,6 @@ async def _run_pipeline(
# Initialize the engine to set the initial context
await engine.initialize()
# Register event handlers
in_memory_audio_buffer, in_memory_transcript_buffer = (
register_transport_event_handlers(
task,
transport,
workflow_run_id,
engine=engine,
audio_buffer=audio_buffer,
audio_config=audio_config,
)
)
# Add real-time feedback observer if WebSocket sender is available
# Note: ws_sender was already fetched earlier for node_transition_callback
if ws_sender:
@ -635,21 +643,24 @@ async def _run_pipeline(
)
task.add_observer(feedback_observer)
register_task_event_handler(
workflow_run_id,
engine,
# Register event handlers
in_memory_audio_buffer, in_memory_transcript_buffer = register_event_handlers(
task,
transport,
audio_buffer,
in_memory_audio_buffer,
in_memory_transcript_buffer,
in_memory_logs_buffer,
pipeline_metrics_aggregator,
workflow_run_id,
engine=engine,
audio_buffer=audio_buffer,
in_memory_logs_buffer=in_memory_logs_buffer,
pipeline_metrics_aggregator=pipeline_metrics_aggregator,
audio_config=audio_config,
)
register_audio_data_handler(audio_buffer, workflow_run_id, in_memory_audio_buffer)
register_transcript_handler(
transcript, workflow_run_id, in_memory_transcript_buffer
register_transcript_handlers(
user_context_aggregator,
assistant_context_aggregator,
workflow_run_id,
in_memory_transcript_buffer,
)
try:

View file

@ -7,6 +7,7 @@ from api.constants import MPS_API_URL
from api.services.configuration.registry import ServiceProviders
from pipecat.services.azure.llm import AzureLLMService
from pipecat.services.cartesia.stt import CartesiaSTTService
from pipecat.services.deepgram.flux.stt import DeepgramFluxSTTService
from pipecat.services.deepgram.stt import DeepgramSTTService, LiveOptions
from pipecat.services.deepgram.tts import DeepgramTTSService
from pipecat.services.dograh.llm import DograhLLMService
@ -34,6 +35,20 @@ def create_stt_service(user_config):
f"Creating STT service: provider={user_config.stt.provider}, model={user_config.stt.model}"
)
if user_config.stt.provider == ServiceProviders.DEEPGRAM.value:
# Check if using Flux model (English-only, no language selection)
if user_config.stt.model == "flux-general-en":
logger.debug("Using DeepGram Flux Model")
return DeepgramFluxSTTService(
api_key=user_config.stt.api_key,
model=user_config.stt.model,
params=DeepgramFluxSTTService.InputParams(
eot_timeout_ms=3000,
eot_threshold=0.7,
),
should_interrupt=False, # Let UserAggregator take care of sending InterruptionFrame
)
# Other models than flux
# Use language from user config, defaulting to "multi" for multilingual support
language = getattr(user_config.stt, "language", None) or "multi"
live_options = LiveOptions(
@ -44,7 +59,9 @@ def create_stt_service(user_config):
)
logger.debug(f"Using DeepGram Model - {user_config.stt.model}")
return DeepgramSTTService(
live_options=live_options, api_key=user_config.stt.api_key
live_options=live_options,
api_key=user_config.stt.api_key,
should_interrupt=False, # Let UserAggregator take care of sending InterruptionFrame
)
elif user_config.stt.provider == ServiceProviders.OPENAI.value:
return OpenAISTTService(

View file

@ -2,10 +2,9 @@ import os
from fastapi import WebSocket
from api.constants import APP_ROOT_DIR, ENABLE_RNNOISE, ENABLE_SMART_TURN
from api.constants import APP_ROOT_DIR
from api.db import db_client
from api.enums import OrganizationConfigurationKey
from api.services.looptalk.internal_transport import InternalTransport
from api.services.pipecat.audio_config import AudioConfig
from api.services.telephony.stasis_rtp_connection import StasisRTPConnection
from api.services.telephony.stasis_rtp_serializer import StasisRTPFrameSerializer
@ -13,11 +12,8 @@ from api.services.telephony.stasis_rtp_transport import (
StasisRTPTransport,
StasisRTPTransportParams,
)
from pipecat.audio.filters.rnnoise_filter import RNNoiseFilter
from pipecat.audio.mixers.silence_mixer import SilenceAudioMixer
from pipecat.audio.mixers.soundfile_mixer import SoundfileMixer
from pipecat.audio.turn.smart_turn.base_smart_turn import SmartTurnParams
from pipecat.audio.turn.smart_turn.local_smart_turn_v3 import LocalSmartTurnAnalyzerV3
from pipecat.audio.vad.silero import SileroVADAnalyzer, VADParams
from pipecat.serializers.twilio import TwilioFrameSerializer
from pipecat.serializers.vobiz import VobizFrameSerializer
@ -35,19 +31,6 @@ librnnoise_path = os.path.normpath(
)
def create_turn_analyzer(workflow_run_id: int, audio_config: AudioConfig):
"""Create a turn analyzer backed by the local Smart Turn HTTP service.
Args:
workflow_run_id: ID of the workflow run for turn analyzer context
audio_config: Audio configuration containing pipeline sample rate
"""
if ENABLE_SMART_TURN:
return LocalSmartTurnAnalyzerV3(params=SmartTurnParams())
return None
async def create_twilio_transport(
websocket_client: WebSocket,
stream_sid: str,
@ -78,8 +61,6 @@ async def create_twilio_transport(
f"Incomplete Twilio configuration for organization {organization_id}"
)
turn_analyzer = create_turn_analyzer(workflow_run_id, audio_config)
serializer = TwilioFrameSerializer(
stream_sid=stream_sid,
call_sid=call_sid,
@ -119,11 +100,7 @@ async def create_twilio_transport(
if ambient_noise_config and ambient_noise_config.get("enabled", False)
else SilenceAudioMixer()
),
turn_analyzer=turn_analyzer,
serializer=serializer,
audio_in_filter=RNNoiseFilter(library_path=librnnoise_path)
if ENABLE_RNNOISE
else None,
),
)
@ -158,8 +135,6 @@ async def create_cloudonix_transport(
f"Required: bearer_token, domain_id"
)
turn_analyzer = create_turn_analyzer(workflow_run_id, audio_config)
from pipecat.serializers.cloudonix import CloudonixFrameSerializer
serializer = CloudonixFrameSerializer(
@ -202,11 +177,7 @@ async def create_cloudonix_transport(
if ambient_noise_config and ambient_noise_config.get("enabled", False)
else SilenceAudioMixer()
),
turn_analyzer=turn_analyzer,
serializer=serializer,
audio_in_filter=RNNoiseFilter(library_path=librnnoise_path)
if ENABLE_RNNOISE
else None,
),
)
@ -238,8 +209,6 @@ async def create_vonage_transport(
f"Incomplete Vonage configuration for organization {organization_id}"
)
turn_analyzer = create_turn_analyzer(workflow_run_id, audio_config)
serializer = VonageFrameSerializer(
call_uuid=call_uuid,
application_id=application_id,
@ -283,11 +252,7 @@ async def create_vonage_transport(
if ambient_noise_config and ambient_noise_config.get("enabled", False)
else SilenceAudioMixer()
),
turn_analyzer=turn_analyzer,
serializer=serializer,
audio_in_filter=RNNoiseFilter(library_path=librnnoise_path)
if ENABLE_RNNOISE
else None,
),
)
@ -337,8 +302,6 @@ async def create_vobiz_transport(
f"from_numbers={len(config.get('from_numbers', []))} numbers"
)
turn_analyzer = create_turn_analyzer(workflow_run_id, audio_config)
# Use VobizFrameSerializer for Vobiz WebSocket protocol
serializer = VobizFrameSerializer(
stream_id=stream_id,
@ -389,11 +352,7 @@ async def create_vobiz_transport(
if ambient_noise_config and ambient_noise_config.get("enabled", False)
else SilenceAudioMixer()
),
turn_analyzer=turn_analyzer,
serializer=serializer,
audio_in_filter=RNNoiseFilter(library_path=librnnoise_path)
if ENABLE_RNNOISE
else None,
),
)
@ -411,7 +370,6 @@ def create_webrtc_transport(
ambient_noise_config: dict | None = None,
):
"""Create a transport for WebRTC connections"""
turn_analyzer = create_turn_analyzer(workflow_run_id, audio_config)
return SmallWebRTCTransport(
webrtc_connection=webrtc_connection,
@ -445,10 +403,6 @@ def create_webrtc_transport(
if ambient_noise_config and ambient_noise_config.get("enabled", False)
else SilenceAudioMixer()
),
turn_analyzer=turn_analyzer,
audio_in_filter=RNNoiseFilter(library_path=librnnoise_path)
if ENABLE_RNNOISE
else None,
),
)
@ -461,7 +415,6 @@ def create_stasis_transport(
ambient_noise_config: dict | None = None,
):
"""Create a transport for ARI connections"""
turn_analyzer = create_turn_analyzer(workflow_run_id, audio_config)
serializer = StasisRTPFrameSerializer(
StasisRTPFrameSerializer.InputParams(
@ -502,11 +455,7 @@ def create_stasis_transport(
if ambient_noise_config and ambient_noise_config.get("enabled", False)
else SilenceAudioMixer()
),
turn_analyzer=turn_analyzer,
serializer=serializer,
audio_in_filter=RNNoiseFilter(library_path=librnnoise_path)
if ENABLE_RNNOISE
else None,
),
)
@ -528,46 +477,44 @@ def create_internal_transport(
Returns:
InternalTransport instance configured with turn analyzer
"""
turn_analyzer = create_turn_analyzer(workflow_run_id, audio_config)
pass
# Commented out because looptalk coming in the regular import flow
# was causing issue. May be move this to looptalk/orchestrator.py
# Create and return the internal transport with latency
return InternalTransport(
params=TransportParams(
audio_out_enabled=True,
audio_out_sample_rate=audio_config.transport_out_sample_rate,
audio_out_channels=1,
audio_in_enabled=True,
audio_in_sample_rate=audio_config.transport_in_sample_rate,
audio_in_channels=1,
vad_analyzer=(
SileroVADAnalyzer(
params=VADParams(
confidence=vad_config.get("confidence", 0.7),
start_secs=vad_config.get("start_seconds", 0.4),
stop_secs=vad_config.get("stop_seconds", 0.8),
min_volume=vad_config.get("minimum_volume", 0.6),
)
)
if vad_config
else SileroVADAnalyzer()
),
audio_out_mixer=(
SoundfileMixer(
sound_files={
"office": APP_ROOT_DIR
/ "assets"
/ f"office-ambience-{audio_config.transport_out_sample_rate}-mono.wav"
},
default_sound="office",
volume=ambient_noise_config.get("volume", 0.3),
)
if ambient_noise_config and ambient_noise_config.get("enabled", False)
else SilenceAudioMixer()
),
turn_analyzer=turn_analyzer,
audio_in_filter=RNNoiseFilter(library_path=librnnoise_path)
if ENABLE_RNNOISE
else None,
),
latency_seconds=latency_seconds,
)
# return InternalTransport(
# params=TransportParams(
# audio_out_enabled=True,
# audio_out_sample_rate=audio_config.transport_out_sample_rate,
# audio_out_channels=1,
# audio_in_enabled=True,
# audio_in_sample_rate=audio_config.transport_in_sample_rate,
# audio_in_channels=1,
# vad_analyzer=(
# SileroVADAnalyzer(
# params=VADParams(
# confidence=vad_config.get("confidence", 0.7),
# start_secs=vad_config.get("start_seconds", 0.4),
# stop_secs=vad_config.get("stop_seconds", 0.8),
# min_volume=vad_config.get("minimum_volume", 0.6),
# )
# )
# if vad_config
# else SileroVADAnalyzer()
# ),
# audio_out_mixer=(
# SoundfileMixer(
# sound_files={
# "office": APP_ROOT_DIR
# / "assets"
# / f"office-ambience-{audio_config.transport_out_sample_rate}-mono.wav"
# },
# default_sound="office",
# volume=ambient_noise_config.get("volume", 0.3),
# )
# if ambient_noise_config and ambient_noise_config.get("enabled", False)
# else SilenceAudioMixer()
# ),
# ),
# latency_seconds=latency_seconds,
# )

View file

@ -15,6 +15,8 @@ The serializer:
from typing import Optional
from loguru import logger
from pydantic import BaseModel
from pipecat.audio.utils import create_default_resampler, pcm_to_ulaw, ulaw_to_pcm
from pipecat.frames.frames import (
AudioRawFrame,
@ -22,8 +24,7 @@ from pipecat.frames.frames import (
InputAudioRawFrame,
StartFrame,
)
from pipecat.serializers.base_serializer import FrameSerializer, FrameSerializerType
from pydantic import BaseModel
from pipecat.serializers.base_serializer import FrameSerializer
class StasisRTPFrameSerializer(FrameSerializer):
@ -59,11 +60,6 @@ class StasisRTPFrameSerializer(FrameSerializer):
# Resampler shared between encode / decode paths
self._resampler = create_default_resampler()
@property
def type(self) -> FrameSerializerType:
"""Stasis uses raw bytes → BINARY."""
return FrameSerializerType.BINARY
async def setup(self, frame: StartFrame):
"""Remember pipeline configuration."""
self._sample_rate = self._params.sample_rate or frame.audio_in_sample_rate

View file

@ -19,7 +19,6 @@ from pipecat.utils.enums import EndTaskReason
if TYPE_CHECKING:
from api.services.telephony.stasis_rtp_connection import StasisRTPConnection
from pipecat.processors.audio.audio_buffer_processor import AudioBuffer
from pipecat.services.anthropic.llm import AnthropicLLMService
from pipecat.services.google.llm import GoogleLLMService
from pipecat.services.openai.llm import OpenAILLMService
@ -64,7 +63,6 @@ class PipecatEngine:
transport: Optional[BaseTransport] = None,
workflow: WorkflowGraph,
call_context_vars: dict,
audio_buffer: Optional["AudioBuffer"] = None,
workflow_run_id: Optional[int] = None,
node_transition_callback: Optional[
Callable[[str, Optional[str]], Awaitable[None]]
@ -78,7 +76,6 @@ class PipecatEngine:
self.transport = transport
self.workflow = workflow
self._call_context_vars = call_context_vars
self._audio_buffer = audio_buffer
self._workflow_run_id = workflow_run_id
self._node_transition_callback = node_transition_callback
self._initialized = False
@ -204,6 +201,7 @@ class PipecatEngine:
logger.info(f"Arguments: {function_call_params.arguments}")
await self.set_node(transition_to_node)
try:
async def on_context_updated() -> None:
"""
pipecat framework will run this function after the function call result has been updated in the context.
@ -215,6 +213,12 @@ class PipecatEngine:
self._current_node
)
# Queue EndFrame if we just transitioned to EndNode
if self._current_node.is_end:
await self.send_end_task_frame(
EndTaskReason.USER_QUALIFIED.value
)
result = {"status": "done"}
properties = FunctionCallResultProperties(
@ -478,8 +482,6 @@ class PipecatEngine:
if node.extraction_enabled and node.extraction_variables:
await self._perform_variable_extraction_if_needed(node)
await self.send_end_task_frame(EndTaskReason.USER_QUALIFIED.value)
async def _handle_agent_node(self, node: Node) -> None:
"""Handle agent node execution."""
if node.is_static:
@ -680,12 +682,12 @@ class PipecatEngine:
"""
return engine_callbacks.create_should_mute_callback(self)
def create_user_idle_callback(self):
def create_user_idle_handler(self):
"""
This callback is called when the user is idle for a certain duration.
We use this to either play the static text or end the call
Returns a UserIdleHandler that manages user-idle timeouts with state.
The handler tracks retry count and handles escalating prompts.
"""
return engine_callbacks.create_user_idle_callback(self)
return engine_callbacks.create_user_idle_handler(self)
def create_max_duration_callback(self):
"""
@ -721,14 +723,6 @@ class PipecatEngine:
"""
self.task = task
def set_audio_buffer(self, audio_buffer: "AudioBuffer") -> None:
"""Set the audio buffer.
This allows setting the audio buffer after the engine has been created,
which is useful when the audio buffer needs to be created after the engine.
"""
self._audio_buffer = audio_buffer
def set_stasis_connection(
self, connection: Optional["StasisRTPConnection"]
) -> None:

View file

@ -23,7 +23,6 @@ from pipecat.utils.enums import EndTaskReason
if TYPE_CHECKING:
from api.services.workflow.pipecat_engine import PipecatEngine
from pipecat.processors.user_idle_processor import UserIdleProcessor
# ---------------------------------------------------------------------------
@ -57,33 +56,43 @@ def create_should_mute_callback(
# ---------------------------------------------------------------------------
def create_user_idle_callback(engine: "PipecatEngine"):
"""Return a callback that handles user-idle timeouts."""
class UserIdleHandler:
"""Helper class to manage user idle retry logic with state."""
async def handle_user_idle(
user_idle: "UserIdleProcessor", retry_count: int
) -> bool:
logger.debug(f"Handling user_idle, attempt: {retry_count}")
def __init__(self, engine: "PipecatEngine"):
self._engine = engine
self._retry_count = 0
if retry_count == 1:
def reset(self):
"""Reset the retry count when user becomes active."""
self._retry_count = 0
async def handle_idle(self, aggregator):
"""Handle user idle event with escalating prompts."""
self._retry_count += 1
logger.debug(f"Handling user_idle, attempt: {self._retry_count}")
if self._retry_count == 1:
message = {
"role": "system",
"content": "The user has been quiet. Politely and briefly ask if they're still there in the language that the user has been speaking so far.",
}
await user_idle.push_frame(LLMMessagesAppendFrame([message], run_llm=True))
return True
await aggregator.push_frame(LLMMessagesAppendFrame([message], run_llm=True))
return
message = {
"role": "system",
"content": "The user has been quiet. We will be disconnecting the call now. Wish them a good day in the language that the user has been speaking so far.",
}
await user_idle.push_frame(LLMMessagesAppendFrame([message], run_llm=True))
await engine.send_end_task_frame(
await aggregator.push_frame(LLMMessagesAppendFrame([message], run_llm=True))
await self._engine.send_end_task_frame(
EndTaskReason.USER_IDLE_MAX_DURATION_EXCEEDED.value
)
return False
return handle_user_idle
def create_user_idle_handler(engine: "PipecatEngine") -> UserIdleHandler:
"""Return a UserIdleHandler that manages user-idle timeouts with state."""
return UserIdleHandler(engine)
# ---------------------------------------------------------------------------

View file

@ -49,8 +49,7 @@ from api.tasks.campaign_tasks import (
from api.tasks.knowledge_base_processing import process_knowledge_base_document
from api.tasks.run_integrations import run_integrations_post_workflow_run
from api.tasks.s3_upload import (
upload_audio_to_s3,
upload_transcript_to_s3,
process_workflow_completion,
upload_voicemail_audio_to_s3,
)
@ -59,9 +58,8 @@ class WorkerSettings:
functions = [
calculate_workflow_run_cost,
run_integrations_post_workflow_run,
upload_audio_to_s3,
upload_transcript_to_s3,
upload_voicemail_audio_to_s3,
process_workflow_completion,
sync_campaign_source,
process_campaign_batch,
monitor_campaign_progress,

View file

@ -1,8 +1,7 @@
class FunctionNames:
CALCULATE_WORKFLOW_RUN_COST = "calculate_workflow_run_cost"
RUN_INTEGRATIONS_POST_WORKFLOW_RUN = "run_integrations_post_workflow_run"
UPLOAD_AUDIO_TO_S3 = "upload_audio_to_s3"
UPLOAD_TRANSCRIPT_TO_S3 = "upload_transcript_to_s3"
PROCESS_WORKFLOW_COMPLETION = "process_workflow_completion"
UPLOAD_VOICEMAIL_AUDIO_TO_S3 = "upload_voicemail_audio_to_s3"
SYNC_CAMPAIGN_SOURCE = "sync_campaign_source"
PROCESS_CAMPAIGN_BATCH = "process_campaign_batch"

View file

@ -1,10 +1,11 @@
"""Execute webhook integrations after workflow run completion."""
from typing import Any, Dict
from typing import Any, Dict, Optional
import httpx
from loguru import logger
from api.constants import BACKEND_API_ENDPOINT
from api.db import db_client
from api.db.models import WorkflowRunModel
from api.utils.credential_auth import build_auth_header
@ -54,10 +55,13 @@ async def run_integrations_post_workflow_run(_ctx, workflow_run_id: int):
logger.info(f"Found {len(webhook_nodes)} webhook nodes to execute")
# Step 4: Build render context
render_context = _build_render_context(workflow_run)
# Step 4: Generate public access token (on-demand, only when webhooks exist)
public_token = await db_client.ensure_public_access_token(workflow_run_id)
# Step 5: Execute each webhook node
# Step 5: Build render context
render_context = _build_render_context(workflow_run, public_token)
# Step 6: Execute each webhook node
for node in webhook_nodes:
webhook_data = node.get("data", {})
try:
@ -77,9 +81,19 @@ async def run_integrations_post_workflow_run(_ctx, workflow_run_id: int):
raise
def _build_render_context(workflow_run: WorkflowRunModel) -> Dict[str, Any]:
"""Build the context dict for template rendering."""
return {
def _build_render_context(
workflow_run: WorkflowRunModel, public_token: Optional[str] = None
) -> Dict[str, Any]:
"""Build the context dict for template rendering.
Args:
workflow_run: The workflow run model
public_token: Optional public access token for download URLs
Returns:
Dict containing all fields available for template rendering
"""
context = {
# Top-level fields
"workflow_run_id": workflow_run.id,
"workflow_run_name": workflow_run.name,
@ -89,10 +103,25 @@ def _build_render_context(workflow_run: WorkflowRunModel) -> Dict[str, Any]:
"initial_context": workflow_run.initial_context or {},
"gathered_context": workflow_run.gathered_context or {},
"cost_info": workflow_run.usage_info or {},
"recording_url": getattr(workflow_run, "recording_url", None),
"transcript_url": getattr(workflow_run, "transcript_url", None),
}
# Add public download URLs if token is available
if public_token:
base_url = (
f"{BACKEND_API_ENDPOINT}/api/v1/public/download/workflow/{public_token}"
)
context["recording_url"] = (
f"{base_url}/recording" if workflow_run.recording_url else None
)
context["transcript_url"] = (
f"{base_url}/transcript" if workflow_run.transcript_url else None
)
else:
context["recording_url"] = workflow_run.recording_url
context["transcript_url"] = workflow_run.transcript_url
return context
async def _execute_webhook_node(
webhook_data: Dict[str, Any],

View file

@ -1,129 +1,27 @@
import os
from typing import Optional
from loguru import logger
from pipecat.utils.context import set_current_run_id
from api.db import db_client
from api.services.storage import get_current_storage_backend, storage_fs
async def upload_audio_to_s3(ctx, workflow_run_id: int, temp_file_path: str):
"""Upload audio file from temp path to S3."""
run_id = str(workflow_run_id)
set_current_run_id(run_id)
logger.info(f"Starting audio upload to S3 from {temp_file_path}")
try:
# Verify temp file exists
if not os.path.exists(temp_file_path):
logger.error(f"Temp audio file not found: {temp_file_path}")
raise FileNotFoundError(f"Temp audio file not found: {temp_file_path}")
file_size = os.path.getsize(temp_file_path)
logger.debug(f"Audio file size: {file_size} bytes")
recording_url = f"recordings/{workflow_run_id}.wav"
storage_backend = get_current_storage_backend()
logger.info(
f"UPLOAD: Using {storage_backend.name} (value: {storage_backend.value}) for audio upload - workflow_run_id: {workflow_run_id}"
)
await storage_fs.aupload_file(temp_file_path, recording_url)
# Update DB with recording URL and storage backend
await db_client.update_workflow_run(
run_id=workflow_run_id,
recording_url=recording_url,
storage_backend=storage_backend.value,
)
logger.info(
f"Successfully uploaded audio to {storage_backend.name}: {recording_url} (stored backend: {storage_backend.name})"
)
except Exception as e:
logger.error(f"Error uploading audio to S3 for workflow {workflow_run_id}: {e}")
raise
finally:
# Clean up temp file
if os.path.exists(temp_file_path):
try:
os.remove(temp_file_path)
logger.debug(f"Cleaned up temp audio file: {temp_file_path}")
except Exception as e:
logger.warning(
f"Failed to clean up temp audio file {temp_file_path}: {e}"
)
async def upload_transcript_to_s3(ctx, workflow_run_id: int, temp_file_path: str):
"""Upload transcript file from temp path to S3."""
run_id = str(workflow_run_id)
set_current_run_id(run_id)
logger.info(f"Starting transcript upload to S3 from {temp_file_path}")
try:
# Verify temp file exists
if not os.path.exists(temp_file_path):
logger.error(f"Temp transcript file not found: {temp_file_path}")
raise FileNotFoundError(f"Temp transcript file not found: {temp_file_path}")
file_size = os.path.getsize(temp_file_path)
logger.debug(f"Transcript file size: {file_size} bytes")
transcript_url = f"transcripts/{workflow_run_id}.txt"
storage_backend = get_current_storage_backend()
logger.info(
f"UPLOAD: Using {storage_backend.name} (value: {storage_backend.value}) for transcript upload - workflow_run_id: {workflow_run_id}"
)
await storage_fs.aupload_file(temp_file_path, transcript_url)
# Update DB with transcript URL and storage backend
await db_client.update_workflow_run(
run_id=workflow_run_id,
transcript_url=transcript_url,
storage_backend=storage_backend.value,
)
logger.info(
f"Successfully uploaded transcript to {storage_backend.name}: {transcript_url} (stored backend: {storage_backend.name})"
)
except Exception as e:
logger.error(
f"Error uploading transcript to S3 for workflow {workflow_run_id}: {e}"
)
raise
finally:
# Clean up temp file
if os.path.exists(temp_file_path):
try:
os.remove(temp_file_path)
logger.debug(f"Cleaned up temp transcript file: {temp_file_path}")
except Exception as e:
logger.warning(
f"Failed to clean up temp transcript file {temp_file_path}: {e}"
)
from api.tasks.run_integrations import run_integrations_post_workflow_run
from pipecat.utils.context import set_current_run_id
async def upload_voicemail_audio_to_s3(
ctx,
_ctx,
workflow_run_id: int,
temp_file_path: str,
s3_key: str,
):
"""Upload voicemail detection audio from temp file to S3.
This function is similar to upload_audio_to_s3 but handles voicemail-specific
paths and doesn't update the workflow run's recording_url field.
Handles voicemail-specific paths and doesn't update the workflow run's
recording_url field.
Args:
ctx: ARQ context
_ctx: ARQ context (unused)
workflow_run_id: The workflow run ID
temp_file_path: Path to the temporary WAV file
s3_key: The S3 key where the file should be uploaded
@ -161,7 +59,7 @@ async def upload_voicemail_audio_to_s3(
)
raise
finally:
# Clean up temp file (same pattern as upload_audio_to_s3)
# Clean up temp file
if os.path.exists(temp_file_path):
try:
os.remove(temp_file_path)
@ -170,3 +68,104 @@ async def upload_voicemail_audio_to_s3(
logger.warning(
f"Failed to clean up temp voicemail audio file {temp_file_path}: {e}"
)
async def process_workflow_completion(
_ctx,
workflow_run_id: int,
audio_temp_path: Optional[str] = None,
transcript_temp_path: Optional[str] = None,
):
"""Process workflow completion: upload artifacts and run integrations.
This task combines audio upload, transcript upload, and webhook integrations
into a single sequential task to ensure integrations run after uploads complete.
Args:
_ctx: ARQ context (unused)
workflow_run_id: The workflow run ID
audio_temp_path: Optional path to temp audio file
transcript_temp_path: Optional path to temp transcript file
"""
run_id = str(workflow_run_id)
set_current_run_id(run_id)
logger.info(f"Processing workflow completion for run {workflow_run_id}")
storage_backend = get_current_storage_backend()
# Step 1: Upload audio if provided
if audio_temp_path:
try:
if os.path.exists(audio_temp_path):
file_size = os.path.getsize(audio_temp_path)
logger.debug(f"Audio file size: {file_size} bytes")
recording_url = f"recordings/{workflow_run_id}.wav"
logger.info(
f"Uploading audio to {storage_backend.name} - workflow_run_id: {workflow_run_id}"
)
await storage_fs.aupload_file(audio_temp_path, recording_url)
await db_client.update_workflow_run(
run_id=workflow_run_id,
recording_url=recording_url,
storage_backend=storage_backend.value,
)
logger.info(f"Successfully uploaded audio: {recording_url}")
else:
logger.warning(f"Audio temp file not found: {audio_temp_path}")
except Exception as e:
logger.error(f"Error uploading audio for workflow {workflow_run_id}: {e}")
finally:
if audio_temp_path and os.path.exists(audio_temp_path):
try:
os.remove(audio_temp_path)
logger.debug(f"Cleaned up temp audio file: {audio_temp_path}")
except Exception as e:
logger.warning(f"Failed to clean up temp audio file: {e}")
# Step 2: Upload transcript if provided
if transcript_temp_path:
try:
if os.path.exists(transcript_temp_path):
file_size = os.path.getsize(transcript_temp_path)
logger.debug(f"Transcript file size: {file_size} bytes")
transcript_url = f"transcripts/{workflow_run_id}.txt"
logger.info(
f"Uploading transcript to {storage_backend.name} - workflow_run_id: {workflow_run_id}"
)
await storage_fs.aupload_file(transcript_temp_path, transcript_url)
await db_client.update_workflow_run(
run_id=workflow_run_id,
transcript_url=transcript_url,
storage_backend=storage_backend.value,
)
logger.info(f"Successfully uploaded transcript: {transcript_url}")
else:
logger.warning(
f"Transcript temp file not found: {transcript_temp_path}"
)
except Exception as e:
logger.error(
f"Error uploading transcript for workflow {workflow_run_id}: {e}"
)
finally:
if transcript_temp_path and os.path.exists(transcript_temp_path):
try:
os.remove(transcript_temp_path)
logger.debug(
f"Cleaned up temp transcript file: {transcript_temp_path}"
)
except Exception as e:
logger.warning(f"Failed to clean up temp transcript file: {e}")
# Step 3: Run webhook integrations (after uploads are complete)
try:
await run_integrations_post_workflow_run(_ctx, workflow_run_id)
except Exception as e:
logger.error(f"Error running integrations for workflow {workflow_run_id}: {e}")
logger.info(f"Completed workflow completion processing for run {workflow_run_id}")

View file

@ -1,5 +1,5 @@
from dataclasses import dataclass
from typing import Any, Dict
from dataclasses import dataclass, field
from typing import Any, Dict, Optional
from unittest.mock import Mock
import pytest
@ -28,6 +28,87 @@ from pipecat.processors.frame_processor import FrameDirection, FrameProcessor
START_CALL_SYSTEM_PROMPT = "start_call_system_prompt"
END_CALL_SYSTEM_PROMPT = "end_call_system_prompt"
# Default workflow definition for mocking database WorkflowModel
DEFAULT_WORKFLOW_DEFINITION = {
"nodes": [
{
"id": "1",
"type": "startCall",
"position": {"x": 0, "y": 0},
"data": {
"name": "Start",
"prompt": START_CALL_SYSTEM_PROMPT,
"is_start": True,
"allow_interrupt": False,
"add_global_prompt": False,
},
},
{
"id": "2",
"type": "endCall",
"position": {"x": 0, "y": 200},
"data": {
"name": "End",
"prompt": END_CALL_SYSTEM_PROMPT,
"is_end": True,
"allow_interrupt": False,
"add_global_prompt": False,
},
},
],
"edges": [
{
"id": "1-2",
"source": "1",
"target": "2",
"data": {"label": "End", "condition": "End the call"},
}
],
}
@dataclass
class MockWorkflowModel:
"""Mock database WorkflowModel for testing.
This mimics the structure of the database WorkflowModel, not the parsed WorkflowGraph.
Use this when mocking db_client.get_workflow() responses.
"""
workflow_id: int = 1
organization_id: int = 1
workflow_configurations: Dict[str, Any] = field(default_factory=dict)
workflow_definition_with_fallback: Dict[str, Any] = field(default_factory=dict)
def __post_init__(self):
if not self.workflow_definition_with_fallback:
self.workflow_definition_with_fallback = DEFAULT_WORKFLOW_DEFINITION.copy()
@dataclass
class MockWorkflowRun:
"""Mock database WorkflowRun for testing.
Use this when mocking db_client.get_workflow_run() responses.
"""
is_completed: bool = False
initial_context: Dict[str, Any] = field(default_factory=dict)
gathered_context: Dict[str, Any] = field(default_factory=dict)
@dataclass
class MockUserConfig:
"""Mock user configuration for testing.
Use this when mocking db_client.get_user_configurations() responses.
"""
stt: Optional[Any] = None
tts: Optional[Any] = None
llm: Optional[Any] = None
embeddings: Optional[Any] = None
class MockTransportProcessor(FrameProcessor):
"""
@ -41,7 +122,7 @@ class MockTransportProcessor(FrameProcessor):
Args:
emit_bot_speaking: If True, also emits BotSpeakingFrame on TTSAudioRawFrame
which is needed for UserIdleProcessor to start conversation tracking. Default True.
which is needed for user idle tracking to start conversation tracking. Default True.
"""
def __init__(
@ -63,7 +144,7 @@ class MockTransportProcessor(FrameProcessor):
BotStartedSpeakingFrame(), direction=FrameDirection.UPSTREAM
)
elif isinstance(frame, TTSAudioRawFrame):
# Emit BotSpeakingFrame - this is what triggers the UserIdleProcessor
# Emit BotSpeakingFrame - this is what triggers user idle tracking
# to start conversation tracking
if self._emit_bot_speaking:
await self.push_frame(BotSpeakingFrame())
@ -101,6 +182,24 @@ def mock_engine():
return engine
@pytest.fixture
def mock_workflow_model():
"""Create a mock WorkflowModel for testing database responses."""
return MockWorkflowModel()
@pytest.fixture
def mock_workflow_run():
"""Create a mock WorkflowRun for testing database responses."""
return MockWorkflowRun()
@pytest.fixture
def mock_user_config():
"""Create a mock user configuration for testing."""
return MockUserConfig()
@pytest.fixture
def sample_tools():
"""Create sample mock tools for testing."""

View file

@ -42,7 +42,6 @@ from pipecat.processors.aggregators.llm_response_universal import (
)
from pipecat.tests import MockLLMService, MockTTSService
# Define prompts for test nodes
START_NODE_PROMPT = "Start Node System Prompt"
AGENT_NODE_PROMPT = "Agent Node System Prompt"
@ -143,14 +142,20 @@ class ContextCapturingMockLLM(MockLLMService):
msg_copy = dict(msg)
# Copy content to avoid reference issues
if "content" in msg_copy:
msg_copy["content"] = str(msg_copy["content"]) if msg_copy["content"] else None
msg_copy["content"] = (
str(msg_copy["content"]) if msg_copy["content"] else None
)
messages_snapshot.append(msg_copy)
self.captured_contexts.append({
"step": self._current_step,
"messages": messages_snapshot,
"system_prompt": messages_snapshot[0]["content"] if messages_snapshot else None,
})
self.captured_contexts.append(
{
"step": self._current_step,
"messages": messages_snapshot,
"system_prompt": messages_snapshot[0]["content"]
if messages_snapshot
else None,
}
)
# Call parent implementation to stream the mock chunks
return await super()._stream_chat_completions_universal_context(context)
@ -306,14 +311,26 @@ class TestContextUpdateBeforeNextCompletion:
transition completes. The test verifies the context is still correctly updated.
"""
# Step 0 (Start node): call collect_info to transition to agent
step_0_chunks = MockLLMService.create_multiple_function_call_chunks([
{"name": "collect_info", "arguments": {}, "tool_call_id": "call_transition_1"},
])
step_0_chunks = MockLLMService.create_multiple_function_call_chunks(
[
{
"name": "collect_info",
"arguments": {},
"tool_call_id": "call_transition_1",
},
]
)
# Step 1 (Agent node): call end_call to transition to end
step_1_chunks = MockLLMService.create_multiple_function_call_chunks([
{"name": "end_call", "arguments": {}, "tool_call_id": "call_transition_2"},
])
step_1_chunks = MockLLMService.create_multiple_function_call_chunks(
[
{
"name": "end_call",
"arguments": {},
"tool_call_id": "call_transition_2",
},
]
)
# Step 2 (End node): text response (end node has no outgoing edges)
step_2_chunks = MockLLMService.create_text_chunks("Goodbye!")
@ -327,7 +344,7 @@ class TestContextUpdateBeforeNextCompletion:
)
# Should have been called 3 times: start node, agent node, end node
assert llm.get_current_step() == 2, (
assert llm.get_current_step() == 3, (
f"Expected 3 LLM generations (start, agent, end), got {llm.get_current_step()}"
)
@ -376,14 +393,26 @@ class TestContextUpdateBeforeNextCompletion:
is handled correctly.
"""
# Step 0 (Start node): call collect_info to transition to agent
step_0_chunks = MockLLMService.create_multiple_function_call_chunks([
{"name": "collect_info", "arguments": {}, "tool_call_id": "call_transition_1"},
])
step_0_chunks = MockLLMService.create_multiple_function_call_chunks(
[
{
"name": "collect_info",
"arguments": {},
"tool_call_id": "call_transition_1",
},
]
)
# Step 1 (Agent node): call end_call to transition to end
step_1_chunks = MockLLMService.create_multiple_function_call_chunks([
{"name": "end_call", "arguments": {}, "tool_call_id": "call_transition_2"},
])
step_1_chunks = MockLLMService.create_multiple_function_call_chunks(
[
{
"name": "end_call",
"arguments": {},
"tool_call_id": "call_transition_2",
},
]
)
# Step 2 (End node): text response
step_2_chunks = MockLLMService.create_text_chunks("Goodbye!")
@ -397,7 +426,7 @@ class TestContextUpdateBeforeNextCompletion:
)
# Verify all three nodes were executed
assert llm.get_current_step() == 2, (
assert llm.get_current_step() == 3, (
f"Expected 3 steps, got {llm.get_current_step()}"
)
@ -408,8 +437,7 @@ class TestContextUpdateBeforeNextCompletion:
assert AGENT_NODE_PROMPT in llm.get_system_prompt_at_step(1)
# Step 2: End node - should have end prompt
# FIXME - EndFrame is getting processed before LLMContextFrame
# assert END_NODE_PROMPT in llm.get_system_prompt_at_step(2)
assert END_NODE_PROMPT in llm.get_system_prompt_at_step(2)
# Verify each subsequent step has the previous tool results
step_1_ctx = llm.get_context_at_step(1)
@ -423,14 +451,14 @@ class TestContextUpdateBeforeNextCompletion:
assert step_1_has_tool, "Agent node should see collect_info tool result"
# Step 2 should have tool results from both transitions
# FIXME - EndFrame is getting processed before LLMContextFrame
# step_2_tool_messages = [
# msg for msg in step_2_ctx["messages"]
# if msg.get("role") == "tool" or msg.get("tool_call_id")
# ]
# assert len(step_2_tool_messages) >= 2, (
# f"End node should see at least 2 tool results, got {len(step_2_tool_messages)}"
# )
step_2_tool_messages = [
msg
for msg in step_2_ctx["messages"]
if msg.get("role") == "tool" or msg.get("tool_call_id")
]
assert len(step_2_tool_messages) >= 2, (
f"End node should see at least 2 tool results, got {len(step_2_tool_messages)}"
)
@pytest.mark.asyncio
async def test_context_messages_preserve_conversation_history(
@ -444,14 +472,26 @@ class TestContextUpdateBeforeNextCompletion:
- Tool call messages and results
"""
# Step 0 (Start node): call collect_info to transition to agent
step_0_chunks = MockLLMService.create_multiple_function_call_chunks([
{"name": "collect_info", "arguments": {}, "tool_call_id": "call_transition_1"},
])
step_0_chunks = MockLLMService.create_multiple_function_call_chunks(
[
{
"name": "collect_info",
"arguments": {},
"tool_call_id": "call_transition_1",
},
]
)
# Step 1 (Agent node): call end_call to transition to end
step_1_chunks = MockLLMService.create_multiple_function_call_chunks([
{"name": "end_call", "arguments": {}, "tool_call_id": "call_transition_2"},
])
step_1_chunks = MockLLMService.create_multiple_function_call_chunks(
[
{
"name": "end_call",
"arguments": {},
"tool_call_id": "call_transition_2",
},
]
)
# Step 2 (End node): text response
step_2_chunks = MockLLMService.create_text_chunks("Goodbye!")
@ -472,18 +512,15 @@ class TestContextUpdateBeforeNextCompletion:
assert len(ctx_1["messages"]) > len(ctx_0["messages"]), (
"Context at step 1 should have more messages than step 0"
)
# FIXME
# assert len(ctx_2["messages"]) > len(ctx_1["messages"]), (
# "Context at step 2 should have more messages than step 1"
# )
assert len(ctx_2["messages"]) > len(ctx_1["messages"]), (
"Context at step 2 should have more messages than step 1"
)
# Verify assistant messages are accumulated
# FIXME
# assistant_messages_at_step_2 = [
# msg for msg in ctx_2["messages"]
# if msg.get("role") == "assistant"
# ]
# assert len(assistant_messages_at_step_2) >= 2, (
# "Should have at least 2 assistant messages by step 2"
# )
assistant_messages_at_step_2 = [
msg for msg in ctx_2["messages"] if msg.get("role") == "assistant"
]
assert len(assistant_messages_at_step_2) >= 2, (
"Should have at least 2 assistant messages by step 2"
)

View file

@ -0,0 +1,100 @@
import asyncio
import pytest
from loguru import logger
from pipecat.frames.frames import (
EndTaskFrame,
Frame,
InterruptionTaskFrame,
LLMRunFrame,
)
from pipecat.pipeline.base_task import PipelineTaskParams
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.task import PipelineTask
from pipecat.processors.frame_processor import FrameDirection, FrameProcessor
class MockTransport(FrameProcessor):
def __init__(self, **kwargs):
super().__init__(**kwargs)
async def process_frame(self, frame: Frame, direction: FrameDirection):
await super().process_frame(frame, direction)
await self.push_frame(frame, direction)
class BusyWaitProcessor(FrameProcessor):
def __init__(self, wait_time=5.0, **kwargs):
super().__init__(**kwargs)
self._wait_time = wait_time
async def process_frame(self, frame: Frame, direction: FrameDirection):
await super().process_frame(frame, direction)
if isinstance(frame, LLMRunFrame):
# Simulate a delay, which can happen sometimes due to slow LLM Inferencing or
# other reasons
try:
logger.debug(f"{self} sleeping with frame: {frame}")
await asyncio.sleep(5)
logger.debug(f"{self} woke up with frame: {frame}")
except asyncio.CancelledError:
logger.debug(f"{self} was cancelled")
raise
await self.push_frame(frame, direction)
@pytest.mark.asyncio
async def test_interruption_with_blocked_end_frame():
busy_wait_processor = BusyWaitProcessor(wait_time=5)
transport = MockTransport()
pipeline = Pipeline([transport, busy_wait_processor])
task = PipelineTask(pipeline)
async def run_pipeline():
loop = asyncio.get_running_loop()
params = PipelineTaskParams(loop=loop)
await task.run(params=params)
async def queue_frame():
await task.queue_frames([LLMRunFrame()])
# Send EndTaskFrame to simulate EndFrame
await asyncio.sleep(0.1)
await transport.queue_frame(EndTaskFrame(), direction=FrameDirection.UPSTREAM)
# Simulate an Interruption, which can happen if the user
# has started to speak
await asyncio.sleep(0.1)
await transport.queue_frame(
InterruptionTaskFrame(), direction=FrameDirection.UPSTREAM
)
# Create tasks explicitly for better control
pipeline_task = asyncio.create_task(run_pipeline())
queue_task = asyncio.create_task(queue_frame())
# Wait with timeout
done, pending = await asyncio.wait(
[pipeline_task, queue_task],
timeout=1.0,
return_when=asyncio.ALL_COMPLETED,
)
# If there are pending tasks, we timed out
if pending:
# Cancel all pending tasks
for t in pending:
t.cancel()
# Give limited time for cleanup, then move on regardless
try:
await asyncio.wait_for(
asyncio.gather(*pending, return_exceptions=True),
timeout=1.0,
)
except asyncio.TimeoutError:
pass # Cleanup took too long, continue anyway
pytest.fail("Test timed out after 1 second")

View file

@ -1,10 +1,10 @@
"""
Simulates a user idle condition and tests the behaviour
of the user idle processor.
of the user idle handler.
This module tests the behavior when the user becomes idle during a conversation,
ensuring the UserIdleProcessor properly triggers the callback and the engine
handles it correctly.
ensuring the user_idle_timeout in LLMUserAggregatorParams properly triggers
the on_user_turn_idle event and the engine handles it correctly.
"""
import asyncio
@ -23,8 +23,8 @@ from pipecat.processors.aggregators.llm_context import LLMContext
from pipecat.processors.aggregators.llm_response import LLMAssistantAggregatorParams
from pipecat.processors.aggregators.llm_response_universal import (
LLMContextAggregatorPair,
LLMUserAggregatorParams,
)
from pipecat.processors.user_idle_processor import UserIdleProcessor
from pipecat.tests import MockLLMService, MockTTSService
@ -32,8 +32,8 @@ async def run_pipeline_with_user_idle(
workflow: WorkflowGraph,
user_idle_timeout: float = 0.2,
mock_steps: list | None = None,
) -> tuple[MockLLMService, LLMContext, UserIdleProcessor]:
"""Run a pipeline with UserIdleProcessor and simulate user idle condition.
) -> tuple[MockLLMService, LLMContext]:
"""Run a pipeline with user_idle_timeout and simulate user idle condition.
Args:
workflow: The workflow graph to use.
@ -42,7 +42,7 @@ async def run_pipeline_with_user_idle(
defaults to a simple greeting followed by text responses.
Returns:
Tuple of (MockLLMService, LLMContext, UserIdleProcessor) for assertions.
Tuple of (MockLLMService, LLMContext) for assertions.
"""
# Create mock responses - bot will speak first, then respond to idle prompts
# Step 1: Initial greeting
@ -64,10 +64,11 @@ async def run_pipeline_with_user_idle(
# Create LLM context
context = LLMContext()
# Create context aggregator with both user and assistant aggregators
# Create context aggregator with user_idle_timeout in user_params
assistant_params = LLMAssistantAggregatorParams(expect_stripped_words=True)
user_params = LLMUserAggregatorParams(user_idle_timeout=user_idle_timeout)
context_aggregator = LLMContextAggregatorPair(
context, assistant_params=assistant_params
context, assistant_params=assistant_params, user_params=user_params
)
user_context_aggregator = context_aggregator.user()
assistant_context_aggregator = context_aggregator.assistant()
@ -81,18 +82,20 @@ async def run_pipeline_with_user_idle(
workflow_run_id=1,
)
# Create UserIdleProcessor with engine's callback and a short timeout
user_idle_processor = UserIdleProcessor(
callback=engine.create_user_idle_callback(),
timeout=user_idle_timeout,
)
# Register user idle event handlers
user_idle_handler = engine.create_user_idle_handler()
# Build the pipeline:
# llm -> mock_transport -> user_idle_processor -> assistant_context_aggregator
# The user_context_aggregator would normally be at the start for user input
@user_context_aggregator.event_handler("on_user_turn_idle")
async def on_user_turn_idle(aggregator):
await user_idle_handler.handle_idle(aggregator)
@user_context_aggregator.event_handler("on_user_turn_started")
async def on_user_turn_started(aggregator, strategy):
user_idle_handler.reset()
# Build the pipeline
pipeline = Pipeline(
[
user_idle_processor,
user_context_aggregator,
llm,
tts,
@ -154,11 +157,11 @@ async def run_pipeline_with_user_idle(
return_exceptions=True,
)
return llm, context, user_idle_processor
return llm, context
class TestUserIdleHandler:
"""Test user idle handling through PipecatEngine and UserIdleProcessor."""
"""Test user idle handling through PipecatEngine and UserIdleHandler."""
@pytest.mark.asyncio
async def test_user_idle_triggers_callback(self, simple_workflow: WorkflowGraph):
@ -167,13 +170,13 @@ class TestUserIdleHandler:
This test verifies that when:
1. The bot starts speaking (triggers conversation tracking)
2. No user input is received for the timeout period
3. The UserIdleProcessor triggers the idle callback
3. The on_user_turn_idle event triggers the idle handler
The engine's user idle callback should:
The engine's user idle handler should:
- First retry: Send a message asking if user is still there
- Second retry: Send goodbye message and end the call
"""
llm, context, user_idle_processor = await run_pipeline_with_user_idle(
llm, context = await run_pipeline_with_user_idle(
workflow=simple_workflow,
user_idle_timeout=0.2, # Short timeout for faster test
)
@ -220,7 +223,7 @@ class TestUserIdleHandler:
MockLLMService.create_text_chunks("Response 3"),
]
llm, context, user_idle_processor = await run_pipeline_with_user_idle(
llm, context = await run_pipeline_with_user_idle(
workflow=three_node_workflow,
user_idle_timeout=0.2,
mock_steps=mock_steps,