feat: allow recordings in tool transitions

This commit is contained in:
Abhishek Kumar 2026-04-10 16:18:01 +05:30
parent 3a272d3a44
commit ffe9a99401
38 changed files with 1555 additions and 692 deletions

View file

@ -0,0 +1,108 @@
"""dedup recordings to org-scoped unique audio
Revision ID: 3cd3155084a2
Revises: e7254d2c6c18
Create Date: 2026-04-10 12:00:00.000000
"""
from typing import Sequence, Union
import sqlalchemy as sa
from alembic import op
# revision identifiers, used by Alembic.
revision: str = "3cd3155084a2"
down_revision: Union[str, None] = "e7254d2c6c18"
branch_labels: Union[str, Sequence[str], None] = None
depends_on: Union[str, Sequence[str], None] = None
def upgrade() -> None:
    """Collapse duplicate active recordings onto one canonical row per group.

    A duplicate group is every active row sharing the same
    (organization, transcript, tts_provider, tts_model, tts_voice_id).
    The earliest row (by ``created_at``, ties broken by primary key so the
    choice is deterministic) is canonical; every other row is an alias.

    Steps:
      1. Find all alias rows together with their canonical recording_id.
      2. Rewrite ``RECORDING_ID: <alias>`` references in the workflow JSON
         of the owning organization (draft and published definitions).
      3. Soft-delete the alias rows.
    """
    import re

    conn = op.get_bind()

    # 1. FIRST_VALUE over the same window that drives ROW_NUMBER yields the
    #    canonical id directly, so no self-join on the grouping columns is
    #    needed (PARTITION BY also groups NULLs together, unlike "=").
    duplicates = conn.execute(
        sa.text("""
            WITH ranked AS (
                SELECT
                    id,
                    recording_id,
                    organization_id,
                    FIRST_VALUE(recording_id) OVER w AS canonical_id,
                    ROW_NUMBER() OVER w AS rn
                FROM workflow_recordings
                WHERE is_active = true
                WINDOW w AS (
                    PARTITION BY organization_id, transcript,
                                 tts_provider, tts_model, tts_voice_id
                    ORDER BY created_at ASC, id ASC
                )
            )
            SELECT id AS alias_pk,
                   recording_id AS alias_id,
                   canonical_id,
                   organization_id
            FROM ranked
            WHERE rn > 1
        """)
    ).fetchall()

    if not duplicates:
        return

    # 2. Replace alias recording_ids with canonical ones in workflow JSON.
    for _alias_pk, alias_id, canonical_id, org_id in duplicates:
        if alias_id == canonical_id:
            continue

        params = {
            # The negative lookahead stops the match from firing on a longer
            # ID that merely starts with the alias (e.g. "abc" vs "abcdef"),
            # which a plain REPLACE would silently corrupt.
            "alias_re": f"RECORDING_ID: {re.escape(alias_id)}(?![A-Za-z0-9_-])",
            # Backslashes are special in regexp_replace replacement text.
            "canonical": f"RECORDING_ID: {canonical_id}".replace("\\", "\\\\"),
            "org_id": org_id,
        }

        # Draft definitions, limited to the organization owning the group.
        conn.execute(
            sa.text("""
                UPDATE workflows
                SET workflow_definition =
                    regexp_replace(
                        workflow_definition::text, :alias_re, :canonical, 'g'
                    )::json
                WHERE organization_id = :org_id
                  AND workflow_definition::text ~ :alias_re
            """),
            params,
        )

        # Published versions, scoped through their parent workflow.
        conn.execute(
            sa.text("""
                UPDATE workflow_definitions wd
                SET workflow_json =
                    regexp_replace(
                        wd.workflow_json::text, :alias_re, :canonical, 'g'
                    )::json
                FROM workflows w
                WHERE wd.workflow_id = w.id
                  AND w.organization_id = :org_id
                  AND wd.workflow_json::text ~ :alias_re
            """),
            params,
        )

    # 3. Soft-delete every alias row, addressed by primary key so the update
    #    can never touch a row outside the computed alias set.
    alias_pks = [row[0] for row in duplicates]
    conn.execute(
        sa.text("""
            UPDATE workflow_recordings
            SET is_active = false
            WHERE id = ANY(:ids)
              AND is_active = true
        """),
        {"ids": alias_pks},
    )
def downgrade() -> None:
    """Do nothing: deduplication is a one-way data migration.

    The alias rows were only soft-deleted, so they still exist in the table
    and a manual restore is possible if it is ever needed.
    """

View file

@ -1,7 +1,7 @@
"""unique recording id per org and workflow
"""make recordings org-scoped instead of workflow-scoped
Revision ID: 67a5cf3e09d0
Revises: e7254d2c6c18
Revises: 3cd3155084a2
Create Date: 2026-04-09 17:03:38.302041
"""
@ -13,13 +13,13 @@ from alembic import op
# revision identifiers, used by Alembic.
revision: str = "67a5cf3e09d0"
down_revision: Union[str, None] = "e7254d2c6c18"
down_revision: Union[str, None] = "3cd3155084a2"
branch_labels: Union[str, Sequence[str], None] = None
depends_on: Union[str, Sequence[str], None] = None
def upgrade() -> None:
# Widen column from 16 to 64 chars for descriptive names
# 1. Widen recording_id from 16 to 64 chars for descriptive names
op.alter_column(
"workflow_recordings",
"recording_id",
@ -27,40 +27,79 @@ def upgrade() -> None:
type_=sa.String(length=64),
existing_nullable=False,
)
# Drop the old globally-unique index
op.drop_index(
op.f("ix_workflow_recordings_recording_id"), table_name="workflow_recordings"
# 2. Make workflow_id nullable — recordings are now org-scoped
op.alter_column(
"workflow_recordings",
"workflow_id",
existing_type=sa.Integer(),
nullable=True,
)
# Re-create as non-unique index for lookups
# 3. Drop the old globally-unique index on recording_id
op.drop_index(
"ix_workflow_recordings_recording_id",
table_name="workflow_recordings",
)
# 4. Re-create as non-unique index (for lookups)
op.create_index(
"ix_workflow_recordings_recording_id",
"workflow_recordings",
["recording_id"],
unique=False,
)
# Add composite unique constraint (recording_id, organization_id, workflow_id)
# 5. Add unique constraint (recording_id, organization_id)
op.create_unique_constraint(
"uq_workflow_recordings_recording_id_org_wf",
"uq_workflow_recordings_recording_id_org",
"workflow_recordings",
["recording_id", "organization_id", "workflow_id"],
["recording_id", "organization_id"],
)
# 6. Drop the workflow+TTS scope index (no longer relevant)
op.drop_index(
"ix_workflow_recordings_tts_scope",
table_name="workflow_recordings",
)
def downgrade() -> None:
# Re-create the TTS scope index
op.create_index(
"ix_workflow_recordings_tts_scope",
"workflow_recordings",
["workflow_id", "tts_provider", "tts_model", "tts_voice_id"],
)
# Drop the org-scoped unique constraint
op.drop_constraint(
"uq_workflow_recordings_recording_id_org_wf",
"uq_workflow_recordings_recording_id_org",
"workflow_recordings",
type_="unique",
)
# Drop non-unique index and re-create as unique
op.drop_index(
"ix_workflow_recordings_recording_id", table_name="workflow_recordings"
"ix_workflow_recordings_recording_id",
table_name="workflow_recordings",
)
op.create_index(
op.f("ix_workflow_recordings_recording_id"),
"ix_workflow_recordings_recording_id",
"workflow_recordings",
["recording_id"],
unique=True,
)
# Make workflow_id NOT NULL again
op.alter_column(
"workflow_recordings",
"workflow_id",
existing_type=sa.Integer(),
nullable=False,
)
# Revert recording_id width
op.alter_column(
"workflow_recordings",
"recording_id",

View file

@ -0,0 +1,42 @@
"""make tts columns nullable on workflow_recordings
Revision ID: a1b2c3d4e5f6
Revises: 67a5cf3e09d0
Create Date: 2026-04-10 12:00:00.000000
"""
from typing import Sequence, Union
import sqlalchemy as sa
from alembic import op
# revision identifiers, used by Alembic.
revision: str = "a1b2c3d4e5f6"
down_revision: str = "67a5cf3e09d0"
branch_labels: Union[str, Sequence[str], None] = None
depends_on: Union[str, Sequence[str], None] = None
def upgrade() -> None:
    """Make the three TTS columns on ``workflow_recordings`` nullable."""
    for column_name in ("tts_provider", "tts_model", "tts_voice_id"):
        op.alter_column(
            "workflow_recordings",
            column_name,
            existing_type=sa.String(),
            nullable=True,
        )
def downgrade() -> None:
    """Restore NOT NULL on the three TTS columns of ``workflow_recordings``.

    Columns are reverted in the reverse of the order used by ``upgrade``.
    """
    # NOTE(review): no backfill happens here, so this presumably fails if any
    # row holds NULL in one of these columns -- confirm before downgrading.
    for column_name in ("tts_voice_id", "tts_model", "tts_provider"):
        op.alter_column(
            "workflow_recordings",
            column_name,
            existing_type=sa.String(),
            nullable=False,
        )

View file

@ -1005,7 +1005,7 @@ class KnowledgeBaseDocumentModel(Base):
class WorkflowRecordingModel(Base):
"""Model for storing audio recordings scoped to a workflow and TTS configuration.
"""Model for storing audio recordings scoped to an organization.
Recordings are used in hybrid prompts where parts of the output are pre-recorded
audio rather than dynamically generated TTS.
@ -1020,16 +1020,16 @@ class WorkflowRecordingModel(Base):
# Scoping
workflow_id = Column(
Integer, ForeignKey("workflows.id", ondelete="CASCADE"), nullable=False
Integer, ForeignKey("workflows.id", ondelete="CASCADE"), nullable=True
)
organization_id = Column(
Integer, ForeignKey("organizations.id", ondelete="CASCADE"), nullable=False
)
# TTS configuration scope
tts_provider = Column(String, nullable=False)
tts_model = Column(String, nullable=False)
tts_voice_id = Column(String, nullable=False)
# TTS configuration metadata (optional, legacy)
tts_provider = Column(String, nullable=True)
tts_model = Column(String, nullable=True)
tts_voice_id = Column(String, nullable=True)
# Content
transcript = Column(Text, nullable=False)
@ -1065,19 +1065,11 @@ class WorkflowRecordingModel(Base):
UniqueConstraint(
"recording_id",
"organization_id",
"workflow_id",
name="uq_workflow_recordings_recording_id_org_wf",
name="uq_workflow_recordings_recording_id_org",
),
Index("ix_workflow_recordings_workflow_id", "workflow_id"),
Index("ix_workflow_recordings_org_id", "organization_id"),
Index("ix_workflow_recordings_recording_id", "recording_id"),
Index(
"ix_workflow_recordings_tts_scope",
"workflow_id",
"tts_provider",
"tts_model",
"tts_voice_id",
),
)

View file

@ -5,7 +5,7 @@ import string
from typing import List, Optional
from loguru import logger
from sqlalchemy import func, select
from sqlalchemy import func, select, text
from api.db.base_client import BaseDBClient
from api.db.models import WorkflowRecordingModel
@ -23,30 +23,30 @@ class WorkflowRecordingClient(BaseDBClient):
async def create_recording(
self,
recording_id: str,
workflow_id: int,
organization_id: int,
tts_provider: str,
tts_model: str,
tts_voice_id: str,
transcript: str,
storage_key: str,
storage_backend: str,
created_by: int,
workflow_id: Optional[int] = None,
tts_provider: Optional[str] = None,
tts_model: Optional[str] = None,
tts_voice_id: Optional[str] = None,
metadata: Optional[dict] = None,
) -> WorkflowRecordingModel:
"""Create a new workflow recording record.
Args:
recording_id: Short unique recording identifier
workflow_id: ID of the workflow
organization_id: ID of the organization
tts_provider: TTS provider name
tts_model: TTS model name
tts_voice_id: TTS voice identifier
transcript: User-provided transcript
storage_key: S3/MinIO storage key
storage_backend: Storage backend (s3 or minio)
created_by: ID of the user
workflow_id: Optional workflow ID (legacy)
tts_provider: Optional TTS provider name
tts_model: Optional TTS model name
tts_voice_id: Optional TTS voice identifier
metadata: Optional extra metadata
Returns:
@ -71,10 +71,7 @@ class WorkflowRecordingClient(BaseDBClient):
await session.commit()
await session.refresh(recording)
logger.info(
f"Created recording {recording_id} for workflow {workflow_id}, "
f"org {organization_id}"
)
logger.info(f"Created recording {recording_id} for org {organization_id}")
return recording
async def get_recordings(
@ -85,7 +82,7 @@ class WorkflowRecordingClient(BaseDBClient):
tts_model: Optional[str] = None,
tts_voice_id: Optional[str] = None,
) -> List[WorkflowRecordingModel]:
"""Get recordings for an organization, optionally filtered by workflow and TTS config.
"""Get recordings for an organization, optionally filtered.
Args:
organization_id: ID of the organization
@ -121,14 +118,12 @@ class WorkflowRecordingClient(BaseDBClient):
self,
recording_id: str,
organization_id: int,
workflow_id: int,
) -> Optional[WorkflowRecordingModel]:
"""Get a recording by its string recording_id (unique per org + workflow).
"""Get a recording by its short ID.
Args:
recording_id: The descriptive recording ID
recording_id: The short unique recording ID
organization_id: ID of the organization
workflow_id: ID of the workflow
Returns:
WorkflowRecordingModel if found, None otherwise
@ -137,7 +132,6 @@ class WorkflowRecordingClient(BaseDBClient):
query = select(WorkflowRecordingModel).where(
WorkflowRecordingModel.recording_id == recording_id,
WorkflowRecordingModel.organization_id == organization_id,
WorkflowRecordingModel.workflow_id == workflow_id,
WorkflowRecordingModel.is_active == True,
)
@ -170,13 +164,11 @@ class WorkflowRecordingClient(BaseDBClient):
async def has_active_recordings(
self,
workflow_id: int,
organization_id: int,
) -> bool:
"""Check if a workflow has any active recordings.
"""Check if an organization has any active recordings.
Args:
workflow_id: ID of the workflow
organization_id: ID of the organization
Returns:
@ -187,7 +179,6 @@ class WorkflowRecordingClient(BaseDBClient):
select(func.count())
.select_from(WorkflowRecordingModel)
.where(
WorkflowRecordingModel.workflow_id == workflow_id,
WorkflowRecordingModel.organization_id == organization_id,
WorkflowRecordingModel.is_active == True,
)
@ -196,14 +187,13 @@ class WorkflowRecordingClient(BaseDBClient):
return result.scalar_one() > 0
async def check_recording_id_exists(
self, recording_id: str, organization_id: int, workflow_id: int
self, recording_id: str, organization_id: int
) -> bool:
"""Check if a recording ID already exists within an organization and workflow.
"""Check if a recording ID already exists within an organization.
Args:
recording_id: The recording ID to check
organization_id: ID of the organization
workflow_id: ID of the workflow
Returns:
True if exists, False otherwise
@ -212,7 +202,6 @@ class WorkflowRecordingClient(BaseDBClient):
query = select(WorkflowRecordingModel.id).where(
WorkflowRecordingModel.recording_id == recording_id,
WorkflowRecordingModel.organization_id == organization_id,
WorkflowRecordingModel.workflow_id == workflow_id,
WorkflowRecordingModel.is_active == True,
)
result = await session.execute(query)
@ -257,6 +246,80 @@ class WorkflowRecordingClient(BaseDBClient):
)
return recording
async def replace_recording_id_in_workflows(
    self,
    old_id: str,
    new_id: str,
    organization_id: int,
) -> int:
    """Replace all occurrences of a recording ID in workflow definitions.

    Updates both draft definitions (workflows.workflow_definition) and
    versioned definitions (workflow_definitions.workflow_json), skipping
    workflow_definitions with status 'legacy'.

    Only whole IDs are replaced: ``RECORDING_ID: greet`` is rewritten, but
    ``RECORDING_ID: greeting_2`` is left untouched even though it starts
    with the same characters.

    Args:
        old_id: The old recording ID to find
        new_id: The new recording ID to replace with
        organization_id: ID of the organization (scopes to org workflows)

    Returns:
        Total number of rows updated across both tables
    """
    import re

    if old_id == new_id:
        return 0

    params = {
        # Match the exact pattern used in prompts, "RECORDING_ID: <id>",
        # and refuse to match when the ID continues with another ID
        # character. A plain REPLACE would corrupt longer IDs sharing the
        # prefix, and LIKE treats "_" in an ID as a wildcard, which inflated
        # the reported row count.
        "old_re": f"RECORDING_ID: {re.escape(old_id)}(?![A-Za-z0-9_-])",
        # Backslashes are special in regexp_replace replacement text.
        "new_pat": f"RECORDING_ID: {new_id}".replace("\\", "\\\\"),
        "org_id": organization_id,
    }
    total = 0

    async with self.async_session() as session:
        # Update workflows.workflow_definition (draft definitions)
        result = await session.execute(
            text("""
                UPDATE workflows
                SET workflow_definition =
                    regexp_replace(
                        workflow_definition::text, :old_re, :new_pat, 'g'
                    )::json
                WHERE organization_id = :org_id
                  AND workflow_definition::text ~ :old_re
            """),
            params,
        )
        total += result.rowcount

        # Update workflow_definitions.workflow_json (versioned definitions)
        # Skip legacy definitions
        result = await session.execute(
            text("""
                UPDATE workflow_definitions wd
                SET workflow_json =
                    regexp_replace(
                        wd.workflow_json::text, :old_re, :new_pat, 'g'
                    )::json
                FROM workflows w
                WHERE wd.workflow_id = w.id
                  AND w.organization_id = :org_id
                  AND wd.status != 'legacy'
                  AND wd.workflow_json::text ~ :old_re
            """),
            params,
        )
        total += result.rowcount

        await session.commit()

    if total > 0:
        logger.info(
            f"Replaced recording ID '{old_id}' -> '{new_id}' "
            f"in {total} workflow definition(s), org {organization_id}"
        )
    return total
async def delete_recording(
self,
recording_id: str,

View file

@ -44,17 +44,29 @@ class HttpApiConfig(BaseModel):
timeout_ms: Optional[int] = Field(
default=5000, description="Request timeout in milliseconds"
)
customMessage: Optional[str] = Field(
default=None, description="Custom message to play after tool execution"
)
customMessageType: Optional[Literal["text", "audio"]] = Field(
default=None, description="Type of custom message: text or audio"
)
customMessageRecordingId: Optional[str] = Field(
default=None, description="Recording ID for audio custom message"
)
class EndCallConfig(BaseModel):
"""Configuration for End Call tools."""
messageType: Literal["none", "custom"] = Field(
messageType: Literal["none", "custom", "audio"] = Field(
default="none", description="Type of goodbye message"
)
customMessage: Optional[str] = Field(
default=None, description="Custom message to play before ending the call"
)
audioRecordingId: Optional[str] = Field(
default=None, description="Recording ID for audio goodbye message"
)
endCallReason: bool = Field(
default=False,
description="When enabled, LLM must provide a reason for ending the call. "
@ -73,12 +85,15 @@ class TransferCallConfig(BaseModel):
destination: str = Field(
description="Phone number or SIP endpoint to transfer the call to (E.164 format e.g., +1234567890, or SIP endpoint e.g., PJSIP/1234)"
)
messageType: Literal["none", "custom"] = Field(
messageType: Literal["none", "custom", "audio"] = Field(
default="none", description="Type of message to play before transfer"
)
customMessage: Optional[str] = Field(
default=None, description="Custom message to play before transferring the call"
)
audioRecordingId: Optional[str] = Field(
default=None, description="Recording ID for audio message before transfer"
)
timeout: int = Field(
default=30,
ge=5,

View file

@ -26,13 +26,11 @@ from api.services.storage import storage_fs
router = APIRouter(prefix="/workflow-recordings", tags=["workflow-recordings"])
async def _generate_unique_recording_id(organization_id: int, workflow_id: int) -> str:
"""Generate a unique short recording ID within an organization and workflow."""
async def _generate_unique_recording_id(organization_id: int) -> str:
"""Generate a unique short recording ID within an organization."""
for _ in range(10):
rid = generate_short_id(8)
exists = await db_client.check_recording_id_exists(
rid, organization_id, workflow_id
)
exists = await db_client.check_recording_id_exists(rid, organization_id)
if not exists:
return rid
raise HTTPException(
@ -73,12 +71,12 @@ async def get_upload_urls(
items = []
for fd in request.files:
recording_id = await _generate_unique_recording_id(
user.selected_organization_id, request.workflow_id
user.selected_organization_id
)
storage_key = (
f"recordings/{user.selected_organization_id}"
f"/{request.workflow_id}/{recording_id}"
f"/{recording_id}"
f"/{fd.filename}"
)
@ -105,7 +103,7 @@ async def get_upload_urls(
logger.info(
f"Generated {len(items)} recording upload URL(s), "
f"workflow {request.workflow_id}, org {user.selected_organization_id}"
f"org {user.selected_organization_id}"
)
return BatchRecordingUploadResponseSchema(items=items)
@ -136,22 +134,20 @@ async def create_recordings(
for rec_req in request.recordings:
recording = await db_client.create_recording(
recording_id=rec_req.recording_id,
workflow_id=rec_req.workflow_id,
organization_id=user.selected_organization_id,
tts_provider=rec_req.tts_provider,
tts_model=rec_req.tts_model,
tts_voice_id=rec_req.tts_voice_id,
transcript=rec_req.transcript,
storage_key=rec_req.storage_key,
storage_backend=backend.value,
created_by=user.id,
tts_provider=rec_req.tts_provider,
tts_model=rec_req.tts_model,
tts_voice_id=rec_req.tts_voice_id,
metadata=rec_req.metadata,
)
results.append(_build_response(recording))
logger.info(
f"Created {len(results)} recording(s) for "
f"workflow {request.recordings[0].workflow_id}"
f"Created {len(results)} recording(s) for org {user.selected_organization_id}"
)
return BatchRecordingCreateResponseSchema(recordings=results)
@ -185,7 +181,7 @@ async def list_recordings(
] = None,
user=Depends(get_user),
):
"""List recordings for the organization, optionally filtered by workflow and TTS configuration."""
"""List recordings for the organization, optionally filtered."""
try:
recordings = await db_client.get_recordings(
organization_id=user.selected_organization_id,
@ -256,7 +252,6 @@ async def update_recording(
if not new_id:
raise HTTPException(status_code=400, detail="Recording ID cannot be empty")
# Look up by integer PK — globally unique, no ambiguity
existing = await db_client.get_recording_by_id(
id, user.selected_organization_id
)
@ -266,16 +261,17 @@ async def update_recording(
if new_id == existing.recording_id:
return _build_response(existing)
# Check if the new ID is already taken within this org + workflow
exists = await db_client.check_recording_id_exists(
new_id, user.selected_organization_id, existing.workflow_id
new_id, user.selected_organization_id
)
if exists:
raise HTTPException(
status_code=409,
detail=f"Recording ID '{new_id}' is already in use in this workflow",
detail=f"Recording ID '{new_id}' is already in use",
)
old_id = existing.recording_id
recording = await db_client.update_recording_id(
id=id,
new_recording_id=new_id,
@ -285,6 +281,18 @@ async def update_recording(
if not recording:
raise HTTPException(status_code=404, detail="Recording not found")
# Replace old recording ID in all non-legacy workflow definitions
updated = await db_client.replace_recording_id_in_workflows(
old_id=old_id,
new_id=new_id,
organization_id=user.selected_organization_id,
)
if updated:
logger.info(
f"Updated {updated} workflow definition(s) with new recording ID "
f"'{old_id}' -> '{new_id}'"
)
return _build_response(recording)
except HTTPException:

View file

@ -32,7 +32,6 @@ class FileDescriptor(BaseModel):
class BatchRecordingUploadRequestSchema(BaseModel):
"""Request schema for getting presigned upload URLs for one or more files."""
workflow_id: int = Field(..., description="Workflow ID these recordings belong to")
files: List[FileDescriptor] = Field(
..., min_length=1, max_length=20, description="List of files to upload"
)
@ -50,10 +49,13 @@ class RecordingCreateRequestSchema(BaseModel):
"""Request schema for creating a recording record after upload."""
recording_id: str = Field(..., description="Short recording ID from upload step")
workflow_id: int = Field(..., description="Workflow ID")
tts_provider: str = Field(..., description="TTS provider (e.g. elevenlabs)")
tts_model: str = Field(..., description="TTS model name")
tts_voice_id: str = Field(..., description="TTS voice identifier")
tts_provider: Optional[str] = Field(
default=None, description="TTS provider (e.g. elevenlabs)"
)
tts_model: Optional[str] = Field(default=None, description="TTS model name")
tts_voice_id: Optional[str] = Field(
default=None, description="TTS voice identifier"
)
transcript: str = Field(
..., description="User-provided transcript of the recording"
)
@ -68,11 +70,11 @@ class RecordingResponseSchema(BaseModel):
id: int
recording_id: str
workflow_id: int
workflow_id: Optional[int] = None
organization_id: int
tts_provider: str
tts_model: str
tts_voice_id: str
tts_provider: Optional[str] = None
tts_model: Optional[str] = None
tts_voice_id: Optional[str] = None
transcript: str
storage_key: str
storage_backend: str
@ -105,7 +107,8 @@ class RecordingUpdateRequestSchema(BaseModel):
...,
min_length=1,
max_length=64,
description="New descriptive recording ID",
pattern=r"^[a-zA-Z0-9_-]+$",
description="New descriptive recording ID (letters, numbers, hyphens, underscores only)",
)

View file

@ -0,0 +1,188 @@
"""Utilities for playing audio through the pipeline transport.
Provides one-shot and looping playback of raw PCM audio. All playback
should be routed through ``transport.output().queue_frame`` so the audio
reaches the caller without passing through STT (which would otherwise
generate phantom transcriptions).
"""
import asyncio
import uuid
from typing import Awaitable, Callable, Dict, Optional, Tuple
import numpy as np
from loguru import logger
from pipecat.frames.frames import (
Frame,
OutputAudioRawFrame,
TTSAudioRawFrame,
TTSStartedFrame,
TTSStoppedFrame,
TTSTextFrame,
)
try:
import soundfile as sf
except ModuleNotFoundError as e:
logger.error(f"Exception: {e}")
logger.error("In order to use audio playback, you need to `pip install soundfile`.")
raise Exception(f"Missing module: {e}")
# ---------------------------------------------------------------------------
# Audio file loading / caching
# ---------------------------------------------------------------------------
_audio_cache: Dict[Tuple[str, int], bytes] = {}
def load_audio_file(file_path: str, sample_rate: int) -> Optional[bytes]:
    """Read an audio file into PCM-16 bytes, memoising the result.

    The cache is keyed by ``(file_path, sample_rate)``. The file is NOT
    resampled: when its own rate differs from *sample_rate* a warning is
    logged and the samples are returned as read.

    Args:
        file_path: Path to a WAV audio file.
        sample_rate: Sample rate the caller expects; part of the cache key.

    Returns:
        Raw PCM-16 bytes, or *None* if loading failed for any reason.
    """
    key = (file_path, sample_rate)
    cached = _audio_cache.get(key)
    if cached is not None:
        logger.debug(f"Using cached audio for {file_path} at {sample_rate}Hz")
        return cached

    try:
        logger.info(f"Loading audio from {file_path} at {sample_rate}Hz")
        samples, native_rate = sf.read(file_path, dtype="int16")
        logger.info(
            f"Audio file loaded - file sample_rate: {native_rate}, target: {sample_rate}"
        )

        # Multi-channel input: keep only the first channel.
        if samples.ndim > 1:
            samples = samples[:, 0]

        if native_rate != sample_rate:
            logger.warning(
                f"Audio file has sample rate {native_rate}, expected {sample_rate}"
            )

        pcm = samples.astype(np.int16).tobytes()
        _audio_cache[key] = pcm
        logger.info(f"Audio loaded: {len(samples)} samples at {sample_rate}Hz")
        return pcm
    except Exception as e:
        logger.error(f"Failed to load audio file {file_path}: {e}")
        return None
def clear_audio_cache() -> None:
    """Drop every entry from the module-level audio cache.

    Subsequent ``load_audio_file`` calls will read from disk again.
    """
    _audio_cache.clear()
    logger.info("Audio cache cleared")
# ---------------------------------------------------------------------------
# Playback helpers
# ---------------------------------------------------------------------------
async def play_audio(
    audio_data: bytes,
    *,
    sample_rate: int,
    queue_frame: Callable[[Frame], Awaitable[None]],
    transcript: Optional[str] = None,
    append_to_context: bool = False,
) -> None:
    """Queue a single playback of raw PCM-16 audio.

    Frames are queued in this order, all sharing one freshly generated
    context id: ``TTSStartedFrame``, an optional ``TTSTextFrame`` (only
    when *transcript* is non-empty), ``TTSAudioRawFrame``, and finally
    ``TTSStoppedFrame``.

    Args:
        audio_data: Raw 16-bit mono PCM bytes.
        sample_rate: Pipeline sample rate (e.g. 16000).
        queue_frame: Frame sink -- typically ``transport.output().queue_frame``.
        transcript: Optional transcript of the recording.
        append_to_context: Value stored on the text frame's
            ``append_to_context`` attribute. Defaults to False.
    """
    ctx = str(uuid.uuid4())

    await queue_frame(TTSStartedFrame(context_id=ctx))

    if transcript:
        text_frame = TTSTextFrame(
            text=transcript, aggregated_by="recording", context_id=ctx
        )
        text_frame.append_to_context = append_to_context
        await queue_frame(text_frame)

    audio_frame = TTSAudioRawFrame(
        audio=audio_data,
        sample_rate=sample_rate,
        num_channels=1,
        context_id=ctx,
    )
    await queue_frame(audio_frame)

    await queue_frame(TTSStoppedFrame(context_id=ctx))
async def play_audio_loop(
    *,
    stop_event: asyncio.Event,
    sample_rate: int,
    queue_frame: Callable[[Frame], Awaitable[None]],
    audio_file: Optional[str] = None,
) -> None:
    """Repeat an audio clip until *stop_event* is set.

    Each iteration queues the whole clip as one ``OutputAudioRawFrame`` and
    then waits up to the clip duration plus 1.5 seconds for the stop event
    before queueing it again.

    Args:
        stop_event: Set this event to terminate the loop.
        sample_rate: Target sample rate for audio playback.
        queue_frame: Frame sink -- typically ``transport.output().queue_frame``.
        audio_file: Path to a WAV file. When *None* the default
            ``transfer_hold_ring_{sample_rate}.wav`` asset is used.
    """
    if audio_file is None:
        from api.constants import APP_ROOT_DIR

        default_asset = (
            APP_ROOT_DIR / "assets" / f"transfer_hold_ring_{sample_rate}.wav"
        )
        audio_file = str(default_asset)

    pcm = load_audio_file(audio_file, sample_rate)
    if not pcm:
        logger.warning(f"Audio loop: failed to load {audio_file}, skipping")
        return

    # 16-bit PCM = 2 bytes per sample
    clip_seconds = (len(pcm) // 2) / sample_rate

    logger.debug(f"Audio loop: playing at {sample_rate}Hz")

    try:
        while not stop_event.is_set():
            await queue_frame(
                OutputAudioRawFrame(
                    audio=pcm,
                    sample_rate=sample_rate,
                    num_channels=1,
                )
            )

            try:
                await asyncio.wait_for(
                    stop_event.wait(), timeout=clip_seconds + 1.5
                )
            except asyncio.TimeoutError:
                # Stop was not requested during this pass; play again.
                continue
            break
    except Exception as e:
        logger.error(f"Audio loop error: {e}")

    logger.debug("Audio loop: stopped")

View file

@ -6,17 +6,16 @@ from api.db import db_client
from api.enums import WorkflowRunState
from api.services.campaign.circuit_breaker import circuit_breaker
from api.services.pipecat.audio_config import AudioConfig
from api.services.pipecat.audio_playback import play_audio, play_audio_loop
from api.services.pipecat.in_memory_buffers import (
InMemoryAudioBuffer,
InMemoryLogsBuffer,
)
from api.services.pipecat.pipeline_metrics_aggregator import PipelineMetricsAggregator
from api.services.pipecat.recording_playback import queue_recording_audio
from api.services.pipecat.tracing_config import get_trace_url
from api.services.workflow.pipecat_engine import PipecatEngine
from api.tasks.arq import enqueue_job
from api.tasks.function_names import FunctionNames
from api.utils.hold_audio import play_hold_audio_loop
from pipecat.frames.frames import (
Frame,
LLMContextFrame,
@ -90,7 +89,11 @@ def register_event_handlers(
stop_ringer = asyncio.Event()
sample_rate = audio_config.pipeline_sample_rate or 16000
ringer_task = asyncio.create_task(
play_hold_audio_loop(task, stop_ringer, sample_rate)
play_audio_loop(
stop_event=stop_ringer,
sample_rate=sample_rate,
queue_frame=transport.output().queue_frame,
)
)
try:
fetch_result = await pre_call_fetch_task
@ -127,12 +130,16 @@ def register_event_handlers(
and fetch_recording_audio
):
logger.debug(f"Playing audio greeting recording: {greeting_value}")
audio_data = await fetch_recording_audio(greeting_value)
if audio_data:
await queue_recording_audio(
audio_data,
result = await fetch_recording_audio(
recording_pk=int(greeting_value)
)
if result:
await play_audio(
result.audio,
sample_rate=audio_config.pipeline_sample_rate or 16000,
queue_frame=task.queue_frame,
queue_frame=transport.output().queue_frame,
transcript=result.transcript,
append_to_context=True,
)
else:
logger.warning(

View file

@ -170,7 +170,10 @@ class RealtimeFeedbackObserver(BaseObserver):
frame_direction = data.direction
# Skip already processed frames (frames can be observed multiple times)
if frame.id in self._frames_seen:
if (
frame.id in self._frames_seen
or frame_direction != FrameDirection.DOWNSTREAM
):
return
self._frames_seen.add(frame.id)

View file

@ -7,7 +7,7 @@ subsequent plays (even from other workers) are instantaneous.
"""
import os
from typing import Awaitable, Callable, Optional
from typing import Awaitable, Callable, NamedTuple, Optional
import numpy as np
from loguru import logger
@ -22,17 +22,23 @@ from .audio_file_cache import (
write_cache_file,
)
class RecordingAudio(NamedTuple):
    """Audio bytes paired with the recording's transcript (when available)."""

    # Audio payload of the recording.
    audio: bytes
    # Transcript of the recording; None when no transcript is available.
    transcript: Optional[str] = None
# ---------------------------------------------------------------------------
# Cache path helper
# ---------------------------------------------------------------------------
def _cache_path(
organization_id: int, workflow_id: int, recording_id: str, sample_rate: int
) -> str:
def _cache_path(organization_id: int, recording_id: str, sample_rate: int) -> str:
"""Return the on-disk path for a cached PCM file."""
return os.path.join(
CACHE_DIR, f"{organization_id}_{workflow_id}_{recording_id}_{sample_rate}.pcm"
CACHE_DIR, f"{organization_id}_{recording_id}_{sample_rate}.pcm"
)
@ -43,59 +49,96 @@ def _cache_path(
def create_recording_audio_fetcher(
    organization_id: int,
    pipeline_sample_rate: int,
) -> Callable[..., Awaitable[Optional[RecordingAudio]]]:
    """Create an async callback that returns PCM audio for a recording.

    The returned callable accepts **one** of two keyword arguments:

    - ``recording_pk``  the immutable integer primary key (used by
      dropdown-based selections: greeting, edges, tool configs).
    - ``recording_id``  the human-readable string ID (used by
      prompt-based ``RECORDING_ID: xxx`` references).

    Flow:

    1. Checks the filesystem cache (keyed by org + pk/recording_id +
       sample rate).
    2. On miss, looks up the recording in the DB and re-checks the cache
       under the recording's string ID.
    3. Otherwise downloads the audio from S3/MinIO, converts to 16-bit mono
       PCM, trims silence, and caches the result on disk.

    Args:
        organization_id: Organization owning the recordings.
        pipeline_sample_rate: Target PCM sample rate for the pipeline.

    Returns:
        ``async (*, recording_pk: Optional[int] = None,
        recording_id: Optional[str] = None) -> Optional[RecordingAudio]``.
        The callable returns ``None`` when neither key is given, when the
        recording is not found, or when download/conversion yields nothing.
    """
    from api.db import db_client
    from api.services.storage import get_storage_for_backend

    # Storage instances are resolved lazily on first use and then reused for
    # every later fetch made through this fetcher.
    _storage_cache: dict[str, object] = {}
    # Transcripts remembered per cache key so disk-cache hits can skip the DB.
    _transcript_cache: dict[str, Optional[str]] = {}

    def _get_storage(backend: str):
        if backend not in _storage_cache:
            _storage_cache[backend] = get_storage_for_backend(backend)
        return _storage_cache[backend]

    async def _lookup_recording(
        cache_key: str,
        recording_pk: Optional[int],
        recording_id: Optional[str],
    ):
        """DB lookup with transcript caching."""
        if recording_pk is not None:
            recording = await db_client.get_recording_by_id(
                recording_pk, organization_id
            )
        else:
            recording = await db_client.get_recording_by_recording_id(
                recording_id, organization_id
            )
        if recording:
            # Empty transcripts are normalised to None.
            _transcript_cache[cache_key] = recording.transcript or None
        return recording

    async def fetch(
        *,
        recording_pk: Optional[int] = None,
        recording_id: Optional[str] = None,
    ) -> Optional[RecordingAudio]:
        if recording_pk is None and recording_id is None:
            logger.warning("fetch called with neither recording_pk nor recording_id")
            return None

        # Use pk for cache key when available, otherwise recording_id
        cache_key = str(recording_pk) if recording_pk is not None else recording_id
        cached = _cache_path(organization_id, cache_key, pipeline_sample_rate)

        # 1. Serve from filesystem cache
        if os.path.exists(cached):
            logger.debug(f"Recording {cache_key} served from disk cache")
            audio = read_cached_file(cached)
            # Transcript may already be in memory from a prior fetch;
            # if not, do a lightweight DB lookup.
            if cache_key not in _transcript_cache:
                await _lookup_recording(cache_key, recording_pk, recording_id)
            return RecordingAudio(
                audio=audio, transcript=_transcript_cache.get(cache_key)
            )

        # 2. DB lookup
        recording = await _lookup_recording(cache_key, recording_pk, recording_id)
        if not recording:
            logger.warning(f"Recording {cache_key} not found in database")
            return None

        # The converter's cache write (as far as it is visible in this module)
        # stores the PCM file under the recording's string ID. A pk-based
        # request therefore has to look there too, otherwise it would never
        # hit the disk cache and would re-download on every call.
        stored = _cache_path(
            recording.organization_id, recording.recording_id, pipeline_sample_rate
        )
        if stored != cached and os.path.exists(stored):
            logger.debug(f"Recording {cache_key} served from disk cache")
            return RecordingAudio(
                audio=read_cached_file(stored),
                transcript=_transcript_cache.get(cache_key),
            )

        # 3. Download, convert, trim, and cache
        pcm_data = await _download_and_convert(
            recording, pipeline_sample_rate, _get_storage
        )
        if pcm_data is None:
            return None
        return RecordingAudio(
            audio=pcm_data, transcript=_transcript_cache.get(cache_key)
        )

    return fetch
@ -106,11 +149,10 @@ def create_recording_audio_fetcher(
async def warm_recording_cache(
workflow_id: int,
organization_id: int,
pipeline_sample_rate: int,
) -> None:
"""Pre-fetch all active recordings for a workflow into the disk cache.
"""Pre-fetch all active recordings for an organization into the disk cache.
Launched as a background ``asyncio.Task`` at pipeline startup so that
recordings are ready before the first playback request. Errors are logged
@ -120,9 +162,7 @@ async def warm_recording_cache(
from api.services.storage import get_storage_for_backend
try:
recordings = await db_client.get_recordings(
organization_id=organization_id, workflow_id=workflow_id
)
recordings = await db_client.get_recordings(organization_id=organization_id)
if not recordings:
return
@ -131,18 +171,19 @@ async def warm_recording_cache(
r
for r in recordings
if not os.path.exists(
_cache_path(
organization_id, workflow_id, r.recording_id, pipeline_sample_rate
)
_cache_path(organization_id, str(r.id), pipeline_sample_rate)
)
and not os.path.exists(
_cache_path(organization_id, r.recording_id, pipeline_sample_rate)
)
]
if not uncached:
logger.debug(f"Recording cache already warm for workflow {workflow_id}")
logger.debug(f"Recording cache already warm for org {organization_id}")
return
logger.info(
f"Warming recording cache: {len(uncached)}/{len(recordings)} "
f"recording(s) for workflow {workflow_id}"
f"recording(s) for org {organization_id}"
)
# Resolve storage instances once per backend, not per recording
@ -168,7 +209,7 @@ async def warm_recording_cache(
f"Cache warm: error processing {recording.recording_id}"
)
logger.info(f"Recording cache warm complete for workflow {workflow_id}")
logger.info(f"Recording cache warm complete for org {organization_id}")
except Exception:
logger.exception("Recording cache warm failed")
@ -201,7 +242,6 @@ async def _download_and_convert(
# Write to disk cache
cached = _cache_path(
recording.organization_id,
recording.workflow_id,
recording.recording_id,
sample_rate,
)

View file

@ -1,41 +0,0 @@
"""Shared helper for pushing pre-recorded audio frames into a pipeline."""
import uuid
from typing import Awaitable, Callable
from pipecat.frames.frames import (
Frame,
TTSAudioRawFrame,
TTSStartedFrame,
TTSStoppedFrame,
)
async def queue_recording_audio(
    audio_data: bytes,
    *,
    sample_rate: int,
    queue_frame: Callable[[Frame], Awaitable[None]],
) -> None:
    """Queue one pre-recorded clip as a started / audio / stopped frame triple.

    All three frames carry the same freshly generated context ID so they can
    be correlated as a single utterance.

    Args:
        audio_data: Raw 16-bit mono PCM bytes.
        sample_rate: Pipeline sample rate (e.g. 16000).
        queue_frame: Awaitable sink for frames, typically ``task.queue_frame``.
    """
    playback_id = str(uuid.uuid4())

    def _frames():
        # Lazily yielded so each frame is built right before it is queued.
        yield TTSStartedFrame(context_id=playback_id)
        yield TTSAudioRawFrame(
            audio=audio_data,
            sample_rate=sample_rate,
            num_channels=1,
            context_id=playback_id,
        )
        yield TTSStoppedFrame(context_id=playback_id)

    for frame in _frames():
        await queue_frame(frame)

View file

@ -245,8 +245,8 @@ class RecordingRouterProcessor(FrameProcessor):
"""
logger.info(f"Playing pre-recorded audio: {recording_id}")
audio_data = await self._fetch_recording_audio(recording_id)
if not audio_data:
result = await self._fetch_recording_audio(recording_id=recording_id)
if not result:
logger.warning(
f"Failed to fetch recording {recording_id}, no audio will play"
)
@ -256,7 +256,7 @@ class RecordingRouterProcessor(FrameProcessor):
await self.push_frame(TTSStartedFrame(context_id=context_id))
await self.push_frame(
TTSAudioRawFrame(
audio=audio_data,
audio=result.audio,
sample_rate=self._audio_sample_rate,
num_channels=1,
context_id=context_id,
@ -264,10 +264,10 @@ class RecordingRouterProcessor(FrameProcessor):
)
await self.push_frame(TTSStoppedFrame(context_id=context_id))
duration_secs = len(audio_data) / (self._audio_sample_rate * 2)
duration_secs = len(result.audio) / (self._audio_sample_rate * 2)
logger.debug(
f"Finished pushing recording {recording_id} "
f"({len(audio_data)} bytes, {duration_secs:.1f}s)"
f"({len(result.audio)} bytes, {duration_secs:.1f}s)"
)
# ------------------------------------------------------------------

View file

@ -695,9 +695,7 @@ async def _run_pipeline(
# Check if the workflow has any active recordings so the engine can
# include recording response mode instructions in all node prompts.
has_recordings = await db_client.has_active_recordings(
workflow_id, workflow.organization_id
)
has_recordings = await db_client.has_active_recordings(workflow.organization_id)
context_compaction_enabled = (workflow.workflow_configurations or {}).get(
"context_compaction_enabled", False
@ -832,7 +830,6 @@ async def _run_pipeline(
# and audio transition speech)
fetch_audio = create_recording_audio_fetcher(
organization_id=workflow.organization_id,
workflow_id=workflow_id,
pipeline_sample_rate=audio_config.pipeline_sample_rate,
)
engine.set_fetch_recording_audio(fetch_audio)
@ -885,7 +882,6 @@ async def _run_pipeline(
# before the first playback request.
asyncio.create_task(
warm_recording_cache(
workflow_id=workflow_id,
organization_id=workflow.organization_id,
pipeline_sample_rate=audio_config.pipeline_sample_rate,
)
@ -920,8 +916,9 @@ async def _run_pipeline(
# Create pipeline task with audio configuration
task = create_pipeline_task(pipeline, workflow_run_id, audio_config)
# Now set the task on the engine
# Now set the task and transport output on the engine
engine.set_task(task)
engine.set_transport_output(transport.output())
# Initialize the engine to set the initial context with
# System Prompt and Tools

View file

@ -230,7 +230,7 @@ def create_tts_service(user_config, audio_config: "AudioConfig"):
api_key=user_config.tts.api_key,
settings=DeepgramTTSSettings(voice=user_config.tts.voice),
text_filters=[xml_function_tag_filter],
skip_aggregator_types=["recording_router"],
skip_aggregator_types=["recording_router", "recording"],
silence_time_s=1.0,
)
elif user_config.tts.provider == ServiceProviders.OPENAI.value:
@ -238,7 +238,7 @@ def create_tts_service(user_config, audio_config: "AudioConfig"):
api_key=user_config.tts.api_key,
settings=OpenAITTSSettings(model=user_config.tts.model),
text_filters=[xml_function_tag_filter],
skip_aggregator_types=["recording_router"],
skip_aggregator_types=["recording_router", "recording"],
silence_time_s=1.0,
)
elif user_config.tts.provider == ServiceProviders.ELEVENLABS.value:
@ -258,7 +258,7 @@ def create_tts_service(user_config, audio_config: "AudioConfig"):
similarity_boost=0.75,
),
text_filters=[xml_function_tag_filter],
skip_aggregator_types=["recording_router"],
skip_aggregator_types=["recording_router", "recording"],
silence_time_s=1.0,
)
elif user_config.tts.provider == ServiceProviders.CARTESIA.value:
@ -284,7 +284,7 @@ def create_tts_service(user_config, audio_config: "AudioConfig"):
),
),
text_filters=[xml_function_tag_filter],
skip_aggregator_types=["recording_router"],
skip_aggregator_types=["recording_router", "recording"],
silence_time_s=1.0,
)
elif user_config.tts.provider == ServiceProviders.DOGRAH.value:
@ -299,7 +299,7 @@ def create_tts_service(user_config, audio_config: "AudioConfig"):
speed=user_config.tts.speed,
),
text_filters=[xml_function_tag_filter],
skip_aggregator_types=["recording_router"],
skip_aggregator_types=["recording_router", "recording"],
silence_time_s=1.0,
)
elif user_config.tts.provider == ServiceProviders.CAMB.value:
@ -312,7 +312,7 @@ def create_tts_service(user_config, audio_config: "AudioConfig"):
voice_id=voice_id,
model=user_config.tts.model,
text_filters=[xml_function_tag_filter],
skip_aggregator_types=["recording_router"],
skip_aggregator_types=["recording_router", "recording"],
)
# Set language directly as BCP-47 code (bypasses Language enum conversion)
tts._settings.language = language
@ -327,7 +327,7 @@ def create_tts_service(user_config, audio_config: "AudioConfig"):
speed=user_config.tts.speed,
),
text_filters=[xml_function_tag_filter],
skip_aggregator_types=["recording_router"],
skip_aggregator_types=["recording_router", "recording"],
silence_time_s=1.0,
)
elif user_config.tts.provider == ServiceProviders.RIME.value:
@ -352,7 +352,7 @@ def create_tts_service(user_config, audio_config: "AudioConfig"):
api_key=user_config.tts.api_key,
settings=RimeTTSSettings(**settings_kwargs),
text_filters=[xml_function_tag_filter],
skip_aggregator_types=["recording_router"],
skip_aggregator_types=["recording_router", "recording"],
silence_time_s=1.0,
)
elif user_config.tts.provider == ServiceProviders.SARVAM.value:
@ -382,7 +382,7 @@ def create_tts_service(user_config, audio_config: "AudioConfig"):
language=pipecat_language,
),
text_filters=[xml_function_tag_filter],
skip_aggregator_types=["recording_router"],
skip_aggregator_types=["recording_router", "recording"],
silence_time_s=1.0,
)
else:

View file

@ -1,4 +1,4 @@
"""Service for duplicating workflows including recordings."""
"""Service for duplicating workflows."""
import copy
import posixpath
@ -44,7 +44,9 @@ async def duplicate_workflow(
organization_id: int,
user_id: int,
):
"""Duplicate a workflow including its definition, config, recordings, and triggers.
"""Duplicate a workflow including its definition, config, and triggers.
Recordings are org-scoped and shared, so they are not duplicated.
Args:
workflow_id: The source workflow ID to duplicate
@ -118,15 +120,7 @@ async def duplicate_workflow(
organization_id=organization_id,
)
# 6. Copy recordings (recording_ids are preserved since they're scoped per workflow)
await _duplicate_recordings(
source_workflow_id=workflow_id,
new_workflow_id=new_workflow.id,
organization_id=organization_id,
user_id=user_id,
)
# 7. Sync triggers for the new workflow
# 6. Sync triggers for the new workflow
if workflow_definition:
trigger_paths = _extract_trigger_paths(workflow_definition)
if trigger_paths:
@ -139,66 +133,6 @@ async def duplicate_workflow(
return new_workflow
async def _duplicate_recordings(
    source_workflow_id: int,
    new_workflow_id: int,
    organization_id: int,
    user_id: int,
) -> None:
    """Copy every recording of one workflow onto another workflow.

    For each source recording the stored file is copied to a key under the
    new workflow ID and a matching DB row is created with the same
    ``recording_id``. A recording whose file copy fails, or whose processing
    raises, is logged and skipped; the remaining recordings are still
    processed.

    Args:
        source_workflow_id: Workflow whose recordings are read.
        new_workflow_id: Workflow that receives the copies.
        organization_id: Organization both workflows belong to.
        user_id: Recorded as ``created_by`` on the new rows.
    """
    source_recordings = await db_client.get_recordings(
        workflow_id=source_workflow_id,
        organization_id=organization_id,
    )

    # An empty/None result simply yields no iterations.
    for source in source_recordings or ():
        try:
            # recordings/{org_id}/{new_workflow_id}/{recording_id}/{filename}
            basename = posixpath.basename(source.storage_key)
            target_key = (
                f"recordings/{organization_id}/{new_workflow_id}"
                f"/{source.recording_id}/{basename}"
            )

            was_copied = await _copy_storage_object(
                source.storage_key, target_key, source.storage_backend
            )
            if not was_copied:
                logger.warning(
                    f"Failed to copy recording file {source.recording_id}, skipping"
                )
                continue

            await db_client.create_recording(
                recording_id=source.recording_id,
                workflow_id=new_workflow_id,
                organization_id=organization_id,
                tts_provider=source.tts_provider,
                tts_model=source.tts_model,
                tts_voice_id=source.tts_voice_id,
                transcript=source.transcript,
                storage_key=target_key,
                storage_backend=source.storage_backend,
                created_by=user_id,
                # Deep copy so the new row does not share metadata objects.
                metadata=copy.deepcopy(source.recording_metadata),
            )
            logger.info(f"Duplicated recording {source.recording_id}")
        except Exception as e:
            # Best effort: one failing recording must not abort the rest.
            logger.error(f"Error duplicating recording {source.recording_id}: {e}")
async def _copy_storage_object(
source_key: str, dest_key: str, storage_backend: str
) -> bool:

View file

@ -1,6 +1,6 @@
from typing import TYPE_CHECKING, Awaitable, Callable, Optional, Union
from api.services.pipecat.recording_playback import queue_recording_audio
from api.services.pipecat.audio_playback import play_audio
from api.services.workflow.disposition_mapper import (
apply_disposition_mapping,
get_organization_id_from_workflow_run,
@ -115,6 +115,10 @@ class PipecatEngine:
# Audio configuration (set via set_audio_config from _run_pipeline)
self._audio_config = None
# Transport output processor for injecting audio directly into the
# output, bypassing STT (set via set_transport_output from _run_pipeline)
self._transport_output = None
# Recording audio fetcher (set via set_fetch_recording_audio from _run_pipeline)
self._fetch_recording_audio = None
@ -221,16 +225,17 @@ class PipecatEngine:
f"Playing transition audio: {transition_speech_recording_id}"
)
self._queued_speech_mute_state = "waiting"
audio_data = await self._fetch_recording_audio(
transition_speech_recording_id
result = await self._fetch_recording_audio(
recording_pk=int(transition_speech_recording_id)
)
if audio_data:
await queue_recording_audio(
audio_data,
if result:
await play_audio(
result.audio,
sample_rate=self._audio_config.pipeline_sample_rate
if self._audio_config
else 16000,
queue_frame=self.task.queue_frame,
queue_frame=self._transport_output.queue_frame,
transcript=result.transcript,
)
else:
logger.warning(
@ -753,6 +758,14 @@ class PipecatEngine:
"""Set the audio configuration for the pipeline."""
self._audio_config = audio_config
def set_transport_output(self, transport_output) -> None:
    """Set the transport output processor for direct audio playback.

    Audio queued here bypasses STT and the rest of the pipeline,
    going straight to the caller.

    Args:
        transport_output: Object stored as ``self._transport_output``.
            Callers in this change pass ``transport.output()`` and later
            use its ``queue_frame`` method to play audio.
    """
    self._transport_output = transport_output
def set_fetch_recording_audio(self, fetch_fn) -> None:
"""Set the recording audio fetcher callback."""
self._fetch_recording_audio = fetch_fn

View file

@ -168,7 +168,6 @@ def create_aggregation_correction_callback(engine: "PipecatEngine"):
reference = engine._current_llm_generation_reference_text
if not reference:
logger.warning("No reference text available for aggregation correction")
return corrupted
# Apply the correction algorithm

View file

@ -16,7 +16,7 @@ from loguru import logger
from api.db import db_client
from api.enums import ToolCategory, WorkflowRunMode
from api.services.pipecat.recording_playback import queue_recording_audio
from api.services.pipecat.audio_playback import play_audio, play_audio_loop
from api.services.telephony.call_transfer_manager import get_call_transfer_manager
from api.services.telephony.factory import get_telephony_provider
from api.services.telephony.transfer_event_protocol import TransferContext
@ -28,7 +28,6 @@ from api.services.workflow.tools.custom_tool import (
execute_http_tool,
tool_to_function_schema,
)
from api.utils.hold_audio import play_hold_audio_loop
from pipecat.adapters.schemas.function_schema import FunctionSchema
from pipecat.frames.frames import (
FunctionCallResultProperties,
@ -88,20 +87,23 @@ class CustomToolManager:
message_type = config.get("messageType", "none")
if message_type == "audio":
recording_id = config.get("audioRecordingId", "")
if recording_id and self._engine._fetch_recording_audio:
audio_data = await self._engine._fetch_recording_audio(recording_id)
if audio_data:
await queue_recording_audio(
audio_data,
recording_pk = config.get("audioRecordingId")
if recording_pk and self._engine._fetch_recording_audio:
result = await self._engine._fetch_recording_audio(
recording_pk=int(recording_pk)
)
if result:
await play_audio(
result.audio,
sample_rate=self._engine._audio_config.pipeline_sample_rate
if self._engine._audio_config
else 16000,
queue_frame=self._engine.task.queue_frame,
queue_frame=self._engine._transport_output.queue_frame,
transcript=result.transcript,
)
return True
else:
logger.warning(f"Failed to fetch recording {recording_id}")
logger.warning(f"Failed to fetch recording pk={recording_pk}")
return False
if message_type == "custom":
@ -292,22 +294,23 @@ class CustomToolManager:
custom_msg_type = config.get("customMessageType", "text")
custom_message = config.get("customMessage", "")
if custom_msg_type == "audio":
recording_id = config.get("customMessageRecordingId", "")
if recording_id and self._engine._fetch_recording_audio:
recording_pk = config.get("customMessageRecordingId")
if recording_pk and self._engine._fetch_recording_audio:
logger.info(
f"Playing audio message before HTTP tool: {recording_id}"
f"Playing audio message before HTTP tool: pk={recording_pk}"
)
self._engine._queued_speech_mute_state = "waiting"
audio_data = await self._engine._fetch_recording_audio(
recording_id
result = await self._engine._fetch_recording_audio(
recording_pk=int(recording_pk)
)
if audio_data:
await queue_recording_audio(
audio_data,
if result:
await play_audio(
result.audio,
sample_rate=self._engine._audio_config.pipeline_sample_rate
if self._engine._audio_config
else 16000,
queue_frame=self._engine.task.queue_frame,
queue_frame=self._engine._transport_output.queue_frame,
transcript=result.transcript,
)
elif custom_message:
logger.info(
@ -587,10 +590,10 @@ class CustomToolManager:
# Start hold music as background task
hold_music_task = asyncio.create_task(
play_hold_audio_loop(
self._engine.task,
hold_music_stop_event,
sample_rate,
play_audio_loop(
stop_event=hold_music_stop_event,
sample_rate=sample_rate,
queue_frame=self._engine._transport_output.queue_frame,
)
)

View file

@ -1,151 +0,0 @@
"""
Hold audio utility for loading, caching, and playing hold music files.
This module provides functionality to load hold music audio files at specific sample rates
with caching to improve performance during multiple calls, and a reusable loop that queues
audio frames until a stop event is set.
"""
import asyncio
from typing import Dict, Optional, Tuple
import numpy as np
from loguru import logger
from pipecat.frames.frames import OutputAudioRawFrame
try:
import soundfile as sf
except ModuleNotFoundError as e:
logger.error(f"Exception: {e}")
logger.error("In order to use hold audio, you need to `pip install soundfile`.")
raise Exception(f"Missing module: {e}")
# Global cache for loaded hold music data
_hold_audio_cache: Dict[Tuple[str, int], np.ndarray] = {}
def load_hold_audio(file_path: str, sample_rate: int) -> Optional[bytes]:
    """Load hold music audio file at the specified sample rate with caching.

    The file is decoded as int16, reduced to its first channel when it has
    more than one, and resampled to *sample_rate* when the file's own rate
    differs. The resulting samples are cached per ``(file_path, sample_rate)``.

    Args:
        file_path: Path to the hold music audio file.
        sample_rate: Target sample rate in Hz (e.g. 8000 or 16000).

    Returns:
        Audio data as bytes (PCM16) at *sample_rate*, or None if loading
        failed (the error is logged, not raised).
    """
    cache_key = (file_path, sample_rate)

    # Check cache first
    cached = _hold_audio_cache.get(cache_key)
    if cached is not None:
        logger.debug(f"Using cached hold audio for {file_path} at {sample_rate}Hz")
        return cached.tobytes()

    try:
        logger.info(f"Loading hold audio from {file_path} at {sample_rate}Hz")

        sound, file_sample_rate = sf.read(file_path, dtype="int16")
        logger.info(
            f"Audio file loaded - file sample_rate: {file_sample_rate}, target: {sample_rate}"
        )

        # Ensure mono audio (take first channel if stereo)
        if sound.ndim > 1:
            sound = sound[:, 0]

        if file_sample_rate != sample_rate:
            logger.warning(
                f"Hold music file has sample rate {file_sample_rate}, expected {sample_rate}"
            )
            # Playback labels these samples with the *target* rate, so
            # returning them unconverted would play at the wrong speed/pitch.
            sound = _resample_pcm16(sound, file_sample_rate, sample_rate)

        # Copy into an owned int16 array and cache it
        audio_data = sound.astype(np.int16)
        _hold_audio_cache[cache_key] = audio_data

        logger.info(
            f"Hold audio loaded successfully: {len(audio_data)} samples at {sample_rate}Hz"
        )
        return audio_data.tobytes()
    except Exception as e:
        # Deliberate best effort: callers treat None as "no hold audio".
        logger.error(f"Failed to load hold audio file {file_path}: {e}")
        return None


def _resample_pcm16(samples: np.ndarray, src_rate: int, dst_rate: int) -> np.ndarray:
    """Linearly resample 1-D int16 PCM *samples* from *src_rate* to *dst_rate*.

    Uses plain linear interpolation with no anti-alias filtering, which is
    assumed to be adequate for hold/ring tones. Empty input and equal rates
    are returned unchanged.
    """
    if src_rate == dst_rate or len(samples) == 0:
        return samples

    dst_len = max(1, round(len(samples) * dst_rate / src_rate))
    src_times = np.arange(len(samples)) / src_rate
    dst_times = np.arange(dst_len) / dst_rate
    # Interpolated values stay within the input's min/max, so the cast back
    # to int16 cannot overflow.
    resampled = np.interp(dst_times, src_times, samples.astype(np.float64))
    return np.rint(resampled).astype(np.int16)
def clear_hold_audio_cache():
    """Remove every entry from the module-level hold audio cache."""
    # dict.clear() empties the existing dict in place; nothing is rebound.
    _hold_audio_cache.clear()
    logger.info("Hold audio cache cleared")
def get_cache_info() -> Dict[str, int]:
    """Report how much hold audio is currently cached.

    Returns:
        A dict with ``cached_files`` (number of cache entries) and
        ``total_cache_size`` (sum of ``len()`` over the cached arrays).
    """
    entry_count = len(_hold_audio_cache)

    combined_length = 0
    for samples in _hold_audio_cache.values():
        combined_length += len(samples)

    return {
        "cached_files": entry_count,
        "total_cache_size": combined_length,
    }
async def play_hold_audio_loop(
    task,
    stop_event: asyncio.Event,
    sample_rate: int = 16000,
    hold_music_file: Optional[str] = None,
) -> None:
    """Repeatedly queue the hold/ring-back clip until *stop_event* is set.

    The caller owns the ``asyncio.Event`` and sets it to end playback. After
    each clip is queued the loop waits for the clip's duration plus 1.5
    seconds (or until the event fires) before queuing it again. Errors raised
    inside the loop are logged and end playback; they are not re-raised.

    Args:
        task: Object exposing an awaitable ``queue_frame`` (e.g. a
            ``PipelineTask``).
        stop_event: Set this event to terminate the loop.
        sample_rate: Target sample rate for audio playback.
        hold_music_file: Path to a WAV file. When *None* the default
            ``transfer_hold_ring_{sample_rate}.wav`` asset is used.
    """
    if hold_music_file is None:
        from api.constants import APP_ROOT_DIR

        default_asset = APP_ROOT_DIR / "assets" / f"transfer_hold_ring_{sample_rate}.wav"
        hold_music_file = str(default_asset)

    pcm_bytes = load_hold_audio(hold_music_file, sample_rate)
    if not pcm_bytes:
        logger.warning(f"Hold audio loop: failed to load {hold_music_file}, skipping")
        return

    # 16-bit PCM = 2 bytes per sample
    clip_seconds = (len(pcm_bytes) // 2) / sample_rate
    replay_after = clip_seconds + 1.5

    logger.debug(f"Hold audio loop: playing at {sample_rate}Hz")

    try:
        while not stop_event.is_set():
            await task.queue_frame(
                OutputAudioRawFrame(
                    audio=pcm_bytes,
                    sample_rate=sample_rate,
                    num_channels=1,
                )
            )
            try:
                await asyncio.wait_for(stop_event.wait(), timeout=replay_after)
            except asyncio.TimeoutError:
                # Clip finished without a stop request: queue it again.
                continue
            # The event fired while waiting: stop looping.
            break
    except Exception as e:
        logger.error(f"Hold audio loop: error: {e}")

    logger.debug("Hold audio loop: stopped")