From ffe9a9940155ca09ab94a2c51a708952bed30f3f Mon Sep 17 00:00:00 2001
From: Abhishek Kumar
Date: Fri, 10 Apr 2026 16:18:01 +0530
Subject: [PATCH] feat: allow recordings in tool transitions
---
...cd3155084a2_dedup_org_scoped_recordings.py | 108 ++++
...7a5cf3e09d0_unique_recording_id_per_org.py | 67 ++-
.../a1b2c3d4e5f6_make_tts_columns_nullable.py | 42 ++
api/db/models.py | 22 +-
api/db/workflow_recording_client.py | 117 ++++-
api/routes/tool.py | 19 +-
api/routes/workflow_recording.py | 46 +-
api/schemas/workflow_recording.py | 23 +-
api/services/pipecat/audio_playback.py | 188 +++++++
api/services/pipecat/event_handlers.py | 23 +-
.../pipecat/realtime_feedback_observer.py | 5 +-
api/services/pipecat/recording_audio_cache.py | 122 +++--
api/services/pipecat/recording_playback.py | 41 --
.../pipecat/recording_router_processor.py | 10 +-
api/services/pipecat/run_pipeline.py | 9 +-
api/services/pipecat/service_factory.py | 18 +-
api/services/workflow/duplicate.py | 76 +--
api/services/workflow/pipecat_engine.py | 27 +-
.../workflow/pipecat_engine_callbacks.py | 1 -
.../workflow/pipecat_engine_custom_tools.py | 49 +-
api/utils/hold_audio.py | 151 ------
docs/voice-agent/pre-recorded-audio.mdx | 26 +-
pipecat | 2 +-
ui/src/app/files/page.tsx | 75 +--
ui/src/app/recordings/RecordingsList.tsx | 113 +----
.../app/recordings/RecordingsUploadDialog.tsx | 465 ++++++++++++++++++
ui/src/app/recordings/page.tsx | 38 +-
ui/src/app/tools/[toolUuid]/page.tsx | 26 +-
.../workflow/[workflowId]/RenderWorkflow.tsx | 18 +-
.../components/RecordingsDialog.tsx | 7 +-
.../run/[runId]/hooks/useWebSocketRTC.tsx | 11 +
.../workflow/[workflowId]/settings/page.tsx | 23 +-
ui/src/client/sdk.gen.ts | 2 +-
ui/src/client/types.gen.ts | 62 ++-
ui/src/components/flow/TextOrAudioInput.tsx | 163 +++++-
ui/src/components/flow/edges/CustomEdge.tsx | 4 +-
ui/src/components/ui/dialog.tsx | 17 +
ui/src/components/ui/popover.tsx | 31 +-
38 files changed, 1555 insertions(+), 692 deletions(-)
create mode 100644 api/alembic/versions/3cd3155084a2_dedup_org_scoped_recordings.py
create mode 100644 api/alembic/versions/a1b2c3d4e5f6_make_tts_columns_nullable.py
create mode 100644 api/services/pipecat/audio_playback.py
delete mode 100644 api/services/pipecat/recording_playback.py
delete mode 100644 api/utils/hold_audio.py
create mode 100644 ui/src/app/recordings/RecordingsUploadDialog.tsx
diff --git a/api/alembic/versions/3cd3155084a2_dedup_org_scoped_recordings.py b/api/alembic/versions/3cd3155084a2_dedup_org_scoped_recordings.py
new file mode 100644
index 0000000..67998cf
--- /dev/null
+++ b/api/alembic/versions/3cd3155084a2_dedup_org_scoped_recordings.py
@@ -0,0 +1,108 @@
+"""dedup recordings to org-scoped unique audio
+
+Revision ID: 3cd3155084a2
+Revises: e7254d2c6c18
+Create Date: 2026-04-10 12:00:00.000000
+
+"""
+
+from typing import Sequence, Union
+
+import sqlalchemy as sa
+from alembic import op
+
+# revision identifiers, used by Alembic.
+revision: str = "3cd3155084a2"
+down_revision: Union[str, None] = "e7254d2c6c18"
+branch_labels: Union[str, Sequence[str], None] = None
+depends_on: Union[str, Sequence[str], None] = None
+
+
+def upgrade() -> None:
+ conn = op.get_bind()
+
+ # 1. Identify duplicate groups: same (org, transcript, tts config).
+ # Within each group the earliest row (by created_at) is canonical;
+ # every other row is an alias that will be remapped and soft-deleted.
+ rows = conn.execute(
+ sa.text("""
+ WITH ranked AS (
+ SELECT
+ recording_id,
+ organization_id,
+ transcript,
+ tts_provider,
+ tts_model,
+ tts_voice_id,
+ ROW_NUMBER() OVER (
+ PARTITION BY organization_id, transcript,
+ tts_provider, tts_model, tts_voice_id
+ ORDER BY created_at ASC
+ ) AS rn
+ FROM workflow_recordings
+ WHERE is_active = true
+ ),
+ canonical AS (
+ SELECT recording_id AS canonical_id,
+ organization_id, transcript,
+ tts_provider, tts_model, tts_voice_id
+ FROM ranked
+ WHERE rn = 1
+ )
+ SELECT r.recording_id AS alias_id, c.canonical_id
+ FROM ranked r
+ JOIN canonical c
+ ON r.organization_id = c.organization_id
+ AND r.transcript = c.transcript
+ AND r.tts_provider = c.tts_provider
+ AND r.tts_model = c.tts_model
+ AND r.tts_voice_id = c.tts_voice_id
+ WHERE r.rn > 1
+ """)
+ ).fetchall()
+
+ if not rows:
+ return
+
+ # 2. Replace alias recording_ids with canonical ones in workflow JSON.
+ # Both draft definitions (workflows.workflow_definition) and published
+ # versions (workflow_definitions.workflow_json) are updated.
+ for alias_id, canonical_id in rows:
+ alias_pattern = f"RECORDING_ID: {alias_id}"
+ canonical_pattern = f"RECORDING_ID: {canonical_id}"
+ conn.execute(
+ sa.text("""
+ UPDATE workflows
+ SET workflow_definition =
+ REPLACE(workflow_definition::text, :alias, :canonical)::json
+ WHERE workflow_definition::text LIKE '%%' || :alias || '%%'
+ """),
+ {"alias": alias_pattern, "canonical": canonical_pattern},
+ )
+ conn.execute(
+ sa.text("""
+ UPDATE workflow_definitions
+ SET workflow_json =
+ REPLACE(workflow_json::text, :alias, :canonical)::json
+ WHERE workflow_json::text LIKE '%%' || :alias || '%%'
+ """),
+ {"alias": alias_pattern, "canonical": canonical_pattern},
+ )
+
+ # 3. Soft-delete every alias row.
+ alias_ids = [r[0] for r in rows]
+ conn.execute(
+ sa.text("""
+ UPDATE workflow_recordings
+ SET is_active = false
+ WHERE recording_id = ANY(:ids)
+ AND is_active = true
+ """),
+ {"ids": alias_ids},
+ )
+
+
+def downgrade() -> None:
+ # Deduplication is a one-way data migration. The soft-deleted rows
+ # still exist in the table; a manual restore is possible if needed.
+ pass
diff --git a/api/alembic/versions/67a5cf3e09d0_unique_recording_id_per_org.py b/api/alembic/versions/67a5cf3e09d0_unique_recording_id_per_org.py
index 5ec4b1e..0b9ddfa 100644
--- a/api/alembic/versions/67a5cf3e09d0_unique_recording_id_per_org.py
+++ b/api/alembic/versions/67a5cf3e09d0_unique_recording_id_per_org.py
@@ -1,7 +1,7 @@
-"""unique recording id per org and workflow
+"""make recordings org-scoped instead of workflow-scoped
Revision ID: 67a5cf3e09d0
-Revises: e7254d2c6c18
+Revises: 3cd3155084a2
Create Date: 2026-04-09 17:03:38.302041
"""
@@ -13,13 +13,13 @@ from alembic import op
# revision identifiers, used by Alembic.
revision: str = "67a5cf3e09d0"
-down_revision: Union[str, None] = "e7254d2c6c18"
+down_revision: Union[str, None] = "3cd3155084a2"
branch_labels: Union[str, Sequence[str], None] = None
depends_on: Union[str, Sequence[str], None] = None
def upgrade() -> None:
- # Widen column from 16 to 64 chars for descriptive names
+ # 1. Widen recording_id from 16 to 64 chars for descriptive names
op.alter_column(
"workflow_recordings",
"recording_id",
@@ -27,40 +27,79 @@ def upgrade() -> None:
type_=sa.String(length=64),
existing_nullable=False,
)
- # Drop the old globally-unique index
- op.drop_index(
- op.f("ix_workflow_recordings_recording_id"), table_name="workflow_recordings"
+
+ # 2. Make workflow_id nullable — recordings are now org-scoped
+ op.alter_column(
+ "workflow_recordings",
+ "workflow_id",
+ existing_type=sa.Integer(),
+ nullable=True,
)
- # Re-create as non-unique index for lookups
+
+ # 3. Drop the old globally-unique index on recording_id
+ op.drop_index(
+ "ix_workflow_recordings_recording_id",
+ table_name="workflow_recordings",
+ )
+
+ # 4. Re-create as non-unique index (for lookups)
op.create_index(
"ix_workflow_recordings_recording_id",
"workflow_recordings",
["recording_id"],
unique=False,
)
- # Add composite unique constraint (recording_id, organization_id, workflow_id)
+
+ # 5. Add unique constraint (recording_id, organization_id)
op.create_unique_constraint(
- "uq_workflow_recordings_recording_id_org_wf",
+ "uq_workflow_recordings_recording_id_org",
"workflow_recordings",
- ["recording_id", "organization_id", "workflow_id"],
+ ["recording_id", "organization_id"],
+ )
+
+ # 6. Drop the workflow+TTS scope index (no longer relevant)
+ op.drop_index(
+ "ix_workflow_recordings_tts_scope",
+ table_name="workflow_recordings",
)
def downgrade() -> None:
+ # Re-create the TTS scope index
+ op.create_index(
+ "ix_workflow_recordings_tts_scope",
+ "workflow_recordings",
+ ["workflow_id", "tts_provider", "tts_model", "tts_voice_id"],
+ )
+
+ # Drop the org-scoped unique constraint
op.drop_constraint(
- "uq_workflow_recordings_recording_id_org_wf",
+ "uq_workflow_recordings_recording_id_org",
"workflow_recordings",
type_="unique",
)
+
+ # Drop non-unique index and re-create as unique
op.drop_index(
- "ix_workflow_recordings_recording_id", table_name="workflow_recordings"
+ "ix_workflow_recordings_recording_id",
+ table_name="workflow_recordings",
)
op.create_index(
- op.f("ix_workflow_recordings_recording_id"),
+ "ix_workflow_recordings_recording_id",
"workflow_recordings",
["recording_id"],
unique=True,
)
+
+ # Make workflow_id NOT NULL again
+ op.alter_column(
+ "workflow_recordings",
+ "workflow_id",
+ existing_type=sa.Integer(),
+ nullable=False,
+ )
+
+ # Revert recording_id width
op.alter_column(
"workflow_recordings",
"recording_id",
diff --git a/api/alembic/versions/a1b2c3d4e5f6_make_tts_columns_nullable.py b/api/alembic/versions/a1b2c3d4e5f6_make_tts_columns_nullable.py
new file mode 100644
index 0000000..563c45f
--- /dev/null
+++ b/api/alembic/versions/a1b2c3d4e5f6_make_tts_columns_nullable.py
@@ -0,0 +1,42 @@
+"""make tts columns nullable on workflow_recordings
+
+Revision ID: a1b2c3d4e5f6
+Revises: 67a5cf3e09d0
+Create Date: 2026-04-10 12:00:00.000000
+
+"""
+
+from typing import Sequence, Union
+
+import sqlalchemy as sa
+from alembic import op
+
+# revision identifiers, used by Alembic.
+revision: str = "a1b2c3d4e5f6"
+down_revision: str = "67a5cf3e09d0"
+branch_labels: Union[str, Sequence[str], None] = None
+depends_on: Union[str, Sequence[str], None] = None
+
+
+def upgrade() -> None:
+ op.alter_column(
+ "workflow_recordings", "tts_provider", existing_type=sa.String(), nullable=True
+ )
+ op.alter_column(
+ "workflow_recordings", "tts_model", existing_type=sa.String(), nullable=True
+ )
+ op.alter_column(
+ "workflow_recordings", "tts_voice_id", existing_type=sa.String(), nullable=True
+ )
+
+
+def downgrade() -> None:
+ op.alter_column(
+ "workflow_recordings", "tts_voice_id", existing_type=sa.String(), nullable=False
+ )
+ op.alter_column(
+ "workflow_recordings", "tts_model", existing_type=sa.String(), nullable=False
+ )
+ op.alter_column(
+ "workflow_recordings", "tts_provider", existing_type=sa.String(), nullable=False
+ )
diff --git a/api/db/models.py b/api/db/models.py
index 3968fc0..b8d4b1d 100644
--- a/api/db/models.py
+++ b/api/db/models.py
@@ -1005,7 +1005,7 @@ class KnowledgeBaseDocumentModel(Base):
class WorkflowRecordingModel(Base):
- """Model for storing audio recordings scoped to a workflow and TTS configuration.
+ """Model for storing audio recordings scoped to an organization.
Recordings are used in hybrid prompts where parts of the output are pre-recorded
audio rather than dynamically generated TTS.
@@ -1020,16 +1020,16 @@ class WorkflowRecordingModel(Base):
# Scoping
workflow_id = Column(
- Integer, ForeignKey("workflows.id", ondelete="CASCADE"), nullable=False
+ Integer, ForeignKey("workflows.id", ondelete="CASCADE"), nullable=True
)
organization_id = Column(
Integer, ForeignKey("organizations.id", ondelete="CASCADE"), nullable=False
)
- # TTS configuration scope
- tts_provider = Column(String, nullable=False)
- tts_model = Column(String, nullable=False)
- tts_voice_id = Column(String, nullable=False)
+ # TTS configuration metadata (optional, legacy)
+ tts_provider = Column(String, nullable=True)
+ tts_model = Column(String, nullable=True)
+ tts_voice_id = Column(String, nullable=True)
# Content
transcript = Column(Text, nullable=False)
@@ -1065,19 +1065,11 @@ class WorkflowRecordingModel(Base):
UniqueConstraint(
"recording_id",
"organization_id",
- "workflow_id",
- name="uq_workflow_recordings_recording_id_org_wf",
+ name="uq_workflow_recordings_recording_id_org",
),
Index("ix_workflow_recordings_workflow_id", "workflow_id"),
Index("ix_workflow_recordings_org_id", "organization_id"),
Index("ix_workflow_recordings_recording_id", "recording_id"),
- Index(
- "ix_workflow_recordings_tts_scope",
- "workflow_id",
- "tts_provider",
- "tts_model",
- "tts_voice_id",
- ),
)
diff --git a/api/db/workflow_recording_client.py b/api/db/workflow_recording_client.py
index 698d3b6..b0f1682 100644
--- a/api/db/workflow_recording_client.py
+++ b/api/db/workflow_recording_client.py
@@ -5,7 +5,7 @@ import string
from typing import List, Optional
from loguru import logger
-from sqlalchemy import func, select
+from sqlalchemy import func, select, text
from api.db.base_client import BaseDBClient
from api.db.models import WorkflowRecordingModel
@@ -23,30 +23,30 @@ class WorkflowRecordingClient(BaseDBClient):
async def create_recording(
self,
recording_id: str,
- workflow_id: int,
organization_id: int,
- tts_provider: str,
- tts_model: str,
- tts_voice_id: str,
transcript: str,
storage_key: str,
storage_backend: str,
created_by: int,
+ workflow_id: Optional[int] = None,
+ tts_provider: Optional[str] = None,
+ tts_model: Optional[str] = None,
+ tts_voice_id: Optional[str] = None,
metadata: Optional[dict] = None,
) -> WorkflowRecordingModel:
"""Create a new workflow recording record.
Args:
recording_id: Short unique recording identifier
- workflow_id: ID of the workflow
organization_id: ID of the organization
- tts_provider: TTS provider name
- tts_model: TTS model name
- tts_voice_id: TTS voice identifier
transcript: User-provided transcript
storage_key: S3/MinIO storage key
storage_backend: Storage backend (s3 or minio)
created_by: ID of the user
+ workflow_id: Optional workflow ID (legacy)
+ tts_provider: Optional TTS provider name
+ tts_model: Optional TTS model name
+ tts_voice_id: Optional TTS voice identifier
metadata: Optional extra metadata
Returns:
@@ -71,10 +71,7 @@ class WorkflowRecordingClient(BaseDBClient):
await session.commit()
await session.refresh(recording)
- logger.info(
- f"Created recording {recording_id} for workflow {workflow_id}, "
- f"org {organization_id}"
- )
+ logger.info(f"Created recording {recording_id} for org {organization_id}")
return recording
async def get_recordings(
@@ -85,7 +82,7 @@ class WorkflowRecordingClient(BaseDBClient):
tts_model: Optional[str] = None,
tts_voice_id: Optional[str] = None,
) -> List[WorkflowRecordingModel]:
- """Get recordings for an organization, optionally filtered by workflow and TTS config.
+ """Get recordings for an organization, optionally filtered.
Args:
organization_id: ID of the organization
@@ -121,14 +118,12 @@ class WorkflowRecordingClient(BaseDBClient):
self,
recording_id: str,
organization_id: int,
- workflow_id: int,
) -> Optional[WorkflowRecordingModel]:
- """Get a recording by its string recording_id (unique per org + workflow).
+ """Get a recording by its short ID.
Args:
- recording_id: The descriptive recording ID
+ recording_id: The short unique recording ID
organization_id: ID of the organization
- workflow_id: ID of the workflow
Returns:
WorkflowRecordingModel if found, None otherwise
@@ -137,7 +132,6 @@ class WorkflowRecordingClient(BaseDBClient):
query = select(WorkflowRecordingModel).where(
WorkflowRecordingModel.recording_id == recording_id,
WorkflowRecordingModel.organization_id == organization_id,
- WorkflowRecordingModel.workflow_id == workflow_id,
WorkflowRecordingModel.is_active == True,
)
@@ -170,13 +164,11 @@ class WorkflowRecordingClient(BaseDBClient):
async def has_active_recordings(
self,
- workflow_id: int,
organization_id: int,
) -> bool:
- """Check if a workflow has any active recordings.
+ """Check if an organization has any active recordings.
Args:
- workflow_id: ID of the workflow
organization_id: ID of the organization
Returns:
@@ -187,7 +179,6 @@ class WorkflowRecordingClient(BaseDBClient):
select(func.count())
.select_from(WorkflowRecordingModel)
.where(
- WorkflowRecordingModel.workflow_id == workflow_id,
WorkflowRecordingModel.organization_id == organization_id,
WorkflowRecordingModel.is_active == True,
)
@@ -196,14 +187,13 @@ class WorkflowRecordingClient(BaseDBClient):
return result.scalar_one() > 0
async def check_recording_id_exists(
- self, recording_id: str, organization_id: int, workflow_id: int
+ self, recording_id: str, organization_id: int
) -> bool:
- """Check if a recording ID already exists within an organization and workflow.
+ """Check if a recording ID already exists within an organization.
Args:
recording_id: The recording ID to check
organization_id: ID of the organization
- workflow_id: ID of the workflow
Returns:
True if exists, False otherwise
@@ -212,7 +202,6 @@ class WorkflowRecordingClient(BaseDBClient):
query = select(WorkflowRecordingModel.id).where(
WorkflowRecordingModel.recording_id == recording_id,
WorkflowRecordingModel.organization_id == organization_id,
- WorkflowRecordingModel.workflow_id == workflow_id,
WorkflowRecordingModel.is_active == True,
)
result = await session.execute(query)
@@ -257,6 +246,80 @@ class WorkflowRecordingClient(BaseDBClient):
)
return recording
+ async def replace_recording_id_in_workflows(
+ self,
+ old_id: str,
+ new_id: str,
+ organization_id: int,
+ ) -> int:
+ """Replace all occurrences of a recording ID in workflow definitions.
+
+ Updates both draft definitions (workflows.workflow_definition) and
+ versioned definitions (workflow_definitions.workflow_json), skipping
+ workflow_definitions with status 'legacy'.
+
+ Args:
+ old_id: The old recording ID to find
+ new_id: The new recording ID to replace with
+ organization_id: ID of the organization (scopes to org workflows)
+
+ Returns:
+ Total number of rows updated across both tables
+ """
+ # Match the exact pattern used in prompts: "RECORDING_ID: "
+ old_pattern = f"RECORDING_ID: {old_id}"
+ new_pattern = f"RECORDING_ID: {new_id}"
+
+ total = 0
+ async with self.async_session() as session:
+ # Update workflows.workflow_definition (draft definitions)
+ result = await session.execute(
+ text("""
+ UPDATE workflows
+ SET workflow_definition =
+ REPLACE(workflow_definition::text, :old_pat, :new_pat)::json
+ WHERE organization_id = :org_id
+ AND workflow_definition::text LIKE '%%' || :old_pat || '%%'
+ """),
+ {
+ "old_pat": old_pattern,
+ "new_pat": new_pattern,
+ "org_id": organization_id,
+ },
+ )
+ total += result.rowcount
+
+ # Update workflow_definitions.workflow_json (versioned definitions)
+ # Skip legacy definitions
+ result = await session.execute(
+ text("""
+ UPDATE workflow_definitions wd
+ SET workflow_json =
+ REPLACE(wd.workflow_json::text, :old_pat, :new_pat)::json
+ FROM workflows w
+ WHERE wd.workflow_id = w.id
+ AND w.organization_id = :org_id
+ AND wd.status != 'legacy'
+ AND wd.workflow_json::text LIKE '%%' || :old_pat || '%%'
+ """),
+ {
+ "old_pat": old_pattern,
+ "new_pat": new_pattern,
+ "org_id": organization_id,
+ },
+ )
+ total += result.rowcount
+
+ await session.commit()
+
+ if total > 0:
+ logger.info(
+ f"Replaced recording ID '{old_id}' -> '{new_id}' "
+ f"in {total} workflow definition(s), org {organization_id}"
+ )
+
+ return total
+
async def delete_recording(
self,
recording_id: str,
diff --git a/api/routes/tool.py b/api/routes/tool.py
index a0eeffd..f3caa94 100644
--- a/api/routes/tool.py
+++ b/api/routes/tool.py
@@ -44,17 +44,29 @@ class HttpApiConfig(BaseModel):
timeout_ms: Optional[int] = Field(
default=5000, description="Request timeout in milliseconds"
)
+ customMessage: Optional[str] = Field(
+ default=None, description="Custom message to play after tool execution"
+ )
+ customMessageType: Optional[Literal["text", "audio"]] = Field(
+ default=None, description="Type of custom message: text or audio"
+ )
+ customMessageRecordingId: Optional[str] = Field(
+ default=None, description="Recording ID for audio custom message"
+ )
class EndCallConfig(BaseModel):
"""Configuration for End Call tools."""
- messageType: Literal["none", "custom"] = Field(
+ messageType: Literal["none", "custom", "audio"] = Field(
default="none", description="Type of goodbye message"
)
customMessage: Optional[str] = Field(
default=None, description="Custom message to play before ending the call"
)
+ audioRecordingId: Optional[str] = Field(
+ default=None, description="Recording ID for audio goodbye message"
+ )
endCallReason: bool = Field(
default=False,
description="When enabled, LLM must provide a reason for ending the call. "
@@ -73,12 +85,15 @@ class TransferCallConfig(BaseModel):
destination: str = Field(
description="Phone number or SIP endpoint to transfer the call to (E.164 format e.g., +1234567890, or SIP endpoint e.g., PJSIP/1234)"
)
- messageType: Literal["none", "custom"] = Field(
+ messageType: Literal["none", "custom", "audio"] = Field(
default="none", description="Type of message to play before transfer"
)
customMessage: Optional[str] = Field(
default=None, description="Custom message to play before transferring the call"
)
+ audioRecordingId: Optional[str] = Field(
+ default=None, description="Recording ID for audio message before transfer"
+ )
timeout: int = Field(
default=30,
ge=5,
diff --git a/api/routes/workflow_recording.py b/api/routes/workflow_recording.py
index 047bd8c..9f30997 100644
--- a/api/routes/workflow_recording.py
+++ b/api/routes/workflow_recording.py
@@ -26,13 +26,11 @@ from api.services.storage import storage_fs
router = APIRouter(prefix="/workflow-recordings", tags=["workflow-recordings"])
-async def _generate_unique_recording_id(organization_id: int, workflow_id: int) -> str:
- """Generate a unique short recording ID within an organization and workflow."""
+async def _generate_unique_recording_id(organization_id: int) -> str:
+ """Generate a unique short recording ID within an organization."""
for _ in range(10):
rid = generate_short_id(8)
- exists = await db_client.check_recording_id_exists(
- rid, organization_id, workflow_id
- )
+ exists = await db_client.check_recording_id_exists(rid, organization_id)
if not exists:
return rid
raise HTTPException(
@@ -73,12 +71,12 @@ async def get_upload_urls(
items = []
for fd in request.files:
recording_id = await _generate_unique_recording_id(
- user.selected_organization_id, request.workflow_id
+ user.selected_organization_id
)
storage_key = (
f"recordings/{user.selected_organization_id}"
- f"/{request.workflow_id}/{recording_id}"
+ f"/{recording_id}"
f"/{fd.filename}"
)
@@ -105,7 +103,7 @@ async def get_upload_urls(
logger.info(
f"Generated {len(items)} recording upload URL(s), "
- f"workflow {request.workflow_id}, org {user.selected_organization_id}"
+ f"org {user.selected_organization_id}"
)
return BatchRecordingUploadResponseSchema(items=items)
@@ -136,22 +134,20 @@ async def create_recordings(
for rec_req in request.recordings:
recording = await db_client.create_recording(
recording_id=rec_req.recording_id,
- workflow_id=rec_req.workflow_id,
organization_id=user.selected_organization_id,
- tts_provider=rec_req.tts_provider,
- tts_model=rec_req.tts_model,
- tts_voice_id=rec_req.tts_voice_id,
transcript=rec_req.transcript,
storage_key=rec_req.storage_key,
storage_backend=backend.value,
created_by=user.id,
+ tts_provider=rec_req.tts_provider,
+ tts_model=rec_req.tts_model,
+ tts_voice_id=rec_req.tts_voice_id,
metadata=rec_req.metadata,
)
results.append(_build_response(recording))
logger.info(
- f"Created {len(results)} recording(s) for "
- f"workflow {request.recordings[0].workflow_id}"
+ f"Created {len(results)} recording(s) for org {user.selected_organization_id}"
)
return BatchRecordingCreateResponseSchema(recordings=results)
@@ -185,7 +181,7 @@ async def list_recordings(
] = None,
user=Depends(get_user),
):
- """List recordings for the organization, optionally filtered by workflow and TTS configuration."""
+ """List recordings for the organization, optionally filtered."""
try:
recordings = await db_client.get_recordings(
organization_id=user.selected_organization_id,
@@ -256,7 +252,6 @@ async def update_recording(
if not new_id:
raise HTTPException(status_code=400, detail="Recording ID cannot be empty")
- # Look up by integer PK — globally unique, no ambiguity
existing = await db_client.get_recording_by_id(
id, user.selected_organization_id
)
@@ -266,16 +261,17 @@ async def update_recording(
if new_id == existing.recording_id:
return _build_response(existing)
- # Check if the new ID is already taken within this org + workflow
exists = await db_client.check_recording_id_exists(
- new_id, user.selected_organization_id, existing.workflow_id
+ new_id, user.selected_organization_id
)
if exists:
raise HTTPException(
status_code=409,
- detail=f"Recording ID '{new_id}' is already in use in this workflow",
+ detail=f"Recording ID '{new_id}' is already in use",
)
+ old_id = existing.recording_id
+
recording = await db_client.update_recording_id(
id=id,
new_recording_id=new_id,
@@ -285,6 +281,18 @@ async def update_recording(
if not recording:
raise HTTPException(status_code=404, detail="Recording not found")
+ # Replace old recording ID in all non-legacy workflow definitions
+ updated = await db_client.replace_recording_id_in_workflows(
+ old_id=old_id,
+ new_id=new_id,
+ organization_id=user.selected_organization_id,
+ )
+ if updated:
+ logger.info(
+ f"Updated {updated} workflow definition(s) with new recording ID "
+ f"'{old_id}' -> '{new_id}'"
+ )
+
return _build_response(recording)
except HTTPException:
diff --git a/api/schemas/workflow_recording.py b/api/schemas/workflow_recording.py
index 894ac23..0c2d358 100644
--- a/api/schemas/workflow_recording.py
+++ b/api/schemas/workflow_recording.py
@@ -32,7 +32,6 @@ class FileDescriptor(BaseModel):
class BatchRecordingUploadRequestSchema(BaseModel):
"""Request schema for getting presigned upload URLs for one or more files."""
- workflow_id: int = Field(..., description="Workflow ID these recordings belong to")
files: List[FileDescriptor] = Field(
..., min_length=1, max_length=20, description="List of files to upload"
)
@@ -50,10 +49,13 @@ class RecordingCreateRequestSchema(BaseModel):
"""Request schema for creating a recording record after upload."""
recording_id: str = Field(..., description="Short recording ID from upload step")
- workflow_id: int = Field(..., description="Workflow ID")
- tts_provider: str = Field(..., description="TTS provider (e.g. elevenlabs)")
- tts_model: str = Field(..., description="TTS model name")
- tts_voice_id: str = Field(..., description="TTS voice identifier")
+ tts_provider: Optional[str] = Field(
+ default=None, description="TTS provider (e.g. elevenlabs)"
+ )
+ tts_model: Optional[str] = Field(default=None, description="TTS model name")
+ tts_voice_id: Optional[str] = Field(
+ default=None, description="TTS voice identifier"
+ )
transcript: str = Field(
..., description="User-provided transcript of the recording"
)
@@ -68,11 +70,11 @@ class RecordingResponseSchema(BaseModel):
id: int
recording_id: str
- workflow_id: int
+ workflow_id: Optional[int] = None
organization_id: int
- tts_provider: str
- tts_model: str
- tts_voice_id: str
+ tts_provider: Optional[str] = None
+ tts_model: Optional[str] = None
+ tts_voice_id: Optional[str] = None
transcript: str
storage_key: str
storage_backend: str
@@ -105,7 +107,8 @@ class RecordingUpdateRequestSchema(BaseModel):
...,
min_length=1,
max_length=64,
- description="New descriptive recording ID",
+ pattern=r"^[a-zA-Z0-9_-]+$",
+ description="New descriptive recording ID (letters, numbers, hyphens, underscores only)",
)
diff --git a/api/services/pipecat/audio_playback.py b/api/services/pipecat/audio_playback.py
new file mode 100644
index 0000000..f0b058e
--- /dev/null
+++ b/api/services/pipecat/audio_playback.py
@@ -0,0 +1,188 @@
+"""Utilities for playing audio through the pipeline transport.
+
+Provides one-shot and looping playback of raw PCM audio. All playback
+should be routed through ``transport.output().queue_frame`` so the audio
+reaches the caller without passing through STT (which would otherwise
+generate phantom transcriptions).
+"""
+
+import asyncio
+import uuid
+from typing import Awaitable, Callable, Dict, Optional, Tuple
+
+import numpy as np
+from loguru import logger
+
+from pipecat.frames.frames import (
+ Frame,
+ OutputAudioRawFrame,
+ TTSAudioRawFrame,
+ TTSStartedFrame,
+ TTSStoppedFrame,
+ TTSTextFrame,
+)
+
+try:
+ import soundfile as sf
+except ModuleNotFoundError as e:
+ logger.error(f"Exception: {e}")
+ logger.error("In order to use audio playback, you need to `pip install soundfile`.")
+ raise Exception(f"Missing module: {e}")
+
+
+# ---------------------------------------------------------------------------
+# Audio file loading / caching
+# ---------------------------------------------------------------------------
+
+_audio_cache: Dict[Tuple[str, int], bytes] = {}
+
+
+def load_audio_file(file_path: str, sample_rate: int) -> Optional[bytes]:
+ """Load an audio file as PCM-16 bytes, caching the result.
+
+ Args:
+ file_path: Path to a WAV audio file.
+ sample_rate: Target sample rate (used as cache key; no resampling
+ is performed here).
+
+ Returns:
+ Raw PCM-16 bytes, or *None* on failure.
+ """
+ cache_key = (file_path, sample_rate)
+ if cache_key in _audio_cache:
+ logger.debug(f"Using cached audio for {file_path} at {sample_rate}Hz")
+ return _audio_cache[cache_key]
+
+ try:
+ logger.info(f"Loading audio from {file_path} at {sample_rate}Hz")
+ sound, file_sample_rate = sf.read(file_path, dtype="int16")
+ logger.info(
+ f"Audio file loaded - file sample_rate: {file_sample_rate}, target: {sample_rate}"
+ )
+
+ # Ensure mono (take first channel if stereo)
+ if len(sound.shape) > 1:
+ sound = sound[:, 0]
+
+ if file_sample_rate != sample_rate:
+ logger.warning(
+ f"Audio file has sample rate {file_sample_rate}, expected {sample_rate}"
+ )
+
+ audio_bytes = sound.astype(np.int16).tobytes()
+ _audio_cache[cache_key] = audio_bytes
+ logger.info(f"Audio loaded: {len(sound)} samples at {sample_rate}Hz")
+ return audio_bytes
+
+ except Exception as e:
+ logger.error(f"Failed to load audio file {file_path}: {e}")
+ return None
+
+
+def clear_audio_cache() -> None:
+ """Clear the audio file cache to free memory."""
+ _audio_cache.clear()
+ logger.info("Audio cache cleared")
+
+
+# ---------------------------------------------------------------------------
+# Playback helpers
+# ---------------------------------------------------------------------------
+
+
+async def play_audio(
+ audio_data: bytes,
+ *,
+ sample_rate: int,
+ queue_frame: Callable[[Frame], Awaitable[None]],
+ transcript: Optional[str] = None,
+ append_to_context: bool = False,
+) -> None:
+ """Play raw PCM-16 audio once.
+
+ Pushes ``TTSStarted -> TTSAudioRaw -> TTSStopped`` so downstream
+ processors (audio buffer, context aggregators) handle the audio
+ correctly.
+
+ When *transcript* is provided a ``TTSTextFrame`` is also pushed so
+ that observers (e.g. ``RealtimeFeedbackObserver``) can relay the
+ spoken text to the UI.
+
+ Args:
+ audio_data: Raw 16-bit mono PCM bytes.
+ sample_rate: Pipeline sample rate (e.g. 16000).
+ queue_frame: Frame sink -- typically ``transport.output().queue_frame``.
+ transcript: Optional transcript of the recording.
+ append_to_context: Whether the transcript should be appended to
+ the LLM assistant context. Defaults to False.
+ """
+ context_id = str(uuid.uuid4())
+ await queue_frame(TTSStartedFrame(context_id=context_id))
+ if transcript:
+ tts_text = TTSTextFrame(
+ text=transcript, aggregated_by="recording", context_id=context_id
+ )
+ tts_text.append_to_context = append_to_context
+ await queue_frame(tts_text)
+ await queue_frame(
+ TTSAudioRawFrame(
+ audio=audio_data,
+ sample_rate=sample_rate,
+ num_channels=1,
+ context_id=context_id,
+ )
+ )
+ await queue_frame(TTSStoppedFrame(context_id=context_id))
+
+
+async def play_audio_loop(
+ *,
+ stop_event: asyncio.Event,
+ sample_rate: int,
+ queue_frame: Callable[[Frame], Awaitable[None]],
+ audio_file: Optional[str] = None,
+) -> None:
+ """Play audio in a loop until *stop_event* is set.
+
+ Used for hold music during call transfers and ringers during
+ pre-call data fetches.
+
+ Args:
+ stop_event: Set this event to terminate the loop.
+ sample_rate: Target sample rate for audio playback.
+ queue_frame: Frame sink -- typically ``transport.output().queue_frame``.
+ audio_file: Path to a WAV file. When *None* the default
+ ``transfer_hold_ring_{sample_rate}.wav`` asset is used.
+ """
+ if audio_file is None:
+ from api.constants import APP_ROOT_DIR
+
+ audio_file = str(
+ APP_ROOT_DIR / "assets" / f"transfer_hold_ring_{sample_rate}.wav"
+ )
+
+ audio_data = load_audio_file(audio_file, sample_rate)
+ if not audio_data:
+ logger.warning(f"Audio loop: failed to load {audio_file}, skipping")
+ return
+
+ num_samples = len(audio_data) // 2 # 16-bit PCM = 2 bytes per sample
+ duration = num_samples / sample_rate
+
+ logger.debug(f"Audio loop: playing at {sample_rate}Hz")
+ try:
+ while not stop_event.is_set():
+ frame = OutputAudioRawFrame(
+ audio=audio_data,
+ sample_rate=sample_rate,
+ num_channels=1,
+ )
+ await queue_frame(frame)
+ try:
+ await asyncio.wait_for(stop_event.wait(), timeout=duration + 1.5)
+ break
+ except asyncio.TimeoutError:
+ pass
+ except Exception as e:
+ logger.error(f"Audio loop error: {e}")
+ logger.debug("Audio loop: stopped")
diff --git a/api/services/pipecat/event_handlers.py b/api/services/pipecat/event_handlers.py
index da5a7cd..410751f 100644
--- a/api/services/pipecat/event_handlers.py
+++ b/api/services/pipecat/event_handlers.py
@@ -6,17 +6,16 @@ from api.db import db_client
from api.enums import WorkflowRunState
from api.services.campaign.circuit_breaker import circuit_breaker
from api.services.pipecat.audio_config import AudioConfig
+from api.services.pipecat.audio_playback import play_audio, play_audio_loop
from api.services.pipecat.in_memory_buffers import (
InMemoryAudioBuffer,
InMemoryLogsBuffer,
)
from api.services.pipecat.pipeline_metrics_aggregator import PipelineMetricsAggregator
-from api.services.pipecat.recording_playback import queue_recording_audio
from api.services.pipecat.tracing_config import get_trace_url
from api.services.workflow.pipecat_engine import PipecatEngine
from api.tasks.arq import enqueue_job
from api.tasks.function_names import FunctionNames
-from api.utils.hold_audio import play_hold_audio_loop
from pipecat.frames.frames import (
Frame,
LLMContextFrame,
@@ -90,7 +89,11 @@ def register_event_handlers(
stop_ringer = asyncio.Event()
sample_rate = audio_config.pipeline_sample_rate or 16000
ringer_task = asyncio.create_task(
- play_hold_audio_loop(task, stop_ringer, sample_rate)
+ play_audio_loop(
+ stop_event=stop_ringer,
+ sample_rate=sample_rate,
+ queue_frame=transport.output().queue_frame,
+ )
)
try:
fetch_result = await pre_call_fetch_task
@@ -127,12 +130,16 @@ def register_event_handlers(
and fetch_recording_audio
):
logger.debug(f"Playing audio greeting recording: {greeting_value}")
- audio_data = await fetch_recording_audio(greeting_value)
- if audio_data:
- await queue_recording_audio(
- audio_data,
+ result = await fetch_recording_audio(
+ recording_pk=int(greeting_value)
+ )
+ if result:
+ await play_audio(
+ result.audio,
sample_rate=audio_config.pipeline_sample_rate or 16000,
- queue_frame=task.queue_frame,
+ queue_frame=transport.output().queue_frame,
+ transcript=result.transcript,
+ append_to_context=True,
)
else:
logger.warning(
diff --git a/api/services/pipecat/realtime_feedback_observer.py b/api/services/pipecat/realtime_feedback_observer.py
index fe5f641..35b393d 100644
--- a/api/services/pipecat/realtime_feedback_observer.py
+++ b/api/services/pipecat/realtime_feedback_observer.py
@@ -170,7 +170,10 @@ class RealtimeFeedbackObserver(BaseObserver):
frame_direction = data.direction
# Skip already processed frames (frames can be observed multiple times)
- if frame.id in self._frames_seen:
+ if (
+ frame.id in self._frames_seen
+ or frame_direction != FrameDirection.DOWNSTREAM
+ ):
return
self._frames_seen.add(frame.id)
diff --git a/api/services/pipecat/recording_audio_cache.py b/api/services/pipecat/recording_audio_cache.py
index 0c1dfe9..23be271 100644
--- a/api/services/pipecat/recording_audio_cache.py
+++ b/api/services/pipecat/recording_audio_cache.py
@@ -7,7 +7,7 @@ subsequent plays (even from other workers) are instantaneous.
"""
import os
-from typing import Awaitable, Callable, Optional
+from typing import Awaitable, Callable, NamedTuple, Optional
import numpy as np
from loguru import logger
@@ -22,17 +22,23 @@ from .audio_file_cache import (
write_cache_file,
)
+
+class RecordingAudio(NamedTuple):
+ """Audio bytes paired with the recording's transcript (when available)."""
+
+ audio: bytes
+ transcript: Optional[str] = None
+
+
# ---------------------------------------------------------------------------
# Cache path helper
# ---------------------------------------------------------------------------
-def _cache_path(
- organization_id: int, workflow_id: int, recording_id: str, sample_rate: int
-) -> str:
+def _cache_path(organization_id: int, recording_id: str, sample_rate: int) -> str:
"""Return the on-disk path for a cached PCM file."""
return os.path.join(
- CACHE_DIR, f"{organization_id}_{workflow_id}_{recording_id}_{sample_rate}.pcm"
+ CACHE_DIR, f"{organization_id}_{recording_id}_{sample_rate}.pcm"
)
@@ -43,59 +49,96 @@ def _cache_path(
def create_recording_audio_fetcher(
organization_id: int,
- workflow_id: int,
pipeline_sample_rate: int,
-) -> Callable[[str], Awaitable[Optional[bytes]]]:
- """Create an async callback that returns raw PCM bytes for a recording_id.
+) -> Callable[..., Awaitable[Optional[bytes]]]:
+ """Create an async callback that returns raw PCM bytes for a recording.
- The returned callable:
- 1. Checks the filesystem cache (keyed by org/workflow/recording + sample rate).
- 2. On miss, looks up the recording in the DB, downloads the audio file
- from S3/MinIO, converts it to 16-bit mono PCM at *pipeline_sample_rate*,
- trims leading/trailing silence, caches the result on disk, and returns it.
+ The returned callable accepts **one** of two keyword arguments:
+ - ``recording_pk`` – the immutable integer primary key (used by
+ dropdown-based selections: greeting, edges, tool configs).
+ - ``recording_id`` – the human-readable string ID (used by
+ prompt-based ``RECORDING_ID: xxx`` references).
+
+ Flow:
+ 1. Checks the filesystem cache (keyed by org + pk + sample rate).
+ 2. On miss, looks up the recording in the DB, downloads the audio
+ from S3/MinIO, converts to 16-bit mono PCM, trims silence, and
+ caches the result on disk.
Args:
organization_id: Organization owning the recordings.
- workflow_id: Workflow the recordings belong to.
pipeline_sample_rate: Target PCM sample rate for the pipeline.
-
- Returns:
- ``async (recording_id: str) -> Optional[bytes]``
"""
from api.db import db_client
from api.services.storage import get_storage_for_backend
- # Resolve storage instances once per backend at creation time, not per fetch.
_storage_cache: dict[str, object] = {}
+ _transcript_cache: dict[str, Optional[str]] = {}
def _get_storage(backend: str):
if backend not in _storage_cache:
_storage_cache[backend] = get_storage_for_backend(backend)
return _storage_cache[backend]
- async def fetch(recording_id: str) -> Optional[bytes]:
- cached = _cache_path(
- organization_id, workflow_id, recording_id, pipeline_sample_rate
- )
+ async def _lookup_recording(
+ cache_key: str,
+ recording_pk: Optional[int],
+ recording_id: Optional[str],
+ ):
+ """DB lookup with transcript caching."""
+ if recording_pk is not None:
+ recording = await db_client.get_recording_by_id(
+ recording_pk, organization_id
+ )
+ else:
+ recording = await db_client.get_recording_by_recording_id(
+ recording_id, organization_id
+ )
+ if recording:
+ _transcript_cache[cache_key] = recording.transcript or None
+ return recording
+
+ async def fetch(
+ *,
+ recording_pk: Optional[int] = None,
+ recording_id: Optional[str] = None,
+ ) -> Optional[RecordingAudio]:
+ if recording_pk is None and recording_id is None:
+ logger.warning("fetch called with neither recording_pk nor recording_id")
+ return None
+
+ # Use pk for cache key when available, otherwise recording_id
+ cache_key = str(recording_pk) if recording_pk is not None else recording_id
+ cached = _cache_path(organization_id, cache_key, pipeline_sample_rate)
# 1. Serve from filesystem cache
if os.path.exists(cached):
- logger.debug(f"Recording {recording_id} served from disk cache")
- return read_cached_file(cached)
+ logger.debug(f"Recording {cache_key} served from disk cache")
+ audio = read_cached_file(cached)
+ # Transcript may already be in memory from a prior fetch;
+ # if not, do a lightweight DB lookup.
+ if cache_key not in _transcript_cache:
+ await _lookup_recording(cache_key, recording_pk, recording_id)
+ return RecordingAudio(
+ audio=audio, transcript=_transcript_cache.get(cache_key)
+ )
# 2. DB lookup
- recording = await db_client.get_recording_by_recording_id(
- recording_id, organization_id, workflow_id
- )
+ recording = await _lookup_recording(cache_key, recording_pk, recording_id)
+
if not recording:
- logger.warning(f"Recording {recording_id} not found in database")
+ logger.warning(f"Recording {cache_key} not found in database")
return None
# 3. Download, convert, trim, and cache
pcm_data = await _download_and_convert(
recording, pipeline_sample_rate, _get_storage
)
- return pcm_data
+ if pcm_data is None:
+ return None
+ return RecordingAudio(
+ audio=pcm_data, transcript=_transcript_cache.get(cache_key)
+ )
return fetch
@@ -106,11 +149,10 @@ def create_recording_audio_fetcher(
async def warm_recording_cache(
- workflow_id: int,
organization_id: int,
pipeline_sample_rate: int,
) -> None:
- """Pre-fetch all active recordings for a workflow into the disk cache.
+ """Pre-fetch all active recordings for an organization into the disk cache.
Launched as a background ``asyncio.Task`` at pipeline startup so that
recordings are ready before the first playback request. Errors are logged
@@ -120,9 +162,7 @@ async def warm_recording_cache(
from api.services.storage import get_storage_for_backend
try:
- recordings = await db_client.get_recordings(
- organization_id=organization_id, workflow_id=workflow_id
- )
+ recordings = await db_client.get_recordings(organization_id=organization_id)
if not recordings:
return
@@ -131,18 +171,19 @@ async def warm_recording_cache(
r
for r in recordings
if not os.path.exists(
- _cache_path(
- organization_id, workflow_id, r.recording_id, pipeline_sample_rate
- )
+ _cache_path(organization_id, str(r.id), pipeline_sample_rate)
+ )
+ and not os.path.exists(
+ _cache_path(organization_id, r.recording_id, pipeline_sample_rate)
)
]
if not uncached:
- logger.debug(f"Recording cache already warm for workflow {workflow_id}")
+ logger.debug(f"Recording cache already warm for org {organization_id}")
return
logger.info(
f"Warming recording cache: {len(uncached)}/{len(recordings)} "
- f"recording(s) for workflow {workflow_id}"
+ f"recording(s) for org {organization_id}"
)
# Resolve storage instances once per backend, not per recording
@@ -168,7 +209,7 @@ async def warm_recording_cache(
f"Cache warm: error processing {recording.recording_id}"
)
- logger.info(f"Recording cache warm complete for workflow {workflow_id}")
+ logger.info(f"Recording cache warm complete for org {organization_id}")
except Exception:
logger.exception("Recording cache warm failed")
@@ -201,7 +242,6 @@ async def _download_and_convert(
# Write to disk cache
cached = _cache_path(
recording.organization_id,
- recording.workflow_id,
recording.recording_id,
sample_rate,
)
diff --git a/api/services/pipecat/recording_playback.py b/api/services/pipecat/recording_playback.py
deleted file mode 100644
index 356eba5..0000000
--- a/api/services/pipecat/recording_playback.py
+++ /dev/null
@@ -1,41 +0,0 @@
-"""Shared helper for pushing pre-recorded audio frames into a pipeline."""
-
-import uuid
-from typing import Awaitable, Callable
-
-from pipecat.frames.frames import (
- Frame,
- TTSAudioRawFrame,
- TTSStartedFrame,
- TTSStoppedFrame,
-)
-
-
-async def queue_recording_audio(
- audio_data: bytes,
- *,
- sample_rate: int,
- queue_frame: Callable[[Frame], Awaitable[None]],
-) -> None:
- """Push TTSStarted → TTSAudioRaw → TTSStopped frames.
-
- This is the canonical way to play pre-recorded PCM audio through the
- pipeline outside of the RecordingRouterProcessor (which uses its own
- ``push_frame`` path).
-
- Args:
- audio_data: Raw 16-bit mono PCM bytes.
- sample_rate: Pipeline sample rate (e.g. 16000).
- queue_frame: Typically ``task.queue_frame``.
- """
- context_id = str(uuid.uuid4())
- await queue_frame(TTSStartedFrame(context_id=context_id))
- await queue_frame(
- TTSAudioRawFrame(
- audio=audio_data,
- sample_rate=sample_rate,
- num_channels=1,
- context_id=context_id,
- )
- )
- await queue_frame(TTSStoppedFrame(context_id=context_id))
diff --git a/api/services/pipecat/recording_router_processor.py b/api/services/pipecat/recording_router_processor.py
index d1291a9..5db501a 100644
--- a/api/services/pipecat/recording_router_processor.py
+++ b/api/services/pipecat/recording_router_processor.py
@@ -245,8 +245,8 @@ class RecordingRouterProcessor(FrameProcessor):
"""
logger.info(f"Playing pre-recorded audio: {recording_id}")
- audio_data = await self._fetch_recording_audio(recording_id)
- if not audio_data:
+ result = await self._fetch_recording_audio(recording_id=recording_id)
+ if not result:
logger.warning(
f"Failed to fetch recording {recording_id}, no audio will play"
)
@@ -256,7 +256,7 @@ class RecordingRouterProcessor(FrameProcessor):
await self.push_frame(TTSStartedFrame(context_id=context_id))
await self.push_frame(
TTSAudioRawFrame(
- audio=audio_data,
+ audio=result.audio,
sample_rate=self._audio_sample_rate,
num_channels=1,
context_id=context_id,
@@ -264,10 +264,10 @@ class RecordingRouterProcessor(FrameProcessor):
)
await self.push_frame(TTSStoppedFrame(context_id=context_id))
- duration_secs = len(audio_data) / (self._audio_sample_rate * 2)
+ duration_secs = len(result.audio) / (self._audio_sample_rate * 2)
logger.debug(
f"Finished pushing recording {recording_id} "
- f"({len(audio_data)} bytes, {duration_secs:.1f}s)"
+ f"({len(result.audio)} bytes, {duration_secs:.1f}s)"
)
# ------------------------------------------------------------------
diff --git a/api/services/pipecat/run_pipeline.py b/api/services/pipecat/run_pipeline.py
index dadaf4c..1463f26 100644
--- a/api/services/pipecat/run_pipeline.py
+++ b/api/services/pipecat/run_pipeline.py
@@ -695,9 +695,7 @@ async def _run_pipeline(
# Check if the workflow has any active recordings so the engine can
# include recording response mode instructions in all node prompts.
- has_recordings = await db_client.has_active_recordings(
- workflow_id, workflow.organization_id
- )
+ has_recordings = await db_client.has_active_recordings(workflow.organization_id)
context_compaction_enabled = (workflow.workflow_configurations or {}).get(
"context_compaction_enabled", False
@@ -832,7 +830,6 @@ async def _run_pipeline(
# and audio transition speech)
fetch_audio = create_recording_audio_fetcher(
organization_id=workflow.organization_id,
- workflow_id=workflow_id,
pipeline_sample_rate=audio_config.pipeline_sample_rate,
)
engine.set_fetch_recording_audio(fetch_audio)
@@ -885,7 +882,6 @@ async def _run_pipeline(
# before the first playback request.
asyncio.create_task(
warm_recording_cache(
- workflow_id=workflow_id,
organization_id=workflow.organization_id,
pipeline_sample_rate=audio_config.pipeline_sample_rate,
)
@@ -920,8 +916,9 @@ async def _run_pipeline(
# Create pipeline task with audio configuration
task = create_pipeline_task(pipeline, workflow_run_id, audio_config)
- # Now set the task on the engine
+ # Now set the task and transport output on the engine
engine.set_task(task)
+ engine.set_transport_output(transport.output())
# Initialize the engine to set the initial context with
# System Prompt and Tools
diff --git a/api/services/pipecat/service_factory.py b/api/services/pipecat/service_factory.py
index 6a6bfe3..14c73ae 100644
--- a/api/services/pipecat/service_factory.py
+++ b/api/services/pipecat/service_factory.py
@@ -230,7 +230,7 @@ def create_tts_service(user_config, audio_config: "AudioConfig"):
api_key=user_config.tts.api_key,
settings=DeepgramTTSSettings(voice=user_config.tts.voice),
text_filters=[xml_function_tag_filter],
- skip_aggregator_types=["recording_router"],
+ skip_aggregator_types=["recording_router", "recording"],
silence_time_s=1.0,
)
elif user_config.tts.provider == ServiceProviders.OPENAI.value:
@@ -238,7 +238,7 @@ def create_tts_service(user_config, audio_config: "AudioConfig"):
api_key=user_config.tts.api_key,
settings=OpenAITTSSettings(model=user_config.tts.model),
text_filters=[xml_function_tag_filter],
- skip_aggregator_types=["recording_router"],
+ skip_aggregator_types=["recording_router", "recording"],
silence_time_s=1.0,
)
elif user_config.tts.provider == ServiceProviders.ELEVENLABS.value:
@@ -258,7 +258,7 @@ def create_tts_service(user_config, audio_config: "AudioConfig"):
similarity_boost=0.75,
),
text_filters=[xml_function_tag_filter],
- skip_aggregator_types=["recording_router"],
+ skip_aggregator_types=["recording_router", "recording"],
silence_time_s=1.0,
)
elif user_config.tts.provider == ServiceProviders.CARTESIA.value:
@@ -284,7 +284,7 @@ def create_tts_service(user_config, audio_config: "AudioConfig"):
),
),
text_filters=[xml_function_tag_filter],
- skip_aggregator_types=["recording_router"],
+ skip_aggregator_types=["recording_router", "recording"],
silence_time_s=1.0,
)
elif user_config.tts.provider == ServiceProviders.DOGRAH.value:
@@ -299,7 +299,7 @@ def create_tts_service(user_config, audio_config: "AudioConfig"):
speed=user_config.tts.speed,
),
text_filters=[xml_function_tag_filter],
- skip_aggregator_types=["recording_router"],
+ skip_aggregator_types=["recording_router", "recording"],
silence_time_s=1.0,
)
elif user_config.tts.provider == ServiceProviders.CAMB.value:
@@ -312,7 +312,7 @@ def create_tts_service(user_config, audio_config: "AudioConfig"):
voice_id=voice_id,
model=user_config.tts.model,
text_filters=[xml_function_tag_filter],
- skip_aggregator_types=["recording_router"],
+ skip_aggregator_types=["recording_router", "recording"],
)
# Set language directly as BCP-47 code (bypasses Language enum conversion)
tts._settings.language = language
@@ -327,7 +327,7 @@ def create_tts_service(user_config, audio_config: "AudioConfig"):
speed=user_config.tts.speed,
),
text_filters=[xml_function_tag_filter],
- skip_aggregator_types=["recording_router"],
+ skip_aggregator_types=["recording_router", "recording"],
silence_time_s=1.0,
)
elif user_config.tts.provider == ServiceProviders.RIME.value:
@@ -352,7 +352,7 @@ def create_tts_service(user_config, audio_config: "AudioConfig"):
api_key=user_config.tts.api_key,
settings=RimeTTSSettings(**settings_kwargs),
text_filters=[xml_function_tag_filter],
- skip_aggregator_types=["recording_router"],
+ skip_aggregator_types=["recording_router", "recording"],
silence_time_s=1.0,
)
elif user_config.tts.provider == ServiceProviders.SARVAM.value:
@@ -382,7 +382,7 @@ def create_tts_service(user_config, audio_config: "AudioConfig"):
language=pipecat_language,
),
text_filters=[xml_function_tag_filter],
- skip_aggregator_types=["recording_router"],
+ skip_aggregator_types=["recording_router", "recording"],
silence_time_s=1.0,
)
else:
diff --git a/api/services/workflow/duplicate.py b/api/services/workflow/duplicate.py
index f88682b..dc7b4c5 100644
--- a/api/services/workflow/duplicate.py
+++ b/api/services/workflow/duplicate.py
@@ -1,4 +1,4 @@
-"""Service for duplicating workflows including recordings."""
+"""Service for duplicating workflows."""
import copy
import posixpath
@@ -44,7 +44,9 @@ async def duplicate_workflow(
organization_id: int,
user_id: int,
):
- """Duplicate a workflow including its definition, config, recordings, and triggers.
+ """Duplicate a workflow including its definition, config, and triggers.
+
+ Recordings are org-scoped and shared, so they are not duplicated.
Args:
workflow_id: The source workflow ID to duplicate
@@ -118,15 +120,7 @@ async def duplicate_workflow(
organization_id=organization_id,
)
- # 6. Copy recordings (recording_ids are preserved since they're scoped per workflow)
- await _duplicate_recordings(
- source_workflow_id=workflow_id,
- new_workflow_id=new_workflow.id,
- organization_id=organization_id,
- user_id=user_id,
- )
-
- # 7. Sync triggers for the new workflow
+ # 6. Sync triggers for the new workflow
if workflow_definition:
trigger_paths = _extract_trigger_paths(workflow_definition)
if trigger_paths:
@@ -139,66 +133,6 @@ async def duplicate_workflow(
return new_workflow
-async def _duplicate_recordings(
- source_workflow_id: int,
- new_workflow_id: int,
- organization_id: int,
- user_id: int,
-) -> None:
- """Duplicate all recordings for a workflow.
-
- Copies each recording file to a new storage path scoped under the new
- workflow ID. Recording IDs are preserved since they are unique per
- (org, workflow).
- """
- recordings = await db_client.get_recordings(
- workflow_id=source_workflow_id,
- organization_id=organization_id,
- )
-
- if not recordings:
- return
-
- for rec in recordings:
- try:
- # Build new storage key: recordings/{org_id}/{new_workflow_id}/{recording_id}/{filename}
- filename = posixpath.basename(rec.storage_key)
- new_storage_key = (
- f"recordings/{organization_id}"
- f"/{new_workflow_id}/{rec.recording_id}"
- f"/{filename}"
- )
-
- copied = await _copy_storage_object(
- rec.storage_key, new_storage_key, rec.storage_backend
- )
- if not copied:
- logger.warning(
- f"Failed to copy recording file {rec.recording_id}, skipping"
- )
- continue
-
- await db_client.create_recording(
- recording_id=rec.recording_id,
- workflow_id=new_workflow_id,
- organization_id=organization_id,
- tts_provider=rec.tts_provider,
- tts_model=rec.tts_model,
- tts_voice_id=rec.tts_voice_id,
- transcript=rec.transcript,
- storage_key=new_storage_key,
- storage_backend=rec.storage_backend,
- created_by=user_id,
- metadata=copy.deepcopy(rec.recording_metadata),
- )
-
- logger.info(f"Duplicated recording {rec.recording_id}")
-
- except Exception as e:
- logger.error(f"Error duplicating recording {rec.recording_id}: {e}")
- continue
-
-
async def _copy_storage_object(
source_key: str, dest_key: str, storage_backend: str
) -> bool:
diff --git a/api/services/workflow/pipecat_engine.py b/api/services/workflow/pipecat_engine.py
index 0184d16..784de4d 100644
--- a/api/services/workflow/pipecat_engine.py
+++ b/api/services/workflow/pipecat_engine.py
@@ -1,6 +1,6 @@
from typing import TYPE_CHECKING, Awaitable, Callable, Optional, Union
-from api.services.pipecat.recording_playback import queue_recording_audio
+from api.services.pipecat.audio_playback import play_audio
from api.services.workflow.disposition_mapper import (
apply_disposition_mapping,
get_organization_id_from_workflow_run,
@@ -115,6 +115,10 @@ class PipecatEngine:
# Audio configuration (set via set_audio_config from _run_pipeline)
self._audio_config = None
+ # Transport output processor for injecting audio directly into the
+ # output, bypassing STT (set via set_transport_output from _run_pipeline)
+ self._transport_output = None
+
# Recording audio fetcher (set via set_fetch_recording_audio from _run_pipeline)
self._fetch_recording_audio = None
@@ -221,16 +225,17 @@ class PipecatEngine:
f"Playing transition audio: {transition_speech_recording_id}"
)
self._queued_speech_mute_state = "waiting"
- audio_data = await self._fetch_recording_audio(
- transition_speech_recording_id
+ result = await self._fetch_recording_audio(
+ recording_pk=int(transition_speech_recording_id)
)
- if audio_data:
- await queue_recording_audio(
- audio_data,
+ if result:
+ await play_audio(
+ result.audio,
sample_rate=self._audio_config.pipeline_sample_rate
if self._audio_config
else 16000,
- queue_frame=self.task.queue_frame,
+ queue_frame=self._transport_output.queue_frame,
+ transcript=result.transcript,
)
else:
logger.warning(
@@ -753,6 +758,14 @@ class PipecatEngine:
"""Set the audio configuration for the pipeline."""
self._audio_config = audio_config
+ def set_transport_output(self, transport_output) -> None:
+ """Set the transport output processor for direct audio playback.
+
+ Audio queued here bypasses STT and the rest of the pipeline,
+ going straight to the caller.
+ """
+ self._transport_output = transport_output
+
def set_fetch_recording_audio(self, fetch_fn) -> None:
"""Set the recording audio fetcher callback."""
self._fetch_recording_audio = fetch_fn
diff --git a/api/services/workflow/pipecat_engine_callbacks.py b/api/services/workflow/pipecat_engine_callbacks.py
index 8fb1511..83990bf 100644
--- a/api/services/workflow/pipecat_engine_callbacks.py
+++ b/api/services/workflow/pipecat_engine_callbacks.py
@@ -168,7 +168,6 @@ def create_aggregation_correction_callback(engine: "PipecatEngine"):
reference = engine._current_llm_generation_reference_text
if not reference:
- logger.warning("No reference text available for aggregation correction")
return corrupted
# Apply the correction algorithm
diff --git a/api/services/workflow/pipecat_engine_custom_tools.py b/api/services/workflow/pipecat_engine_custom_tools.py
index e120941..0ee12fd 100644
--- a/api/services/workflow/pipecat_engine_custom_tools.py
+++ b/api/services/workflow/pipecat_engine_custom_tools.py
@@ -16,7 +16,7 @@ from loguru import logger
from api.db import db_client
from api.enums import ToolCategory, WorkflowRunMode
-from api.services.pipecat.recording_playback import queue_recording_audio
+from api.services.pipecat.audio_playback import play_audio, play_audio_loop
from api.services.telephony.call_transfer_manager import get_call_transfer_manager
from api.services.telephony.factory import get_telephony_provider
from api.services.telephony.transfer_event_protocol import TransferContext
@@ -28,7 +28,6 @@ from api.services.workflow.tools.custom_tool import (
execute_http_tool,
tool_to_function_schema,
)
-from api.utils.hold_audio import play_hold_audio_loop
from pipecat.adapters.schemas.function_schema import FunctionSchema
from pipecat.frames.frames import (
FunctionCallResultProperties,
@@ -88,20 +87,23 @@ class CustomToolManager:
message_type = config.get("messageType", "none")
if message_type == "audio":
- recording_id = config.get("audioRecordingId", "")
- if recording_id and self._engine._fetch_recording_audio:
- audio_data = await self._engine._fetch_recording_audio(recording_id)
- if audio_data:
- await queue_recording_audio(
- audio_data,
+ recording_pk = config.get("audioRecordingId")
+ if recording_pk and self._engine._fetch_recording_audio:
+ result = await self._engine._fetch_recording_audio(
+ recording_pk=int(recording_pk)
+ )
+ if result:
+ await play_audio(
+ result.audio,
sample_rate=self._engine._audio_config.pipeline_sample_rate
if self._engine._audio_config
else 16000,
- queue_frame=self._engine.task.queue_frame,
+ queue_frame=self._engine._transport_output.queue_frame,
+ transcript=result.transcript,
)
return True
else:
- logger.warning(f"Failed to fetch recording {recording_id}")
+ logger.warning(f"Failed to fetch recording pk={recording_pk}")
return False
if message_type == "custom":
@@ -292,22 +294,23 @@ class CustomToolManager:
custom_msg_type = config.get("customMessageType", "text")
custom_message = config.get("customMessage", "")
if custom_msg_type == "audio":
- recording_id = config.get("customMessageRecordingId", "")
- if recording_id and self._engine._fetch_recording_audio:
+ recording_pk = config.get("customMessageRecordingId")
+ if recording_pk and self._engine._fetch_recording_audio:
logger.info(
- f"Playing audio message before HTTP tool: {recording_id}"
+ f"Playing audio message before HTTP tool: pk={recording_pk}"
)
self._engine._queued_speech_mute_state = "waiting"
- audio_data = await self._engine._fetch_recording_audio(
- recording_id
+ result = await self._engine._fetch_recording_audio(
+ recording_pk=int(recording_pk)
)
- if audio_data:
- await queue_recording_audio(
- audio_data,
+ if result:
+ await play_audio(
+ result.audio,
sample_rate=self._engine._audio_config.pipeline_sample_rate
if self._engine._audio_config
else 16000,
- queue_frame=self._engine.task.queue_frame,
+ queue_frame=self._engine._transport_output.queue_frame,
+ transcript=result.transcript,
)
elif custom_message:
logger.info(
@@ -587,10 +590,10 @@ class CustomToolManager:
# Start hold music as background task
hold_music_task = asyncio.create_task(
- play_hold_audio_loop(
- self._engine.task,
- hold_music_stop_event,
- sample_rate,
+ play_audio_loop(
+ stop_event=hold_music_stop_event,
+ sample_rate=sample_rate,
+ queue_frame=self._engine._transport_output.queue_frame,
)
)
diff --git a/api/utils/hold_audio.py b/api/utils/hold_audio.py
deleted file mode 100644
index c914082..0000000
--- a/api/utils/hold_audio.py
+++ /dev/null
@@ -1,151 +0,0 @@
-"""
-Hold audio utility for loading, caching, and playing hold music files.
-
-This module provides functionality to load hold music audio files at specific sample rates
-with caching to improve performance during multiple calls, and a reusable loop that queues
-audio frames until a stop event is set.
-"""
-
-import asyncio
-from typing import Dict, Optional, Tuple
-
-import numpy as np
-from loguru import logger
-
-from pipecat.frames.frames import OutputAudioRawFrame
-
-try:
- import soundfile as sf
-except ModuleNotFoundError as e:
- logger.error(f"Exception: {e}")
- logger.error("In order to use hold audio, you need to `pip install soundfile`.")
- raise Exception(f"Missing module: {e}")
-
-
-# Global cache for loaded hold music data
-_hold_audio_cache: Dict[Tuple[str, int], np.ndarray] = {}
-
-
-def load_hold_audio(file_path: str, sample_rate: int) -> Optional[bytes]:
- """Load hold music audio file at the specified sample rate with caching.
-
- Args:
- file_path: Path to the hold music audio file
- sample_rate: Target sample rate (8000 or 16000 Hz supported)
-
- Returns:
- Audio data as bytes (PCM16) or None if loading failed
- """
- cache_key = (file_path, sample_rate)
-
- # Check cache first
- if cache_key in _hold_audio_cache:
- logger.debug(f"Using cached hold audio for {file_path} at {sample_rate}Hz")
- audio_data = _hold_audio_cache[cache_key]
- return audio_data.tobytes()
-
- try:
- logger.info(f"Loading hold audio from {file_path} at {sample_rate}Hz")
-
- # Load audio file
- sound, file_sample_rate = sf.read(file_path, dtype="int16")
- logger.info(
- f"Audio file loaded - file sample_rate: {file_sample_rate}, target: {sample_rate}"
- )
-
- # Ensure mono audio (take first channel if stereo)
- if len(sound.shape) > 1:
- sound = sound[:, 0]
-
- # Resample if needed
- if file_sample_rate != sample_rate:
- logger.warning(
- f"Hold music file has sample rate {file_sample_rate}, expected {sample_rate}"
- )
- # For now, we'll use the audio as-is and let the transport handle resampling
- # In a production system, you might want to use librosa or scipy for proper resampling
-
- # Convert to int16 and cache
- audio_data = sound.astype(np.int16)
- _hold_audio_cache[cache_key] = audio_data
-
- logger.info(
- f"Hold audio loaded successfully: {len(audio_data)} samples at {sample_rate}Hz"
- )
- return audio_data.tobytes()
-
- except Exception as e:
- logger.error(f"Failed to load hold audio file {file_path}: {e}")
- return None
-
-
-def clear_hold_audio_cache():
- """Clear the hold audio cache to free memory."""
- global _hold_audio_cache
- _hold_audio_cache.clear()
- logger.info("Hold audio cache cleared")
-
-
-def get_cache_info() -> Dict[str, int]:
- """Get information about the current cache state.
-
- Returns:
- Dictionary with cache statistics
- """
- return {
- "cached_files": len(_hold_audio_cache),
- "total_cache_size": sum(len(data) for data in _hold_audio_cache.values()),
- }
-
-
-async def play_hold_audio_loop(
- task,
- stop_event: asyncio.Event,
- sample_rate: int = 16000,
- hold_music_file: Optional[str] = None,
-) -> None:
- """Play hold/ring-back audio in a loop until *stop_event* is set.
-
- This is a shared helper used by call-transfer hold music and the
- pre-call data fetch ringer. The caller is responsible for creating
- the ``asyncio.Event`` and setting it when playback should stop.
-
- Args:
- task: A ``PipelineTask`` (or anything with ``queue_frame``).
- stop_event: Set this event to terminate the loop.
- sample_rate: Target sample rate for audio playback.
- hold_music_file: Path to a WAV file. When *None* the default
- ``transfer_hold_ring_{sample_rate}.wav`` asset is used.
- """
- if hold_music_file is None:
- from api.constants import APP_ROOT_DIR
-
- hold_music_file = str(
- APP_ROOT_DIR / "assets" / f"transfer_hold_ring_{sample_rate}.wav"
- )
-
- hold_audio_data = load_hold_audio(hold_music_file, sample_rate)
- if not hold_audio_data:
- logger.warning(f"Hold audio loop: failed to load {hold_music_file}, skipping")
- return
-
- num_samples = len(hold_audio_data) // 2 # 16-bit PCM = 2 bytes per sample
- duration = num_samples / sample_rate
-
- logger.debug(f"Hold audio loop: playing at {sample_rate}Hz")
- try:
- while not stop_event.is_set():
- frame = OutputAudioRawFrame(
- audio=hold_audio_data,
- sample_rate=sample_rate,
- num_channels=1,
- )
- await task.queue_frame(frame)
- try:
- await asyncio.wait_for(stop_event.wait(), timeout=duration + 1.5)
- break
- except asyncio.TimeoutError:
- pass
- except Exception as e:
- logger.error(f"Hold audio loop: error: {e}")
- logger.debug("Hold audio loop: stopped")
diff --git a/docs/voice-agent/pre-recorded-audio.mdx b/docs/voice-agent/pre-recorded-audio.mdx
index aca6dec..d0fe0aa 100644
--- a/docs/voice-agent/pre-recorded-audio.mdx
+++ b/docs/voice-agent/pre-recorded-audio.mdx
@@ -6,15 +6,6 @@ tag: "NEW"
Custom recordings allow you to build **hybrid voice agents** that use your own pre-recorded audio for key parts of the conversation, while falling back to LLM-generated speech (via a cloned voice) for dynamic responses. This gives you the best of both worlds — the emotional depth of real human speech and the flexibility of AI-generated dialogue.
-VIDEO
-
-
## Why use custom recordings?
- **Reduced TTS cost** — Pre-recorded audio is played directly, so you are not charged for TTS synthesis on those segments.
@@ -50,23 +41,20 @@ You can use any TTS provider that supports voice cloning. The steps will vary by
## Step 3: Upload recordings
-Navigate to your agent in the workflow builder and open the **Recordings** panel. You can either upload pre-recorded audio files or record directly in the browser.
+Navigate to the **Recordings** page in the Dograh dashboard. Recordings are shared across all agents in your organization. You can either upload pre-recorded audio files or record directly in the browser.
For each recording:
-1. Click **Record** (or upload a file).
-2. Speak the exact phrase you want the agent to use.
-3. Give the recording a descriptive name (e.g., `greeting`, `invitation`, `venue`).
-4. Verify the transcription is correct — edit it if needed.
-5. Click **Upload**.
+1. Click **Upload Recording**.
+2. Choose an audio file or click **Record** to record in the browser.
+3. Verify the transcription is correct — edit it if needed.
+4. Click **Upload**.
-
-Recordings are scoped to a specific **provider and Voice ID**. If you change either, you will need to re-upload your recordings to ensure consistency between the recorded audio and the cloned voice used for dynamic responses.
-
+You can rename a recording's ID at any time by clicking the edit icon next to it in the recordings list.
## Step 4: Build the workflow
-Open your agent's workflow and write the conversation flow in natural language. To insert a recording, type **`@`** in the prompt editor — this will show a list of all available recordings scoped to your current Voice ID.
+Open your agent's workflow and write the conversation flow in natural language. To insert a recording, type **`@`** in the prompt editor — this will show a list of all available recordings in your organization.
For any user question that falls outside your recordings, the agent automatically generates a dynamic response using the LLM, which is then synthesized using your cloned voice via TTS.
diff --git a/pipecat b/pipecat
index 002c095..5a2e4c8 160000
--- a/pipecat
+++ b/pipecat
@@ -1 +1 @@
-Subproject commit 002c095b2f15a11d4a6ffd85b86592821aa5cd62
+Subproject commit 5a2e4c89118264bfdfe91db059b01e226609f060
diff --git a/ui/src/app/files/page.tsx b/ui/src/app/files/page.tsx
index 54cdbca..f84b0ec 100644
--- a/ui/src/app/files/page.tsx
+++ b/ui/src/app/files/page.tsx
@@ -1,11 +1,18 @@
"use client";
-import { ExternalLink } from "lucide-react";
+import { ExternalLink, Upload } from "lucide-react";
import { useEffect, useState } from "react";
+import { Button } from "@/components/ui/button";
import { Card, CardContent, CardDescription, CardHeader, CardTitle } from "@/components/ui/card";
+import {
+ Dialog,
+ DialogContent,
+ DialogDescription,
+ DialogHeader,
+ DialogTitle,
+} from "@/components/ui/dialog";
import { Skeleton } from "@/components/ui/skeleton";
-import { Tabs, TabsContent, TabsList, TabsTrigger } from "@/components/ui/tabs";
import { useAuth } from "@/lib/auth";
import DocumentList from "./DocumentList";
@@ -14,6 +21,7 @@ import DocumentUpload from "./DocumentUpload";
export default function FilesPage() {
const { user, redirectToLogin, loading } = useAuth();
const [refreshKey, setRefreshKey] = useState(0);
+ const [isUploadOpen, setIsUploadOpen] = useState(false);
// Redirect if not authenticated
useEffect(() => {
@@ -23,8 +31,8 @@ export default function FilesPage() {
}, [loading, user, redirectToLogin]);
const handleUploadSuccess = () => {
- // Trigger refresh of document list
setRefreshKey(prev => prev + 1);
+ setIsUploadOpen(false);
};
if (loading || !user) {
@@ -50,44 +58,37 @@ export default function FilesPage() {
-
-
- All Files
- Upload New
-
-
-
-
-
+
+
+
+
Your Documents
- View and manage your uploaded documents
+ Documents shared across all agents in your organization
-
-
-
-
-
-
+
+
setIsUploadOpen(true)}>
+
+ Upload Document
+
+
+
+
+
+
+
-
-
-
- Upload Document
-
- Upload a PDF or document file to add to your knowledge base
-
-
-
-
-
-
-
-
+
+
+
+ Upload Document
+
+ Upload a PDF or document file to add to your knowledge base
+
+
+
+
+
);
}
diff --git a/ui/src/app/recordings/RecordingsList.tsx b/ui/src/app/recordings/RecordingsList.tsx
index e3fba86..dadd3e2 100644
--- a/ui/src/app/recordings/RecordingsList.tsx
+++ b/ui/src/app/recordings/RecordingsList.tsx
@@ -1,67 +1,33 @@
"use client";
import { AudioLines, Check, Pause, Pencil, Play, RefreshCw, Search, Trash2, X } from "lucide-react";
-import { useCallback, useEffect, useMemo, useRef, useState } from "react";
+import { useCallback, useEffect, useState } from "react";
import { toast } from "sonner";
import {
deleteRecordingApiV1WorkflowRecordingsRecordingIdDelete,
- getWorkflowsSummaryApiV1WorkflowSummaryGet,
listRecordingsApiV1WorkflowRecordingsGet,
updateRecordingApiV1WorkflowRecordingsIdPatch,
} from "@/client/sdk.gen";
-import type { RecordingResponseSchema, WorkflowSummaryResponse } from "@/client/types.gen";
-import { Badge } from "@/components/ui/badge";
+import type { RecordingResponseSchema } from "@/client/types.gen";
import { Button } from "@/components/ui/button";
import { Input } from "@/components/ui/input";
-import {
- Select,
- SelectContent,
- SelectItem,
- SelectTrigger,
- SelectValue,
-} from "@/components/ui/select";
import { Skeleton } from "@/components/ui/skeleton";
import { useAudioPlayback } from "@/hooks/useAudioPlayback";
import logger from "@/lib/logger";
-const ALL_VALUE = "__all__";
-
-export default function RecordingsList() {
+export default function RecordingsList({ refreshKey }: { refreshKey?: number }) {
const [recordings, setRecordings] = useState([]);
- const [workflows, setWorkflows] = useState([]);
const [isLoading, setIsLoading] = useState(true);
const [searchQuery, setSearchQuery] = useState("");
const [error, setError] = useState(null);
- // Filters
- const [selectedWorkflow, setSelectedWorkflow] = useState(ALL_VALUE);
-
// Inline edit state
const [editingId, setEditingId] = useState(null);
const [editValue, setEditValue] = useState("");
+ const [editError, setEditError] = useState(null);
const { playingId, toggle: togglePlayback, stop: stopPlayback } = useAudioPlayback();
- const hasFetchedWorkflows = useRef(false);
-
- const workflowMap = useMemo(() => {
- const map = new Map();
- for (const w of workflows) {
- map.set(w.id, w.name);
- }
- return map;
- }, [workflows]);
-
- const fetchWorkflows = useCallback(async () => {
- try {
- const response = await getWorkflowsSummaryApiV1WorkflowSummaryGet();
- if (response.data) {
- setWorkflows(response.data);
- }
- } catch (err) {
- logger.error("Error fetching workflows:", err);
- }
- }, []);
const fetchRecordings = useCallback(async () => {
try {
@@ -69,9 +35,7 @@ export default function RecordingsList() {
setError(null);
const response = await listRecordingsApiV1WorkflowRecordingsGet({
- query: {
- workflow_id: selectedWorkflow !== ALL_VALUE ? Number(selectedWorkflow) : undefined,
- },
+ query: {},
});
if (response.error || !response.data) {
@@ -85,18 +49,11 @@ export default function RecordingsList() {
} finally {
setIsLoading(false);
}
- }, [selectedWorkflow]);
-
- useEffect(() => {
- if (!hasFetchedWorkflows.current) {
- hasFetchedWorkflows.current = true;
- fetchWorkflows();
- }
- }, [fetchWorkflows]);
+ }, []);
useEffect(() => {
fetchRecordings();
- }, [fetchRecordings]);
+ }, [fetchRecordings, refreshKey]);
const handleDelete = async (recordingId: string) => {
if (!confirm("Are you sure you want to delete this recording?")) return;
@@ -129,17 +86,23 @@ export default function RecordingsList() {
const startEditing = (rec: RecordingResponseSchema) => {
setEditingId(rec.recording_id);
setEditValue(rec.recording_id);
+ setEditError(null);
};
const cancelEditing = () => {
setEditingId(null);
setEditValue("");
+ setEditError(null);
};
const saveRecordingId = async (rec: RecordingResponseSchema) => {
const newId = editValue.trim();
if (!newId) {
- toast.error("Recording ID cannot be empty");
+ setEditError("ID cannot be empty");
+ return;
+ }
+ if (!/^[a-zA-Z0-9_-]+$/.test(newId)) {
+ setEditError("Only letters, numbers, hyphens, and underscores");
return;
}
if (newId === rec.recording_id) {
@@ -147,6 +110,7 @@ export default function RecordingsList() {
return;
}
+ setEditError(null);
try {
const response = await updateRecordingApiV1WorkflowRecordingsIdPatch({
path: { id: rec.id },
@@ -158,11 +122,11 @@ export default function RecordingsList() {
throw new Error(errData?.detail || "Failed to update recording ID");
}
- toast.success(`Recording ID updated to "${newId}"`);
+ toast.success(`Recording ID updated to "${newId}". All workflow references have been updated.`);
cancelEditing();
fetchRecordings();
} catch (err) {
- toast.error(err instanceof Error ? err.message : "Failed to update recording ID");
+ setEditError(err instanceof Error ? err.message : "Failed to update recording ID");
}
};
@@ -208,24 +172,6 @@ export default function RecordingsList() {
return (
- {/* Filter */}
-
- Voice Agent
-
-
-
-
-
- All agents
- {workflows.map((w) => (
-
- {w.name}
-
- ))}
-
-
-
-
{/* Search and Refresh */}
@@ -260,14 +206,13 @@ export default function RecordingsList() {
{searchQuery
? "No recordings match your search"
- : "No recordings found for the selected filters"}
+ : "No recordings yet"}
) : (
{filteredRecordings.map((rec) => {
const filename = (rec.metadata?.original_filename as string) || "";
- const workflowName = workflowMap.get(rec.workflow_id);
const isEditing = editingId === rec.recording_id;
return (
@@ -283,15 +228,15 @@ export default function RecordingsList() {
{/* Recording ID (editable) */}
{isEditing ? (
-
+
setEditValue(e.target.value)}
+ onChange={(e) => { setEditValue(e.target.value); setEditError(null); }}
onKeyDown={(e) => {
if (e.key === "Enter") saveRecordingId(rec);
if (e.key === "Escape") cancelEditing();
}}
- className="h-7 text-sm font-mono w-48"
+ className={`h-7 text-sm font-mono w-48 ${editError ? "border-destructive" : ""}`}
maxLength={64}
autoFocus
/>
@@ -311,27 +256,26 @@ export default function RecordingsList() {
>
+ {editError && (
+ {editError}
+ )}
) : (
-
+
{rec.recording_id}
startEditing(rec)}
>
+ Edit ID
)}
- {workflowName && (
-
- {workflowName}
-
- )}
{/* Filename */}
{filename && (
@@ -344,9 +288,6 @@ export default function RecordingsList() {
{rec.transcript}
- {rec.tts_provider}
- {rec.tts_model}
- {rec.tts_voice_id}
{formatDate(rec.created_at)}
diff --git a/ui/src/app/recordings/RecordingsUploadDialog.tsx b/ui/src/app/recordings/RecordingsUploadDialog.tsx
new file mode 100644
index 0000000..d991b7b
--- /dev/null
+++ b/ui/src/app/recordings/RecordingsUploadDialog.tsx
@@ -0,0 +1,465 @@
+"use client";
+
+import { Loader2, Mic, Square, Upload, X } from "lucide-react";
+import { useCallback, useEffect, useRef, useState } from "react";
+
+import {
+ createRecordingsApiV1WorkflowRecordingsPost,
+ getUploadUrlsApiV1WorkflowRecordingsUploadUrlPost,
+ transcribeAudioApiV1WorkflowRecordingsTranscribePost,
+} from "@/client";
+import type { RecordingUploadResponseSchema } from "@/client/types.gen";
+import { Button } from "@/components/ui/button";
+import {
+ Dialog,
+ DialogContent,
+ DialogDescription,
+ DialogHeader,
+ DialogTitle,
+} from "@/components/ui/dialog";
+import { Input } from "@/components/ui/input";
+import { Label } from "@/components/ui/label";
+import {
+ Select,
+ SelectContent,
+ SelectItem,
+ SelectTrigger,
+ SelectValue,
+} from "@/components/ui/select";
+import { Textarea } from "@/components/ui/textarea";
+import { LANGUAGE_DISPLAY_NAMES } from "@/constants/languages";
+interface RecordingsUploadDialogProps {
+ open: boolean;
+ onOpenChange: (open: boolean) => void;
+ onUploadComplete?: () => void;
+}
+
+const MAX_FILE_SIZE = 5 * 1024 * 1024; // 5MB
+
+interface PendingFile {
+ id: string;
+ file: File;
+ transcript: string;
+ isTranscribing: boolean;
+ error?: string;
+}
+
+let pendingFileCounter = 0;
+
+export const RecordingsUploadDialog = ({
+ open,
+ onOpenChange,
+ onUploadComplete,
+}: RecordingsUploadDialogProps) => {
+ const [uploading, setUploading] = useState(false);
+ const [pendingFiles, setPendingFiles] = useState
([]);
+ const [error, setError] = useState(null);
+ const [language, setLanguage] = useState("multi");
+ const [recordingStep, setRecordingStep] = useState<"idle" | "naming" | "recording">("idle");
+ const [recordingFilename, setRecordingFilename] = useState("");
+ const [recordingDuration, setRecordingDuration] = useState(0);
+ const mediaRecorderRef = useRef(null);
+ const audioChunksRef = useRef([]);
+ const recordingTimerRef = useRef | null>(null);
+ const fileInputRef = useRef(null);
+ const languageRef = useRef(language);
+ languageRef.current = language;
+
+ const stopRecordingTimer = useCallback(() => {
+ if (recordingTimerRef.current) {
+ clearInterval(recordingTimerRef.current);
+ recordingTimerRef.current = null;
+ }
+ }, []);
+
+ const stopRecording = useCallback(() => {
+ if (mediaRecorderRef.current && mediaRecorderRef.current.state !== "inactive") {
+ mediaRecorderRef.current.stop();
+ }
+ }, []);
+
+ const resetRecordingState = useCallback(() => {
+ setRecordingStep("idle");
+ setRecordingFilename("");
+ setRecordingDuration(0);
+ }, []);
+
+ useEffect(() => {
+ if (open) {
+ setError(null);
+ setPendingFiles([]);
+ setLanguage("multi");
+ resetRecordingState();
+ }
+ }, [open, resetRecordingState]);
+
+ useEffect(() => {
+ if (!open) {
+ stopRecording();
+ stopRecordingTimer();
+ }
+ }, [open, stopRecording, stopRecordingTimer]);
+
+ const transcribeFile = async (pendingId: string, file: File) => {
+ setPendingFiles((prev) =>
+ prev.map((p) => (p.id === pendingId ? { ...p, isTranscribing: true } : p))
+ );
+ try {
+ const currentLang = languageRef.current;
+ const result = await transcribeAudioApiV1WorkflowRecordingsTranscribePost({
+ body: { file, language: currentLang },
+ });
+ const data = result.data as Record | undefined;
+ if (data?.transcript) {
+ setPendingFiles((prev) =>
+ prev.map((p) =>
+ p.id === pendingId ? { ...p, transcript: data.transcript as string, isTranscribing: false } : p
+ )
+ );
+ } else {
+ setPendingFiles((prev) =>
+ prev.map((p) => (p.id === pendingId ? { ...p, isTranscribing: false } : p))
+ );
+ }
+ } catch {
+ setPendingFiles((prev) =>
+ prev.map((p) =>
+ p.id === pendingId
+ ? { ...p, isTranscribing: false, error: "Auto-transcription failed" }
+ : p
+ )
+ );
+ }
+ };
+
+ const addPendingFiles = (files: File[]) => {
+ const valid: PendingFile[] = [];
+ for (const file of files) {
+ if (file.size > MAX_FILE_SIZE) {
+ setError(`${file.name} (${(file.size / (1024 * 1024)).toFixed(1)}MB) exceeds 5MB limit — skipped.`);
+ continue;
+ }
+ const id = `pending-${++pendingFileCounter}`;
+ valid.push({ id, file, transcript: "", isTranscribing: false });
+ }
+ if (valid.length === 0) return;
+ setPendingFiles((prev) => [...prev, ...valid]);
+ setError(null);
+ for (const pf of valid) {
+ transcribeFile(pf.id, pf.file);
+ }
+ };
+
+ const removePendingFile = (pendingId: string) => {
+ setPendingFiles((prev) => prev.filter((p) => p.id !== pendingId));
+ };
+
+ const updateTranscript = (pendingId: string, transcript: string) => {
+ setPendingFiles((prev) =>
+ prev.map((p) => (p.id === pendingId ? { ...p, transcript } : p))
+ );
+ };
+
+ const startRecording = async () => {
+ try {
+ const stream = await navigator.mediaDevices.getUserMedia({ audio: true });
+ const mediaRecorder = new MediaRecorder(stream);
+ mediaRecorderRef.current = mediaRecorder;
+ audioChunksRef.current = [];
+
+ mediaRecorder.ondataavailable = (e) => {
+ if (e.data.size > 0) audioChunksRef.current.push(e.data);
+ };
+
+ const filename = recordingFilename.trim() || "recording";
+ mediaRecorder.onstop = () => {
+ stream.getTracks().forEach((t) => t.stop());
+ stopRecordingTimer();
+
+ const blob = new Blob(audioChunksRef.current, { type: mediaRecorder.mimeType });
+ if (blob.size > MAX_FILE_SIZE) {
+ setError(`Recording (${(blob.size / (1024 * 1024)).toFixed(1)}MB) exceeds the maximum allowed size of 5MB.`);
+ resetRecordingState();
+ return;
+ }
+ const ext = mediaRecorder.mimeType.includes("webm") ? "webm" : "mp4";
+ const file = new File([blob], `${filename}.${ext}`, { type: mediaRecorder.mimeType });
+ resetRecordingState();
+ addPendingFiles([file]);
+ };
+
+ mediaRecorder.start();
+ setRecordingStep("recording");
+ setRecordingDuration(0);
+ setError(null);
+ recordingTimerRef.current = setInterval(() => {
+ setRecordingDuration((d) => d + 1);
+ }, 1000);
+ } catch {
+ setError("Microphone access denied. Please allow microphone permissions.");
+ resetRecordingState();
+ }
+ };
+
+ const handleFileSelect = (fileList: FileList | null) => {
+ if (!fileList || fileList.length === 0) return;
+ addPendingFiles(Array.from(fileList));
+ if (fileInputRef.current) fileInputRef.current.value = "";
+ };
+
+ const handleUpload = async () => {
+ const ready = pendingFiles.filter((p) => p.transcript.trim() && !p.isTranscribing);
+ if (ready.length === 0) return;
+
+ setUploading(true);
+ setError(null);
+
+ try {
+ const uploadUrlResponse = await getUploadUrlsApiV1WorkflowRecordingsUploadUrlPost({
+ body: {
+ files: ready.map((p) => ({
+ filename: p.file.name,
+ mime_type: p.file.type || "audio/wav",
+ file_size: p.file.size,
+ })),
+ },
+ });
+
+ if (!uploadUrlResponse.data?.items) {
+ throw new Error("Failed to get upload URLs");
+ }
+
+ const items = uploadUrlResponse.data.items;
+
+ await Promise.all(
+ items.map(async (item: RecordingUploadResponseSchema, idx: number) => {
+ const file = ready[idx].file;
+ const uploadResponse = await fetch(item.upload_url, {
+ method: "PUT",
+ body: file,
+ headers: { "Content-Type": file.type || "audio/wav" },
+ });
+ if (!uploadResponse.ok) {
+ throw new Error(`File upload failed for ${file.name}`);
+ }
+ })
+ );
+
+ await createRecordingsApiV1WorkflowRecordingsPost({
+ body: {
+ recordings: items.map((item: RecordingUploadResponseSchema, idx: number) => ({
+ recording_id: item.recording_id,
+ transcript: ready[idx].transcript.trim(),
+ storage_key: item.storage_key,
+ metadata: {
+ original_filename: ready[idx].file.name,
+ file_size_bytes: ready[idx].file.size,
+ mime_type: ready[idx].file.type,
+ language,
+ },
+ })),
+ },
+ });
+
+ setPendingFiles([]);
+ setLanguage("multi");
+ resetRecordingState();
+ if (fileInputRef.current) fileInputRef.current.value = "";
+ onUploadComplete?.();
+ onOpenChange(false);
+ } catch (err) {
+ setError(err instanceof Error ? err.message : "Failed to upload recordings");
+ } finally {
+ setUploading(false);
+ }
+ };
+
+ const isRecording = recordingStep === "recording";
+ const anyTranscribing = pendingFiles.some((p) => p.isTranscribing);
+ const readyCount = pendingFiles.filter((p) => p.transcript.trim() && !p.isTranscribing).length;
+ const isBusy = uploading || isRecording || anyTranscribing;
+
+ return (
+
+
+
+ Upload Recordings
+
+ Upload or record audio files. Use{" "}
+ @ in
+ prompt fields to insert them into your agents.
+
+
+
+ {error && (
+
+ {error}
+
+ )}
+
+ {/* Upload Section */}
+
+ {/* Audio source: file picker or record */}
+
+
Audio Files
+
+ handleFileSelect(e.target.files)}
+ className="hidden"
+ />
+ fileInputRef.current?.click()}
+ disabled={isBusy}
+ >
+
+ Choose audio files (max 5MB each)
+
+ {recordingStep === "idle" && (
+ setRecordingStep("naming")}
+ disabled={uploading || anyTranscribing}
+ >
+
+ Record
+
+ )}
+
+
+
+ {/* Recording: filename + start/stop */}
+ {(recordingStep === "naming" || isRecording) && (
+
+ {recordingStep === "naming" && (
+ <>
+
+ Recording Name
+ setRecordingFilename(e.target.value)}
+ autoFocus
+ />
+
+
+
+
+ Start Recording
+
+
+ Cancel
+
+
+ >
+ )}
+ {isRecording && (
+
+
+
+
+
+
+ {Math.floor(recordingDuration / 60)}:{(recordingDuration % 60).toString().padStart(2, "0")}
+
+ {recordingFilename}
+ stopRecording()}
+ className="ml-auto"
+ >
+
+ Stop
+
+
+ )}
+
+ )}
+
+ {/* Pending files list */}
+ {pendingFiles.length > 0 && (
+
+
+ Pending ({pendingFiles.length} file{pendingFiles.length !== 1 ? "s" : ""})
+
+ {pendingFiles.map((pf) => (
+
+
+
+ {pf.file.name} ({(pf.file.size / (1024 * 1024)).toFixed(1)}MB)
+
+ {pf.isTranscribing && (
+
+ )}
+ removePendingFile(pf.id)}
+ disabled={uploading}
+ >
+
+
+
+ {pf.error && (
+
{pf.error}
+ )}
+
+ ))}
+
+ )}
+
+ {/* Language */}
+
+ Language
+
+
+
+
+
+ {Object.entries(LANGUAGE_DISPLAY_NAMES).map(([code, name]) => (
+
+ {name}
+
+ ))}
+
+
+
+
+
+ {uploading ? (
+
+ ) : (
+
+ )}
+ {uploading
+ ? "Uploading..."
+ : `Upload ${readyCount} Recording${readyCount !== 1 ? "s" : ""}`}
+
+
+
+
+ );
+};
diff --git a/ui/src/app/recordings/page.tsx b/ui/src/app/recordings/page.tsx
index b5bb552..71802f4 100644
--- a/ui/src/app/recordings/page.tsx
+++ b/ui/src/app/recordings/page.tsx
@@ -1,15 +1,20 @@
"use client";
-import { useEffect } from "react";
+import { ExternalLink, Upload } from "lucide-react";
+import { useEffect, useState } from "react";
+import { Button } from "@/components/ui/button";
import { Card, CardContent, CardDescription, CardHeader, CardTitle } from "@/components/ui/card";
import { Skeleton } from "@/components/ui/skeleton";
import { useAuth } from "@/lib/auth";
import RecordingsList from "./RecordingsList";
+import { RecordingsUploadDialog } from "./RecordingsUploadDialog";
export default function RecordingsPage() {
const { user, redirectToLogin, loading } = useAuth();
+ const [isUploadOpen, setIsUploadOpen] = useState(false);
+ const [refreshKey, setRefreshKey] = useState(0);
useEffect(() => {
if (!loading && !user) {
@@ -33,21 +38,40 @@ export default function RecordingsPage() {
Recordings
- View all audio recordings across your voice agents. Filter by agent, provider, model, or voice.
+ Manage audio recordings for your organization. Use{" "}
+ @ in prompt fields to insert them,
+ or as transition messages in tool calls.{" "}
+
+ Learn more
+
- All Recordings
-
- Audio recordings scoped to your organization
-
+
+
+ All Recordings
+
+ Audio recordings shared across all agents in your organization
+
+
+
setIsUploadOpen(true)}>
+
+ Upload Recording
+
+
-
+
+
+ setRefreshKey((k) => k + 1)}
+ />
);
}
diff --git a/ui/src/app/tools/[toolUuid]/page.tsx b/ui/src/app/tools/[toolUuid]/page.tsx
index d69b6a7..07104f4 100644
--- a/ui/src/app/tools/[toolUuid]/page.tsx
+++ b/ui/src/app/tools/[toolUuid]/page.tsx
@@ -6,9 +6,10 @@ import { useCallback, useEffect, useState } from "react";
import {
getToolApiV1ToolsToolUuidGet,
+ listRecordingsApiV1WorkflowRecordingsGet,
updateToolApiV1ToolsToolUuidPut,
} from "@/client/sdk.gen";
-import type { ToolResponse, TransferCallConfig as APITransferCallConfig } from "@/client/types.gen";
+import type { RecordingResponseSchema, ToolResponse, TransferCallConfig as APITransferCallConfig } from "@/client/types.gen";
import type { EndCallConfig } from "@/client/types.gen";
import { type HttpMethod, type KeyValueItem, type ToolParameter, validateUrl } from "@/components/http";
import { Button } from "@/components/ui/button";
@@ -94,6 +95,9 @@ export default function ToolDetailPage() {
const [customMessageType, setCustomMessageType] = useState<'text' | 'audio'>('text');
const [customMessageRecordingId, setCustomMessageRecordingId] = useState("");
+ // Org-level recordings for audio dropdowns
+ const [recordings, setRecordings] = useState
([]);
+
// Redirect if not authenticated
useEffect(() => {
if (!loading && !user) {
@@ -209,9 +213,24 @@ export default function ToolDetailPage() {
}
};
+ const fetchRecordings = useCallback(async () => {
+ if (loading || !user) return;
+ try {
+ const response = await listRecordingsApiV1WorkflowRecordingsGet({
+ query: {},
+ });
+ if (response.data) {
+ setRecordings(response.data.recordings);
+ }
+ } catch {
+ // Non-critical — dropdowns will show "No recordings available"
+ }
+ }, [loading, user]);
+
useEffect(() => {
fetchTool();
- }, [fetchTool]);
+ fetchRecordings();
+ }, [fetchTool, fetchRecordings]);
const handleSave = async () => {
if (!tool) return;
@@ -512,6 +531,7 @@ const data = await response.json();`;
onCustomMessageChange={setCustomMessage}
audioRecordingId={audioRecordingId}
onAudioRecordingIdChange={setAudioRecordingId}
+ recordings={recordings}
endCallReason={endCallReason}
onEndCallReasonChange={handleEndCallReasonChange}
endCallReasonDescription={endCallReasonDescription}
@@ -531,6 +551,7 @@ const data = await response.json();`;
onCustomMessageChange={setCustomMessage}
audioRecordingId={transferAudioRecordingId}
onAudioRecordingIdChange={setTransferAudioRecordingId}
+ recordings={recordings}
timeout={transferTimeout}
onTimeoutChange={setTransferTimeout}
/>
@@ -558,6 +579,7 @@ const data = await response.json();`;
onCustomMessageTypeChange={setCustomMessageType}
customMessageRecordingId={customMessageRecordingId}
onCustomMessageRecordingIdChange={setCustomMessageRecordingId}
+ recordings={recordings}
/>
)}
diff --git a/ui/src/app/workflow/[workflowId]/RenderWorkflow.tsx b/ui/src/app/workflow/[workflowId]/RenderWorkflow.tsx
index cbb923e..7d27363 100644
--- a/ui/src/app/workflow/[workflowId]/RenderWorkflow.tsx
+++ b/ui/src/app/workflow/[workflowId]/RenderWorkflow.tsx
@@ -15,7 +15,6 @@ import type { DocumentResponseSchema, RecordingResponseSchema, ToolResponse } fr
import { FlowEdge, FlowNode, NodeType } from "@/components/flow/types";
import { Button } from '@/components/ui/button';
import { Tooltip, TooltipContent, TooltipProvider, TooltipTrigger } from '@/components/ui/tooltip';
-import { useUserConfig } from '@/context/UserConfigContext';
import { WorkflowConfigurations } from '@/types/workflow-configurations';
import AddNodePanel from "../../../components/flow/AddNodePanel";
@@ -64,12 +63,6 @@ interface RenderWorkflowProps {
function RenderWorkflow({ initialWorkflowName, workflowId, initialFlow, initialTemplateContextVariables, initialWorkflowConfigurations, initialVersionNumber, initialVersionStatus, user }: RenderWorkflowProps) {
const router = useRouter();
- const { userConfig } = useUserConfig();
- const ttsOverrides = initialWorkflowConfigurations?.model_overrides?.tts;
- const ttsProvider = ttsOverrides?.provider ?? (userConfig?.tts?.provider as string) ?? "";
- const ttsModel = ttsOverrides?.model ?? (userConfig?.tts?.model as string) ?? "";
- const ttsVoiceId = ttsOverrides?.voice ?? (userConfig?.tts?.voice as string) ?? "";
-
const [isPhoneCallDialogOpen, setIsPhoneCallDialogOpen] = useState(false);
const [isVersionPanelOpen, setIsVersionPanelOpen] = useState(false);
const [versions, setVersions] = useState([]);
@@ -245,15 +238,10 @@ function RenderWorkflow({ initialWorkflowName, workflowId, initialFlow, initialT
setTools(toolsResponse.data);
}
- // Fetch recordings for this workflow filtered by active TTS config
+ // Fetch org-level recordings
try {
const recordingsResponse = await listRecordingsApiV1WorkflowRecordingsGet({
- query: {
- workflow_id: workflowId,
- tts_provider: ttsProvider || undefined,
- tts_model: ttsModel || undefined,
- tts_voice_id: ttsVoiceId || undefined,
- },
+ query: {},
});
if (recordingsResponse.data) {
setRecordings(recordingsResponse.data.recordings);
@@ -267,7 +255,7 @@ function RenderWorkflow({ initialWorkflowName, workflowId, initialFlow, initialT
};
fetchData();
- }, [workflowId, ttsProvider, ttsModel, ttsVoiceId]);
+ }, [workflowId]);
// Memoize defaultEdgeOptions to prevent unnecessary re-renders
const defaultEdgeOptions = useMemo(() => ({
diff --git a/ui/src/app/workflow/[workflowId]/components/RecordingsDialog.tsx b/ui/src/app/workflow/[workflowId]/components/RecordingsDialog.tsx
index e73ea4c..8622bf3 100644
--- a/ui/src/app/workflow/[workflowId]/components/RecordingsDialog.tsx
+++ b/ui/src/app/workflow/[workflowId]/components/RecordingsDialog.tsx
@@ -60,7 +60,6 @@ let pendingFileCounter = 0;
export const RecordingsDialog = ({
open,
onOpenChange,
- workflowId,
onRecordingsChange,
ttsOverrides,
}: RecordingsDialogProps) => {
@@ -87,12 +86,10 @@ export const RecordingsDialog = ({
const ttsVoiceId = ttsOverrides?.voice ?? (userConfig?.tts?.voice as string) ?? "";
const fetchRecordings = useCallback(async () => {
- if (!workflowId) return;
setLoading(true);
try {
const result = await listRecordingsApiV1WorkflowRecordingsGet({
query: {
- workflow_id: workflowId,
tts_provider: ttsProvider || undefined,
tts_model: ttsModel || undefined,
tts_voice_id: ttsVoiceId || undefined,
@@ -106,7 +103,7 @@ export const RecordingsDialog = ({
} finally {
setLoading(false);
}
- }, [workflowId, ttsProvider, ttsModel, ttsVoiceId, onRecordingsChange]);
+ }, [ttsProvider, ttsModel, ttsVoiceId, onRecordingsChange]);
const stopRecordingTimer = useCallback(() => {
if (recordingTimerRef.current) {
@@ -275,7 +272,6 @@ export const RecordingsDialog = ({
const uploadUrlResponse =
await getUploadUrlsApiV1WorkflowRecordingsUploadUrlPost({
body: {
- workflow_id: workflowId,
files: ready.map((p) => ({
filename: p.file.name,
mime_type: p.file.type || "audio/wav",
@@ -312,7 +308,6 @@ export const RecordingsDialog = ({
body: {
recordings: items.map((item: RecordingUploadResponseSchema, idx: number) => ({
recording_id: item.recording_id,
- workflow_id: workflowId,
tts_provider: ttsProvider,
tts_model: ttsModel,
tts_voice_id: ttsVoiceId,
diff --git a/ui/src/app/workflow/[workflowId]/run/[runId]/hooks/useWebSocketRTC.tsx b/ui/src/app/workflow/[workflowId]/run/[runId]/hooks/useWebSocketRTC.tsx
index 4b27c9d..eac4967 100644
--- a/ui/src/app/workflow/[workflowId]/run/[runId]/hooks/useWebSocketRTC.tsx
+++ b/ui/src/app/workflow/[workflowId]/run/[runId]/hooks/useWebSocketRTC.tsx
@@ -446,6 +446,17 @@ export const useWebSocketRTC = ({ workflowId, workflowRunId, accessToken, initia
if (!firstBotSpeechCompletedRef.current) {
firstBotSpeechCompletedRef.current = true;
}
+ // Finalize the last bot message so "speaking..." indicator is removed
+ setFeedbackMessages(prev => {
+ const lastIdx = prev.length - 1;
+ const last = prev[lastIdx];
+ if (last && last.type === 'bot-text' && !last.final) {
+ const updated = [...prev];
+ updated[lastIdx] = { ...last, final: true };
+ return updated;
+ }
+ return prev;
+ });
break;
case 'rtf-user-mute-started':
diff --git a/ui/src/app/workflow/[workflowId]/settings/page.tsx b/ui/src/app/workflow/[workflowId]/settings/page.tsx
index 51a4108..4d9dae1 100644
--- a/ui/src/app/workflow/[workflowId]/settings/page.tsx
+++ b/ui/src/app/workflow/[workflowId]/settings/page.tsx
@@ -1,6 +1,7 @@
"use client";
import { ArrowLeft, BookA, Brain, ExternalLink, Loader2, Mic, Pause, PhoneOff, Play, Rocket, Settings, Trash2Icon, Upload, Variable, X } from "lucide-react";
+import Link from "next/link";
import { useParams, useRouter } from "next/navigation";
import { useEffect, useMemo, useRef, useState } from "react";
@@ -33,7 +34,6 @@ import {
} from "@/types/workflow-configurations";
import { EmbedDialog } from "../components/EmbedDialog";
-import { RecordingsDialog } from "../components/RecordingsDialog";
import { useWorkflowState } from "../hooks/useWorkflowState";
// ---------------------------------------------------------------------------
@@ -915,7 +915,6 @@ function WorkflowSettingsInner({
const router = useRouter();
const { dirtySections, confirmNavigate } = useUnsavedChangesContext();
- const [isRecordingsDialogOpen, setIsRecordingsDialogOpen] = useState(false);
const [isEmbedDialogOpen, setIsEmbedDialogOpen] = useState(false);
const [activeSection, setActiveSection] = useState("general");
@@ -1057,7 +1056,7 @@ function WorkflowSettingsInner({
onSave={saveWorkflowConfigurations}
/>
- {/* Recordings (dialog trigger) */}
+ {/* Recordings – moved to org-level page */}
@@ -1065,15 +1064,17 @@ function WorkflowSettingsInner({
Recordings
- Upload or record audio for hybrid prompts. Use{" "}
- @ in prompt fields to
- insert them.{" "}
+ Recordings are now managed at the organization level and shared across all agents.
+ Use @ in prompt fields to insert them.{" "}
Learn more
- setIsRecordingsDialogOpen(true)}>
- Manage Recordings
+
+
+ Go to Recordings
+
+
@@ -1128,12 +1129,6 @@ function WorkflowSettingsInner({
{/* Dialogs for complex sections */}
-
(options?: Options) => (options?.client ?? client).get({ url: '/api/v1/workflow-recordings/', ...options });
diff --git a/ui/src/client/types.gen.ts b/ui/src/client/types.gen.ts
index 5b82817..2967dd5 100644
--- a/ui/src/client/types.gen.ts
+++ b/ui/src/client/types.gen.ts
@@ -269,12 +269,6 @@ export type BatchRecordingCreateResponseSchema = {
* Request schema for getting presigned upload URLs for one or more files.
*/
export type BatchRecordingUploadRequestSchema = {
- /**
- * Workflow Id
- *
- * Workflow ID these recordings belong to
- */
- workflow_id: number;
/**
* Files
*
@@ -1572,13 +1566,19 @@ export type EndCallConfig = {
*
* Type of goodbye message
*/
- messageType?: 'none' | 'custom';
+ messageType?: 'none' | 'custom' | 'audio';
/**
* Custommessage
*
* Custom message to play before ending the call
*/
customMessage?: string | null;
+ /**
+ * Audiorecordingid
+ *
+ * Recording ID for audio goodbye message
+ */
+ audioRecordingId?: string | null;
/**
* Endcallreason
*
@@ -1739,6 +1739,24 @@ export type HttpApiConfig = {
* Request timeout in milliseconds
*/
timeout_ms?: number | null;
+ /**
+ * Custommessage
+ *
+ * Custom message to play after tool execution
+ */
+ customMessage?: string | null;
+ /**
+ * Custommessagetype
+ *
+ * Type of custom message: text or audio
+ */
+ customMessageType?: 'text' | 'audio' | null;
+ /**
+ * Custommessagerecordingid
+ *
+ * Recording ID for audio custom message
+ */
+ customMessageRecordingId?: string | null;
};
/**
@@ -2102,30 +2120,24 @@ export type RecordingCreateRequestSchema = {
* Short recording ID from upload step
*/
recording_id: string;
- /**
- * Workflow Id
- *
- * Workflow ID
- */
- workflow_id: number;
/**
* Tts Provider
*
* TTS provider (e.g. elevenlabs)
*/
- tts_provider: string;
+ tts_provider?: string | null;
/**
* Tts Model
*
* TTS model name
*/
- tts_model: string;
+ tts_model?: string | null;
/**
* Tts Voice Id
*
* TTS voice identifier
*/
- tts_voice_id: string;
+ tts_voice_id?: string | null;
/**
* Transcript
*
@@ -2181,7 +2193,7 @@ export type RecordingResponseSchema = {
/**
* Workflow Id
*/
- workflow_id: number;
+ workflow_id?: number | null;
/**
* Organization Id
*/
@@ -2189,15 +2201,15 @@ export type RecordingResponseSchema = {
/**
* Tts Provider
*/
- tts_provider: string;
+ tts_provider?: string | null;
/**
* Tts Model
*/
- tts_model: string;
+ tts_model?: string | null;
/**
* Tts Voice Id
*/
- tts_voice_id: string;
+ tts_voice_id?: string | null;
/**
* Transcript
*/
@@ -2239,7 +2251,7 @@ export type RecordingUpdateRequestSchema = {
/**
* Recording Id
*
- * New descriptive recording ID
+ * New descriptive recording ID (letters, numbers, hyphens, underscores only)
*/
recording_id: string;
};
@@ -2828,13 +2840,19 @@ export type TransferCallConfig = {
*
* Type of message to play before transfer
*/
- messageType?: 'none' | 'custom';
+ messageType?: 'none' | 'custom' | 'audio';
/**
* Custommessage
*
* Custom message to play before transferring the call
*/
customMessage?: string | null;
+ /**
+ * Audiorecordingid
+ *
+ * Recording ID for audio message before transfer
+ */
+ audioRecordingId?: string | null;
/**
* Timeout
*
diff --git a/ui/src/components/flow/TextOrAudioInput.tsx b/ui/src/components/flow/TextOrAudioInput.tsx
index 1381493..1f575d8 100644
--- a/ui/src/components/flow/TextOrAudioInput.tsx
+++ b/ui/src/components/flow/TextOrAudioInput.tsx
@@ -1,7 +1,14 @@
+import { Check, ChevronDown, Pause, Play, Search } from "lucide-react";
+import { useMemo, useState } from "react";
+
import type { RecordingResponseSchema } from "@/client/types.gen";
+import { Button } from "@/components/ui/button";
+import { Input } from "@/components/ui/input";
import { Label } from "@/components/ui/label";
+import { Popover, PopoverContentInline, PopoverTrigger } from "@/components/ui/popover";
import { RadioGroup, RadioGroupItem } from "@/components/ui/radio-group";
-import { Select, SelectContent, SelectItem, SelectTrigger, SelectValue } from "@/components/ui/select";
+import { useAudioPlayback } from "@/hooks/useAudioPlayback";
+import { cn } from "@/lib/utils";
interface TextOrAudioInputProps {
type: 'text' | 'audio';
@@ -62,36 +69,144 @@ interface RecordingSelectProps {
* their own none/custom/audio radio) can use it directly.
*/
export function RecordingSelect({ value, onChange, recordings }: RecordingSelectProps) {
+ const [open, setOpen] = useState(false);
+ const [search, setSearch] = useState("");
+ const { playingId, toggle, stop } = useAudioPlayback();
+
+ const selected = recordings.find((r) => String(r.id) === value);
+
+ const filtered = useMemo(() => {
+ if (!search) return recordings;
+ const q = search.toLowerCase();
+ return recordings.filter((r) =>
+ r.recording_id.toLowerCase().includes(q) ||
+ r.transcript.toLowerCase().includes(q) ||
+ ((r.metadata?.original_filename as string) || "").toLowerCase().includes(q)
+ );
+ }, [recordings, search]);
+
+ const handleSelect = (rec: RecordingResponseSchema) => {
+ stop();
+ onChange(String(rec.id));
+ setOpen(false);
+ };
+
+ const handlePlay = async (e: React.MouseEvent, rec: RecordingResponseSchema) => {
+ e.stopPropagation();
+ try {
+ await toggle(rec.recording_id, rec.storage_key, rec.storage_backend);
+ } catch {
+ // Ignore playback errors
+ }
+ };
+
return (
Select a pre-recorded audio file to play.
-
-
-
-
-
- {recordings.length === 0 ? (
-
- No recordings available
-
- ) : (
- recordings.map((r) => (
-
-
- {(r.metadata?.original_filename as string) || r.recording_id}
+ { if (!v) { stop(); setSearch(""); } setOpen(v); }}>
+
+
+ {selected ? (
+
+
+ {selected.recording_id}
+
+
+ {selected.transcript.length > 75
+ ? `${selected.transcript.slice(0, 75)}…`
+ : selected.transcript}
- {r.transcript && (
-
- — {r.transcript}
-
- )}
-
- ))
+
+ ) : (
+ Select a recording
+ )}
+
+
+
+
+ {recordings.length === 0 ? (
+
+ No recordings available
+
+ ) : (
+
+
+
+
+ setSearch(e.target.value)}
+ className="h-8 pl-8 text-sm"
+ autoFocus
+ />
+
+
+
+ {filtered.length === 0 ? (
+
+ No recordings match “{search}”
+
+ ) : filtered.map((r) => {
+ const filename = (r.metadata?.original_filename as string) || "";
+ const isSelected = String(r.id) === value;
+ const isPlaying = playingId === r.recording_id;
+
+ return (
+
handleSelect(r)}
+ >
+
+
+ {r.recording_id}
+
+ {filename && (
+
+ {filename}
+
+ )}
+
+ {r.transcript}
+
+
handlePlay(e, r)}
+ >
+ {isPlaying ? (
+
+ ) : (
+
+ )}
+
+
+ );
+ })}
+
+
)}
-
-
+
+
);
}
diff --git a/ui/src/components/flow/edges/CustomEdge.tsx b/ui/src/components/flow/edges/CustomEdge.tsx
index 443183b..56ad50d 100644
--- a/ui/src/components/flow/edges/CustomEdge.tsx
+++ b/ui/src/components/flow/edges/CustomEdge.tsx
@@ -72,7 +72,7 @@ const EdgeDetailsDialog = ({ open, onOpenChange, data, onSave }: EdgeDetailsDial
return (
-
+
Edit Condition
{data?.invalid && data.validationMessage && (
@@ -82,7 +82,7 @@ const EdgeDetailsDialog = ({ open, onOpenChange, data, onSave }: EdgeDetailsDial
)}
-
+
Condition Label
diff --git a/ui/src/components/ui/dialog.tsx b/ui/src/components/ui/dialog.tsx
index 5bfab4b..493d746 100644
--- a/ui/src/components/ui/dialog.tsx
+++ b/ui/src/components/ui/dialog.tsx
@@ -56,6 +56,23 @@ function DialogContent({
e.preventDefault()}
+ onCloseAutoFocus={() => {
+ document.body.style.pointerEvents = "";
+ }}
+ onPointerDownOutside={(e) => {
+ // Prevent the Dialog from closing when the user clicks inside a
+ // portaled Radix Popover/DropdownMenu rendered on top of this Dialog.
+ const target = e.target as HTMLElement;
+ if (target.closest('[data-radix-popper-content-wrapper]')) {
+ e.preventDefault();
+ }
+ }}
+ onInteractOutside={(e) => {
+ const target = e.target as HTMLElement;
+ if (target.closest('[data-radix-popper-content-wrapper]')) {
+ e.preventDefault();
+ }
+ }}
data-slot="dialog-content"
className={cn(
"bg-background data-[state=open]:animate-in data-[state=closed]:animate-out data-[state=closed]:fade-out-0 data-[state=open]:fade-in-0 data-[state=closed]:zoom-out-95 data-[state=open]:zoom-in-95 fixed top-[50%] left-[50%] z-50 grid w-full max-w-[calc(100%-2rem)] translate-x-[-50%] translate-y-[-50%] gap-4 rounded-lg border p-6 shadow-lg duration-200 sm:max-w-lg",
diff --git a/ui/src/components/ui/popover.tsx b/ui/src/components/ui/popover.tsx
index 4d567cd..1d1cd41 100644
--- a/ui/src/components/ui/popover.tsx
+++ b/ui/src/components/ui/popover.tsx
@@ -17,6 +17,9 @@ function PopoverTrigger({
return
}
+const popoverContentClass =
+ "bg-popover text-popover-foreground data-[state=open]:animate-in data-[state=closed]:animate-out data-[state=closed]:fade-out-0 data-[state=open]:fade-in-0 data-[state=closed]:zoom-out-95 data-[state=open]:zoom-in-95 data-[side=bottom]:slide-in-from-top-2 data-[side=left]:slide-in-from-right-2 data-[side=right]:slide-in-from-left-2 data-[side=top]:slide-in-from-bottom-2 z-50 w-72 origin-(--radix-popover-content-transform-origin) rounded-md border p-4 shadow-md outline-hidden"
+
function PopoverContent({
className,
align = "center",
@@ -29,20 +32,38 @@ function PopoverContent({
data-slot="popover-content"
align={align}
sideOffset={sideOffset}
- className={cn(
- "bg-popover text-popover-foreground data-[state=open]:animate-in data-[state=closed]:animate-out data-[state=closed]:fade-out-0 data-[state=open]:fade-in-0 data-[state=closed]:zoom-out-95 data-[state=open]:zoom-in-95 data-[side=bottom]:slide-in-from-top-2 data-[side=left]:slide-in-from-right-2 data-[side=right]:slide-in-from-left-2 data-[side=top]:slide-in-from-bottom-2 z-50 w-72 origin-(--radix-popover-content-transform-origin) rounded-md border p-4 shadow-md outline-hidden",
- className
- )}
+ className={cn(popoverContentClass, className)}
{...props}
/>
)
}
+/**
+ * PopoverContent without a Portal wrapper. Renders inline in the DOM tree,
+ * which avoids focus-trap conflicts when used inside a Dialog.
+ */
+function PopoverContentInline({
+ className,
+ align = "center",
+ sideOffset = 4,
+ ...props
+}: React.ComponentProps) {
+ return (
+
+ )
+}
+
function PopoverAnchor({
...props
}: React.ComponentProps) {
return
}
-export { Popover, PopoverAnchor,PopoverContent, PopoverTrigger }
+export { Popover, PopoverAnchor, PopoverContent, PopoverContentInline, PopoverTrigger }