diff --git a/api/alembic/versions/3cd3155084a2_dedup_org_scoped_recordings.py b/api/alembic/versions/3cd3155084a2_dedup_org_scoped_recordings.py new file mode 100644 index 0000000..67998cf --- /dev/null +++ b/api/alembic/versions/3cd3155084a2_dedup_org_scoped_recordings.py @@ -0,0 +1,108 @@ +"""dedup recordings to org-scoped unique audio + +Revision ID: 3cd3155084a2 +Revises: e7254d2c6c18 +Create Date: 2026-04-10 12:00:00.000000 + +""" + +from typing import Sequence, Union + +import sqlalchemy as sa +from alembic import op + +# revision identifiers, used by Alembic. +revision: str = "3cd3155084a2" +down_revision: Union[str, None] = "e7254d2c6c18" +branch_labels: Union[str, Sequence[str], None] = None +depends_on: Union[str, Sequence[str], None] = None + + +def upgrade() -> None: + conn = op.get_bind() + + # 1. Identify duplicate groups: same (org, transcript, tts config). + # Within each group the earliest row (by created_at) is canonical; + # every other row is an alias that will be remapped and soft-deleted. + rows = conn.execute( + sa.text(""" + WITH ranked AS ( + SELECT + recording_id, + organization_id, + transcript, + tts_provider, + tts_model, + tts_voice_id, + ROW_NUMBER() OVER ( + PARTITION BY organization_id, transcript, + tts_provider, tts_model, tts_voice_id + ORDER BY created_at ASC + ) AS rn + FROM workflow_recordings + WHERE is_active = true + ), + canonical AS ( + SELECT recording_id AS canonical_id, + organization_id, transcript, + tts_provider, tts_model, tts_voice_id + FROM ranked + WHERE rn = 1 + ) + SELECT r.recording_id AS alias_id, c.canonical_id + FROM ranked r + JOIN canonical c + ON r.organization_id = c.organization_id + AND r.transcript = c.transcript + AND r.tts_provider = c.tts_provider + AND r.tts_model = c.tts_model + AND r.tts_voice_id = c.tts_voice_id + WHERE r.rn > 1 + """) + ).fetchall() + + if not rows: + return + + # 2. Replace alias recording_ids with canonical ones in workflow JSON. + # Both draft definitions (workflows.workflow_definition) and published + # versions (workflow_definitions.workflow_json) are updated. + for alias_id, canonical_id in rows: + alias_pattern = f"RECORDING_ID: {alias_id}" + canonical_pattern = f"RECORDING_ID: {canonical_id}" + conn.execute( + sa.text(""" + UPDATE workflows + SET workflow_definition = + REPLACE(workflow_definition::text, :alias, :canonical)::json + WHERE workflow_definition::text LIKE '%%' || :alias || '%%' + """), + {"alias": alias_pattern, "canonical": canonical_pattern}, + ) + conn.execute( + sa.text(""" + UPDATE workflow_definitions + SET workflow_json = + REPLACE(workflow_json::text, :alias, :canonical)::json + WHERE workflow_json::text LIKE '%%' || :alias || '%%' + """), + {"alias": alias_pattern, "canonical": canonical_pattern}, + ) + + # 3. Soft-delete every alias row. + alias_ids = [r[0] for r in rows] + conn.execute( + sa.text(""" + UPDATE workflow_recordings + SET is_active = false + WHERE recording_id = ANY(:ids) + AND is_active = true + """), + {"ids": alias_ids}, + ) + + +def downgrade() -> None: + # Deduplication is a one-way data migration. The soft-deleted rows + # still exist in the table; a manual restore is possible if needed. + pass diff --git a/api/alembic/versions/67a5cf3e09d0_unique_recording_id_per_org.py b/api/alembic/versions/67a5cf3e09d0_unique_recording_id_per_org.py new file mode 100644 index 0000000..0b9ddfa --- /dev/null +++ b/api/alembic/versions/67a5cf3e09d0_unique_recording_id_per_org.py @@ -0,0 +1,109 @@ +"""make recordings org-scoped instead of workflow-scoped + +Revision ID: 67a5cf3e09d0 +Revises: 3cd3155084a2 +Create Date: 2026-04-09 17:03:38.302041 + +""" + +from typing import Sequence, Union + +import sqlalchemy as sa +from alembic import op + +# revision identifiers, used by Alembic. +revision: str = "67a5cf3e09d0" +down_revision: Union[str, None] = "3cd3155084a2" +branch_labels: Union[str, Sequence[str], None] = None +depends_on: Union[str, Sequence[str], None] = None + + +def upgrade() -> None: + # 1. Widen recording_id from 16 to 64 chars for descriptive names + op.alter_column( + "workflow_recordings", + "recording_id", + existing_type=sa.VARCHAR(length=16), + type_=sa.String(length=64), + existing_nullable=False, + ) + + # 2. Make workflow_id nullable — recordings are now org-scoped + op.alter_column( + "workflow_recordings", + "workflow_id", + existing_type=sa.Integer(), + nullable=True, + ) + + # 3. Drop the old globally-unique index on recording_id + op.drop_index( + "ix_workflow_recordings_recording_id", + table_name="workflow_recordings", + ) + + # 4. Re-create as non-unique index (for lookups) + op.create_index( + "ix_workflow_recordings_recording_id", + "workflow_recordings", + ["recording_id"], + unique=False, + ) + + # 5. Add unique constraint (recording_id, organization_id) + op.create_unique_constraint( + "uq_workflow_recordings_recording_id_org", + "workflow_recordings", + ["recording_id", "organization_id"], + ) + + # 6. Drop the workflow+TTS scope index (no longer relevant) + op.drop_index( + "ix_workflow_recordings_tts_scope", + table_name="workflow_recordings", + ) + + +def downgrade() -> None: + # Re-create the TTS scope index + op.create_index( + "ix_workflow_recordings_tts_scope", + "workflow_recordings", + ["workflow_id", "tts_provider", "tts_model", "tts_voice_id"], + ) + + # Drop the org-scoped unique constraint + op.drop_constraint( + "uq_workflow_recordings_recording_id_org", + "workflow_recordings", + type_="unique", + ) + + # Drop non-unique index and re-create as unique + op.drop_index( + "ix_workflow_recordings_recording_id", + table_name="workflow_recordings", + ) + op.create_index( + "ix_workflow_recordings_recording_id", + "workflow_recordings", + ["recording_id"], + unique=True, + ) + + # Make workflow_id NOT NULL again + op.alter_column( + "workflow_recordings", + "workflow_id", + existing_type=sa.Integer(), + nullable=False, + ) + + # Revert recording_id width + op.alter_column( + "workflow_recordings", + "recording_id", + existing_type=sa.String(length=64), + type_=sa.VARCHAR(length=16), + existing_nullable=False, + ) diff --git a/api/alembic/versions/a1b2c3d4e5f6_make_tts_columns_nullable.py b/api/alembic/versions/a1b2c3d4e5f6_make_tts_columns_nullable.py new file mode 100644 index 0000000..563c45f --- /dev/null +++ b/api/alembic/versions/a1b2c3d4e5f6_make_tts_columns_nullable.py @@ -0,0 +1,42 @@ +"""make tts columns nullable on workflow_recordings + +Revision ID: a1b2c3d4e5f6 +Revises: 67a5cf3e09d0 +Create Date: 2026-04-10 12:00:00.000000 + +""" + +from typing import Sequence, Union + +import sqlalchemy as sa +from alembic import op + +# revision identifiers, used by Alembic. +revision: str = "a1b2c3d4e5f6" +down_revision: str = "67a5cf3e09d0" +branch_labels: Union[str, Sequence[str], None] = None +depends_on: Union[str, Sequence[str], None] = None + + +def upgrade() -> None: + op.alter_column( + "workflow_recordings", "tts_provider", existing_type=sa.String(), nullable=True + ) + op.alter_column( + "workflow_recordings", "tts_model", existing_type=sa.String(), nullable=True + ) + op.alter_column( + "workflow_recordings", "tts_voice_id", existing_type=sa.String(), nullable=True + ) + + +def downgrade() -> None: + op.alter_column( + "workflow_recordings", "tts_voice_id", existing_type=sa.String(), nullable=False + ) + op.alter_column( + "workflow_recordings", "tts_model", existing_type=sa.String(), nullable=False + ) + op.alter_column( + "workflow_recordings", "tts_provider", existing_type=sa.String(), nullable=False + ) diff --git a/api/db/models.py b/api/db/models.py index 820ccc5..b8d4b1d 100644 --- a/api/db/models.py +++ b/api/db/models.py @@ -1005,7 +1005,7 @@ class KnowledgeBaseDocumentModel(Base): class WorkflowRecordingModel(Base): - """Model for storing audio recordings scoped to a workflow and TTS configuration. + """Model for storing audio recordings scoped to an organization. Recordings are used in hybrid prompts where parts of the output are pre-recorded audio rather than dynamically generated TTS. @@ -1015,21 +1015,21 @@ class WorkflowRecordingModel(Base): id = Column(Integer, primary_key=True, index=True) - # Short globally unique ID (e.g. "xbhfha3k") used in prompts - recording_id = Column(String(16), unique=True, nullable=False, index=True) + # Descriptive ID used in prompts (unique per organization) + recording_id = Column(String(64), nullable=False, index=True) # Scoping workflow_id = Column( - Integer, ForeignKey("workflows.id", ondelete="CASCADE"), nullable=False + Integer, ForeignKey("workflows.id", ondelete="CASCADE"), nullable=True ) organization_id = Column( Integer, ForeignKey("organizations.id", ondelete="CASCADE"), nullable=False ) - # TTS configuration scope - tts_provider = Column(String, nullable=False) - tts_model = Column(String, nullable=False) - tts_voice_id = Column(String, nullable=False) + # TTS configuration metadata (optional, legacy) + tts_provider = Column(String, nullable=True) + tts_model = Column(String, nullable=True) + tts_voice_id = Column(String, nullable=True) # Content transcript = Column(Text, nullable=False) @@ -1062,16 +1062,14 @@ class WorkflowRecordingModel(Base): # Indexes __table_args__ = ( + UniqueConstraint( + "recording_id", + "organization_id", + name="uq_workflow_recordings_recording_id_org", + ), Index("ix_workflow_recordings_workflow_id", "workflow_id"), Index("ix_workflow_recordings_org_id", "organization_id"), Index("ix_workflow_recordings_recording_id", "recording_id"), - Index( - "ix_workflow_recordings_tts_scope", - "workflow_id", - "tts_provider", - "tts_model", - "tts_voice_id", - ), ) diff --git a/api/db/workflow_recording_client.py b/api/db/workflow_recording_client.py index f0c81c5..b0f1682 100644 --- a/api/db/workflow_recording_client.py +++ b/api/db/workflow_recording_client.py @@ -5,7 +5,7 @@ import string from typing import List, Optional from loguru import logger -from sqlalchemy import func, select +from sqlalchemy import func, select, text from api.db.base_client import BaseDBClient from api.db.models import WorkflowRecordingModel @@ -23,30 +23,30 @@ class WorkflowRecordingClient(BaseDBClient): async def create_recording( self, recording_id: str, - workflow_id: int, organization_id: int, - tts_provider: str, - tts_model: str, - tts_voice_id: str, transcript: str, storage_key: str, storage_backend: str, created_by: int, + workflow_id: Optional[int] = None, + tts_provider: Optional[str] = None, + tts_model: Optional[str] = None, + tts_voice_id: Optional[str] = None, metadata: Optional[dict] = None, ) -> WorkflowRecordingModel: """Create a new workflow recording record. Args: recording_id: Short unique recording identifier - workflow_id: ID of the workflow organization_id: ID of the organization - tts_provider: TTS provider name - tts_model: TTS model name - tts_voice_id: TTS voice identifier transcript: User-provided transcript storage_key: S3/MinIO storage key storage_backend: Storage backend (s3 or minio) created_by: ID of the user + workflow_id: Optional workflow ID (legacy) + tts_provider: Optional TTS provider name + tts_model: Optional TTS model name + tts_voice_id: Optional TTS voice identifier metadata: Optional extra metadata Returns: @@ -71,25 +71,22 @@ class WorkflowRecordingClient(BaseDBClient): await session.commit() await session.refresh(recording) - logger.info( - f"Created recording {recording_id} for workflow {workflow_id}, " - f"org {organization_id}" - ) + logger.info(f"Created recording {recording_id} for org {organization_id}") return recording - async def get_recordings_for_workflow( + async def get_recordings( self, - workflow_id: int, organization_id: int, + workflow_id: Optional[int] = None, tts_provider: Optional[str] = None, tts_model: Optional[str] = None, tts_voice_id: Optional[str] = None, ) -> List[WorkflowRecordingModel]: - """Get recordings for a workflow, optionally filtered by TTS config. + """Get recordings for an organization, optionally filtered. Args: - workflow_id: ID of the workflow organization_id: ID of the organization + workflow_id: Optional workflow ID filter tts_provider: Optional TTS provider filter tts_model: Optional TTS model filter tts_voice_id: Optional TTS voice ID filter @@ -99,11 +96,12 @@ class WorkflowRecordingClient(BaseDBClient): """ async with self.async_session() as session: query = select(WorkflowRecordingModel).where( - WorkflowRecordingModel.workflow_id == workflow_id, WorkflowRecordingModel.organization_id == organization_id, WorkflowRecordingModel.is_active == True, ) + if workflow_id is not None: + query = query.where(WorkflowRecordingModel.workflow_id == workflow_id) if tts_provider: query = query.where(WorkflowRecordingModel.tts_provider == tts_provider) if tts_model: @@ -140,15 +138,37 @@ class WorkflowRecordingClient(BaseDBClient): result = await session.execute(query) return result.scalar_one_or_none() - async def has_active_recordings( + async def get_recording_by_id( self, - workflow_id: int, + id: int, organization_id: int, - ) -> bool: - """Check if a workflow has any active recordings. + ) -> Optional[WorkflowRecordingModel]: + """Get a recording by its integer primary key. + + Args: + id: The primary key ID + organization_id: ID of the organization + + Returns: + WorkflowRecordingModel if found, None otherwise + """ + async with self.async_session() as session: + query = select(WorkflowRecordingModel).where( + WorkflowRecordingModel.id == id, + WorkflowRecordingModel.organization_id == organization_id, + WorkflowRecordingModel.is_active == True, + ) + + result = await session.execute(query) + return result.scalar_one_or_none() + + async def has_active_recordings( + self, + organization_id: int, + ) -> bool: + """Check if an organization has any active recordings. Args: - workflow_id: ID of the workflow organization_id: ID of the organization Returns: @@ -159,7 +179,6 @@ class WorkflowRecordingClient(BaseDBClient): select(func.count()) .select_from(WorkflowRecordingModel) .where( - WorkflowRecordingModel.workflow_id == workflow_id, WorkflowRecordingModel.organization_id == organization_id, WorkflowRecordingModel.is_active == True, ) @@ -167,11 +186,14 @@ class WorkflowRecordingClient(BaseDBClient): result = await session.execute(query) return result.scalar_one() > 0 - async def check_recording_id_exists(self, recording_id: str) -> bool: - """Check if a recording ID already exists globally. + async def check_recording_id_exists( + self, recording_id: str, organization_id: int + ) -> bool: + """Check if a recording ID already exists within an organization. Args: - recording_id: The short recording ID to check + recording_id: The recording ID to check + organization_id: ID of the organization Returns: True if exists, False otherwise @@ -179,10 +201,125 @@ class WorkflowRecordingClient(BaseDBClient): async with self.async_session() as session: query = select(WorkflowRecordingModel.id).where( WorkflowRecordingModel.recording_id == recording_id, + WorkflowRecordingModel.organization_id == organization_id, + WorkflowRecordingModel.is_active == True, ) result = await session.execute(query) return result.scalar_one_or_none() is not None + async def update_recording_id( + self, + id: int, + new_recording_id: str, + organization_id: int, + ) -> Optional[WorkflowRecordingModel]: + """Update the recording_id of a recording. + + Args: + id: Primary key ID of the recording + new_recording_id: New recording ID + organization_id: ID of the organization + + Returns: + Updated WorkflowRecordingModel if found, None otherwise + """ + async with self.async_session() as session: + query = select(WorkflowRecordingModel).where( + WorkflowRecordingModel.id == id, + WorkflowRecordingModel.organization_id == organization_id, + WorkflowRecordingModel.is_active == True, + ) + result = await session.execute(query) + recording = result.scalar_one_or_none() + + if not recording: + return None + + old_id = recording.recording_id + recording.recording_id = new_recording_id + await session.commit() + await session.refresh(recording) + + logger.info( + f"Updated recording ID {old_id} -> {new_recording_id}, " + f"org {organization_id}" + ) + return recording + + async def replace_recording_id_in_workflows( + self, + old_id: str, + new_id: str, + organization_id: int, + ) -> int: + """Replace all occurrences of a recording ID in workflow definitions. + + Updates both draft definitions (workflows.workflow_definition) and + versioned definitions (workflow_definitions.workflow_json), skipping + workflow_definitions with status 'legacy'. + + Args: + old_id: The old recording ID to find + new_id: The new recording ID to replace with + organization_id: ID of the organization (scopes to org workflows) + + Returns: + Total number of rows updated across both tables + """ + # Match the exact pattern used in prompts: "RECORDING_ID: " + old_pattern = f"RECORDING_ID: {old_id}" + new_pattern = f"RECORDING_ID: {new_id}" + + total = 0 + async with self.async_session() as session: + # Update workflows.workflow_definition (draft definitions) + result = await session.execute( + text(""" + UPDATE workflows + SET workflow_definition = + REPLACE(workflow_definition::text, :old_pat, :new_pat)::json + WHERE organization_id = :org_id + AND workflow_definition::text LIKE '%%' || :old_pat || '%%' + """), + { + "old_pat": old_pattern, + "new_pat": new_pattern, + "org_id": organization_id, + }, + ) + total += result.rowcount + + # Update workflow_definitions.workflow_json (versioned definitions) + # Skip legacy definitions + result = await session.execute( + text(""" + UPDATE workflow_definitions wd + SET workflow_json = + REPLACE(wd.workflow_json::text, :old_pat, :new_pat)::json + FROM workflows w + WHERE wd.workflow_id = w.id + AND w.organization_id = :org_id + AND wd.status != 'legacy' + AND wd.workflow_json::text LIKE '%%' || :old_pat || '%%' + """), + { + "old_pat": old_pattern, + "new_pat": new_pattern, + "org_id": organization_id, + }, + ) + total += result.rowcount + + await session.commit() + + if total > 0: + logger.info( + f"Replaced recording ID '{old_id}' -> '{new_id}' " + f"in {total} workflow definition(s), org {organization_id}" + ) + + return total + async def delete_recording( self, recording_id: str, diff --git a/api/routes/telephony.py b/api/routes/telephony.py index 7fe75ad..96a592d 100644 --- a/api/routes/telephony.py +++ b/api/routes/telephony.py @@ -178,6 +178,11 @@ async def initiate_call( workflow_run_id = request.workflow_run_id if not workflow_run_id: + # Fetch workflow to merge template context variables (e.g. caller_number, + # called_number set in workflow settings for testing pre-call data fetch) + workflow = await db_client.get_workflow_by_id(request.workflow_id) + template_vars = (workflow.template_context_variables or {}) if workflow else {} + numeric_suffix = int(str(uuid.uuid4()).replace("-", "")[:8], 16) % 100000000 workflow_run_name = f"WR-TEL-OUT-{numeric_suffix:08d}" workflow_run = await db_client.create_workflow_run( @@ -187,6 +192,7 @@ async def initiate_call( user_id=user.id, call_type=CallType.OUTBOUND, initial_context={ + **template_vars, "phone_number": phone_number, "called_number": phone_number, "provider": provider.PROVIDER_NAME, diff --git a/api/routes/tool.py b/api/routes/tool.py index 49d1f5f..202772f 100644 --- a/api/routes/tool.py +++ b/api/routes/tool.py @@ -45,17 +45,29 @@ class HttpApiConfig(BaseModel): timeout_ms: Optional[int] = Field( default=5000, description="Request timeout in milliseconds" ) + customMessage: Optional[str] = Field( + default=None, description="Custom message to play after tool execution" + ) + customMessageType: Optional[Literal["text", "audio"]] = Field( + default=None, description="Type of custom message: text or audio" + ) + customMessageRecordingId: Optional[str] = Field( + default=None, description="Recording ID for audio custom message" + ) class EndCallConfig(BaseModel): """Configuration for End Call tools.""" - messageType: Literal["none", "custom"] = Field( + messageType: Literal["none", "custom", "audio"] = Field( default="none", description="Type of goodbye message" ) customMessage: Optional[str] = Field( default=None, description="Custom message to play before ending the call" ) + audioRecordingId: Optional[str] = Field( + default=None, description="Recording ID for audio goodbye message" + ) endCallReason: bool = Field( default=False, description="When enabled, LLM must provide a reason for ending the call. " @@ -74,12 +86,15 @@ class TransferCallConfig(BaseModel): destination: str = Field( description="Phone number or SIP endpoint to transfer the call to (E.164 format e.g., +1234567890, or SIP endpoint e.g., PJSIP/1234)" ) - messageType: Literal["none", "custom"] = Field( + messageType: Literal["none", "custom", "audio"] = Field( default="none", description="Type of message to play before transfer" ) customMessage: Optional[str] = Field( default=None, description="Custom message to play before transferring the call" ) + audioRecordingId: Optional[str] = Field( + default=None, description="Recording ID for audio message before transfer" + ) timeout: int = Field( default=30, ge=5, diff --git a/api/routes/workflow_recording.py b/api/routes/workflow_recording.py index 16e14a2..9f30997 100644 --- a/api/routes/workflow_recording.py +++ b/api/routes/workflow_recording.py @@ -16,6 +16,7 @@ from api.schemas.workflow_recording import ( BatchRecordingUploadResponseSchema, RecordingListResponseSchema, RecordingResponseSchema, + RecordingUpdateRequestSchema, RecordingUploadResponseSchema, ) from api.services.auth.depends import get_user @@ -25,11 +26,11 @@ from api.services.storage import storage_fs router = APIRouter(prefix="/workflow-recordings", tags=["workflow-recordings"]) -async def _generate_unique_recording_id() -> str: - """Generate a globally unique short recording ID.""" +async def _generate_unique_recording_id(organization_id: int) -> str: + """Generate a unique short recording ID within an organization.""" for _ in range(10): rid = generate_short_id(8) - exists = await db_client.check_recording_id_exists(rid) + exists = await db_client.check_recording_id_exists(rid, organization_id) if not exists: return rid raise HTTPException( @@ -69,11 +70,13 @@ async def get_upload_urls( try: items = [] for fd in request.files: - recording_id = await _generate_unique_recording_id() + recording_id = await _generate_unique_recording_id( + user.selected_organization_id + ) storage_key = ( f"recordings/{user.selected_organization_id}" - f"/{request.workflow_id}/{recording_id}" + f"/{recording_id}" f"/{fd.filename}" ) @@ -100,7 +103,7 @@ async def get_upload_urls( logger.info( f"Generated {len(items)} recording upload URL(s), " - f"workflow {request.workflow_id}, org {user.selected_organization_id}" + f"org {user.selected_organization_id}" ) return BatchRecordingUploadResponseSchema(items=items) @@ -131,22 +134,20 @@ async def create_recordings( for rec_req in request.recordings: recording = await db_client.create_recording( recording_id=rec_req.recording_id, - workflow_id=rec_req.workflow_id, organization_id=user.selected_organization_id, - tts_provider=rec_req.tts_provider, - tts_model=rec_req.tts_model, - tts_voice_id=rec_req.tts_voice_id, transcript=rec_req.transcript, storage_key=rec_req.storage_key, storage_backend=backend.value, created_by=user.id, + tts_provider=rec_req.tts_provider, + tts_model=rec_req.tts_model, + tts_voice_id=rec_req.tts_voice_id, metadata=rec_req.metadata, ) results.append(_build_response(recording)) logger.info( - f"Created {len(results)} recording(s) for " - f"workflow {request.recordings[0].workflow_id}" + f"Created {len(results)} recording(s) for org {user.selected_organization_id}" ) return BatchRecordingCreateResponseSchema(recordings=results) @@ -163,10 +164,12 @@ async def create_recordings( @router.get( "/", response_model=RecordingListResponseSchema, - summary="List recordings for a workflow", + summary="List recordings", ) async def list_recordings( - workflow_id: Annotated[int, Query(description="Workflow ID")], + workflow_id: Annotated[ + Optional[int], Query(description="Filter by workflow ID") + ] = None, tts_provider: Annotated[ Optional[str], Query(description="Filter by TTS provider") ] = None, @@ -178,11 +181,11 @@ async def list_recordings( ] = None, user=Depends(get_user), ): - """List recordings for a workflow, optionally filtered by TTS configuration.""" + """List recordings for the organization, optionally filtered.""" try: - recordings = await db_client.get_recordings_for_workflow( - workflow_id=workflow_id, + recordings = await db_client.get_recordings( organization_id=user.selected_organization_id, + workflow_id=workflow_id, tts_provider=tts_provider, tts_model=tts_model, tts_voice_id=tts_voice_id, @@ -233,6 +236,74 @@ async def delete_recording( ) from exc +@router.patch( + "/{id}", + response_model=RecordingResponseSchema, + summary="Update a recording's Recording ID", +) +async def update_recording( + id: int, + request: RecordingUpdateRequestSchema, + user=Depends(get_user), +): + """Update the recording_id (descriptive name) of a recording.""" + try: + new_id = request.recording_id.strip() + if not new_id: + raise HTTPException(status_code=400, detail="Recording ID cannot be empty") + + existing = await db_client.get_recording_by_id( + id, user.selected_organization_id + ) + if not existing: + raise HTTPException(status_code=404, detail="Recording not found") + + if new_id == existing.recording_id: + return _build_response(existing) + + exists = await db_client.check_recording_id_exists( + new_id, user.selected_organization_id + ) + if exists: + raise HTTPException( + status_code=409, + detail=f"Recording ID '{new_id}' is already in use", + ) + + old_id = existing.recording_id + + recording = await db_client.update_recording_id( + id=id, + new_recording_id=new_id, + organization_id=user.selected_organization_id, + ) + + if not recording: + raise HTTPException(status_code=404, detail="Recording not found") + + # Replace old recording ID in all non-legacy workflow definitions + updated = await db_client.replace_recording_id_in_workflows( + old_id=old_id, + new_id=new_id, + organization_id=user.selected_organization_id, + ) + if updated: + logger.info( + f"Updated {updated} workflow definition(s) with new recording ID " + f"'{old_id}' -> '{new_id}'" + ) + + return _build_response(recording) + + except HTTPException: + raise + except Exception as exc: + logger.error(f"Error updating recording: {exc}") + raise HTTPException( + status_code=500, detail="Failed to update recording" + ) from exc + + @router.post( "/transcribe", summary="Transcribe an audio file", diff --git a/api/schemas/workflow_recording.py b/api/schemas/workflow_recording.py index f60b16c..0c2d358 100644 --- a/api/schemas/workflow_recording.py +++ b/api/schemas/workflow_recording.py @@ -32,7 +32,6 @@ class FileDescriptor(BaseModel): class BatchRecordingUploadRequestSchema(BaseModel): """Request schema for getting presigned upload URLs for one or more files.""" - workflow_id: int = Field(..., description="Workflow ID these recordings belong to") files: List[FileDescriptor] = Field( ..., min_length=1, max_length=20, description="List of files to upload" ) @@ -50,10 +49,13 @@ class RecordingCreateRequestSchema(BaseModel): """Request schema for creating a recording record after upload.""" recording_id: str = Field(..., description="Short recording ID from upload step") - workflow_id: int = Field(..., description="Workflow ID") - tts_provider: str = Field(..., description="TTS provider (e.g. elevenlabs)") - tts_model: str = Field(..., description="TTS model name") - tts_voice_id: str = Field(..., description="TTS voice identifier") + tts_provider: Optional[str] = Field( + default=None, description="TTS provider (e.g. elevenlabs)" + ) + tts_model: Optional[str] = Field(default=None, description="TTS model name") + tts_voice_id: Optional[str] = Field( + default=None, description="TTS voice identifier" + ) transcript: str = Field( ..., description="User-provided transcript of the recording" ) @@ -68,11 +70,11 @@ class RecordingResponseSchema(BaseModel): id: int recording_id: str - workflow_id: int + workflow_id: Optional[int] = None organization_id: int - tts_provider: str - tts_model: str - tts_voice_id: str + tts_provider: Optional[str] = None + tts_model: Optional[str] = None + tts_voice_id: Optional[str] = None transcript: str storage_key: str storage_backend: str @@ -98,6 +100,18 @@ class BatchRecordingCreateResponseSchema(BaseModel): ) +class RecordingUpdateRequestSchema(BaseModel): + """Request schema for updating a recording's ID.""" + + recording_id: str = Field( + ..., + min_length=1, + max_length=64, + pattern=r"^[a-zA-Z0-9_-]+$", + description="New descriptive recording ID (letters, numbers, hyphens, underscores only)", + ) + + class RecordingListResponseSchema(BaseModel): """Response schema for list of recordings.""" diff --git a/api/services/campaign/campaign_call_dispatcher.py b/api/services/campaign/campaign_call_dispatcher.py index 505c7f4..b414d5f 100644 --- a/api/services/campaign/campaign_call_dispatcher.py +++ b/api/services/campaign/campaign_call_dispatcher.py @@ -200,7 +200,6 @@ class CampaignCallDispatcher: # Merge context variables (queued_run context already includes retry info if applicable) initial_context = { - **workflow.template_context_variables, **queued_run.context_variables, "campaign_id": campaign.id, "provider": provider.PROVIDER_NAME, diff --git a/api/services/pipecat/audio_playback.py b/api/services/pipecat/audio_playback.py new file mode 100644 index 0000000..f0b058e --- /dev/null +++ b/api/services/pipecat/audio_playback.py @@ -0,0 +1,188 @@ +"""Utilities for playing audio through the pipeline transport. + +Provides one-shot and looping playback of raw PCM audio. All playback +should be routed through ``transport.output().queue_frame`` so the audio +reaches the caller without passing through STT (which would otherwise +generate phantom transcriptions). +""" + +import asyncio +import uuid +from typing import Awaitable, Callable, Dict, Optional, Tuple + +import numpy as np +from loguru import logger + +from pipecat.frames.frames import ( + Frame, + OutputAudioRawFrame, + TTSAudioRawFrame, + TTSStartedFrame, + TTSStoppedFrame, + TTSTextFrame, +) + +try: + import soundfile as sf +except ModuleNotFoundError as e: + logger.error(f"Exception: {e}") + logger.error("In order to use audio playback, you need to `pip install soundfile`.") + raise Exception(f"Missing module: {e}") + + +# --------------------------------------------------------------------------- +# Audio file loading / caching +# --------------------------------------------------------------------------- + +_audio_cache: Dict[Tuple[str, int], bytes] = {} + + +def load_audio_file(file_path: str, sample_rate: int) -> Optional[bytes]: + """Load an audio file as PCM-16 bytes, caching the result. + + Args: + file_path: Path to a WAV audio file. + sample_rate: Target sample rate (used as cache key; no resampling + is performed here). + + Returns: + Raw PCM-16 bytes, or *None* on failure. + """ + cache_key = (file_path, sample_rate) + if cache_key in _audio_cache: + logger.debug(f"Using cached audio for {file_path} at {sample_rate}Hz") + return _audio_cache[cache_key] + + try: + logger.info(f"Loading audio from {file_path} at {sample_rate}Hz") + sound, file_sample_rate = sf.read(file_path, dtype="int16") + logger.info( + f"Audio file loaded - file sample_rate: {file_sample_rate}, target: {sample_rate}" + ) + + # Ensure mono (take first channel if stereo) + if len(sound.shape) > 1: + sound = sound[:, 0] + + if file_sample_rate != sample_rate: + logger.warning( + f"Audio file has sample rate {file_sample_rate}, expected {sample_rate}" + ) + + audio_bytes = sound.astype(np.int16).tobytes() + _audio_cache[cache_key] = audio_bytes + logger.info(f"Audio loaded: {len(sound)} samples at {sample_rate}Hz") + return audio_bytes + + except Exception as e: + logger.error(f"Failed to load audio file {file_path}: {e}") + return None + + +def clear_audio_cache() -> None: + """Clear the audio file cache to free memory.""" + _audio_cache.clear() + logger.info("Audio cache cleared") + + +# --------------------------------------------------------------------------- +# Playback helpers +# --------------------------------------------------------------------------- + + +async def play_audio( + audio_data: bytes, + *, + sample_rate: int, + queue_frame: Callable[[Frame], Awaitable[None]], + transcript: Optional[str] = None, + append_to_context: bool = False, +) -> None: + """Play raw PCM-16 audio once. + + Pushes ``TTSStarted -> TTSAudioRaw -> TTSStopped`` so downstream + processors (audio buffer, context aggregators) handle the audio + correctly. + + When *transcript* is provided a ``TTSTextFrame`` is also pushed so + that observers (e.g. ``RealtimeFeedbackObserver``) can relay the + spoken text to the UI. + + Args: + audio_data: Raw 16-bit mono PCM bytes. + sample_rate: Pipeline sample rate (e.g. 16000). + queue_frame: Frame sink -- typically ``transport.output().queue_frame``. + transcript: Optional transcript of the recording. + append_to_context: Whether the transcript should be appended to + the LLM assistant context. Defaults to False. + """ + context_id = str(uuid.uuid4()) + await queue_frame(TTSStartedFrame(context_id=context_id)) + if transcript: + tts_text = TTSTextFrame( + text=transcript, aggregated_by="recording", context_id=context_id + ) + tts_text.append_to_context = append_to_context + await queue_frame(tts_text) + await queue_frame( + TTSAudioRawFrame( + audio=audio_data, + sample_rate=sample_rate, + num_channels=1, + context_id=context_id, + ) + ) + await queue_frame(TTSStoppedFrame(context_id=context_id)) + + +async def play_audio_loop( + *, + stop_event: asyncio.Event, + sample_rate: int, + queue_frame: Callable[[Frame], Awaitable[None]], + audio_file: Optional[str] = None, +) -> None: + """Play audio in a loop until *stop_event* is set. + + Used for hold music during call transfers and ringers during + pre-call data fetches. + + Args: + stop_event: Set this event to terminate the loop. + sample_rate: Target sample rate for audio playback. + queue_frame: Frame sink -- typically ``transport.output().queue_frame``. + audio_file: Path to a WAV file. When *None* the default + ``transfer_hold_ring_{sample_rate}.wav`` asset is used. + """ + if audio_file is None: + from api.constants import APP_ROOT_DIR + + audio_file = str( + APP_ROOT_DIR / "assets" / f"transfer_hold_ring_{sample_rate}.wav" + ) + + audio_data = load_audio_file(audio_file, sample_rate) + if not audio_data: + logger.warning(f"Audio loop: failed to load {audio_file}, skipping") + return + + num_samples = len(audio_data) // 2 # 16-bit PCM = 2 bytes per sample + duration = num_samples / sample_rate + + logger.debug(f"Audio loop: playing at {sample_rate}Hz") + try: + while not stop_event.is_set(): + frame = OutputAudioRawFrame( + audio=audio_data, + sample_rate=sample_rate, + num_channels=1, + ) + await queue_frame(frame) + try: + await asyncio.wait_for(stop_event.wait(), timeout=duration + 1.5) + break + except asyncio.TimeoutError: + pass + except Exception as e: + logger.error(f"Audio loop error: {e}") + logger.debug("Audio loop: stopped") diff --git a/api/services/pipecat/event_handlers.py b/api/services/pipecat/event_handlers.py index 829d3c9..fe6c769 100644 --- a/api/services/pipecat/event_handlers.py +++ b/api/services/pipecat/event_handlers.py @@ -6,6 +6,7 @@ from api.db import db_client from api.enums import PostHogEvent, WorkflowRunState from api.services.campaign.circuit_breaker import circuit_breaker from api.services.pipecat.audio_config import AudioConfig +from api.services.pipecat.audio_playback import play_audio, play_audio_loop from api.services.pipecat.in_memory_buffers import ( InMemoryAudioBuffer, InMemoryLogsBuffer, @@ -16,8 +17,11 @@ from api.services.posthog_client import capture_event from api.services.workflow.pipecat_engine import PipecatEngine from api.tasks.arq import enqueue_job from api.tasks.function_names import FunctionNames -from api.utils.hold_audio import play_hold_audio_loop -from pipecat.frames.frames import Frame, LLMContextFrame, TTSSpeakFrame +from pipecat.frames.frames import ( + Frame, + LLMContextFrame, + TTSSpeakFrame, +) from pipecat.pipeline.task import PipelineTask from pipecat.processors.audio.audio_buffer_processor import AudioBufferProcessor from pipecat.utils.enums import EndTaskReason @@ -64,6 +68,7 @@ def register_event_handlers( pipeline_metrics_aggregator: PipelineMetricsAggregator, audio_config=AudioConfig, pre_call_fetch_task: asyncio.Task | None = None, + fetch_recording_audio=None, user_provider_id: str | None = None, ): """Register all event handlers for transport and task events. @@ -123,7 +128,11 @@ def register_event_handlers( stop_ringer = asyncio.Event() sample_rate = audio_config.pipeline_sample_rate or 16000 ringer_task = asyncio.create_task( - play_hold_audio_loop(task, stop_ringer, sample_rate) + play_audio_loop( + stop_event=stop_ringer, + sample_rate=sample_rate, + queue_frame=transport.output().queue_frame, + ) ) try: fetch_result = await pre_call_fetch_task @@ -151,12 +160,35 @@ def register_event_handlers( # so that render_template() has the complete _call_context_vars. await engine.set_node(engine.workflow.start_node_id) - greeting = engine.get_start_greeting() - if greeting: - logger.debug( - "Both pipeline_started and client_connected received - playing greeting via TTS" - ) - await task.queue_frame(TTSSpeakFrame(greeting)) + greeting_info = engine.get_start_greeting() + if greeting_info: + greeting_type, greeting_value = greeting_info + if ( + greeting_type == "audio" + and greeting_value + and fetch_recording_audio + ): + logger.debug(f"Playing audio greeting recording: {greeting_value}") + result = await fetch_recording_audio( + recording_pk=int(greeting_value) + ) + if result: + await play_audio( + result.audio, + sample_rate=audio_config.pipeline_sample_rate or 16000, + queue_frame=transport.output().queue_frame, + transcript=result.transcript, + append_to_context=True, + ) + else: + logger.warning( + f"Failed to fetch audio greeting {greeting_value}, " + "falling back to LLM generation" + ) + await engine.llm.queue_frame(LLMContextFrame(engine.context)) + else: + logger.debug("Playing text greeting via TTS") + await task.queue_frame(TTSSpeakFrame(greeting_value)) else: logger.debug( "Both pipeline_started and client_connected received - triggering initial LLM generation" diff --git a/api/services/pipecat/realtime_feedback_observer.py b/api/services/pipecat/realtime_feedback_observer.py index fe5f641..35b393d 100644 --- a/api/services/pipecat/realtime_feedback_observer.py +++ b/api/services/pipecat/realtime_feedback_observer.py @@ -170,7 +170,10 @@ class RealtimeFeedbackObserver(BaseObserver): frame_direction = data.direction # Skip already processed frames (frames can be observed multiple times) - if frame.id in self._frames_seen: + if ( + frame.id in self._frames_seen + or frame_direction != FrameDirection.DOWNSTREAM + ): return self._frames_seen.add(frame.id) diff --git a/api/services/pipecat/recording_audio_cache.py b/api/services/pipecat/recording_audio_cache.py index 4063669..23be271 100644 --- a/api/services/pipecat/recording_audio_cache.py +++ b/api/services/pipecat/recording_audio_cache.py @@ -7,7 +7,7 @@ subsequent plays (even from other workers) are instantaneous. """ import os -from typing import Awaitable, Callable, Optional +from typing import Awaitable, Callable, NamedTuple, Optional import numpy as np from loguru import logger @@ -22,14 +22,24 @@ from .audio_file_cache import ( write_cache_file, ) + +class RecordingAudio(NamedTuple): + """Audio bytes paired with the recording's transcript (when available).""" + + audio: bytes + transcript: Optional[str] = None + + # --------------------------------------------------------------------------- # Cache path helper # --------------------------------------------------------------------------- -def _cache_path(recording_id: str, sample_rate: int) -> str: +def _cache_path(organization_id: int, recording_id: str, sample_rate: int) -> str: """Return the on-disk path for a cached PCM file.""" - return os.path.join(CACHE_DIR, f"{recording_id}_{sample_rate}.pcm") + return os.path.join( + CACHE_DIR, f"{organization_id}_{recording_id}_{sample_rate}.pcm" + ) # --------------------------------------------------------------------------- @@ -40,54 +50,95 @@ def _cache_path(recording_id: str, sample_rate: int) -> str: def create_recording_audio_fetcher( organization_id: int, pipeline_sample_rate: int, -) -> Callable[[str], Awaitable[Optional[bytes]]]: - """Create an async callback that returns raw PCM bytes for a recording_id. +) -> Callable[..., Awaitable[Optional[bytes]]]: + """Create an async callback that returns raw PCM bytes for a recording. - The returned callable: - 1. Checks the filesystem cache (keyed by ``recording_id`` + sample rate). - 2. On miss, looks up the recording in the DB, downloads the audio file - from S3/MinIO, converts it to 16-bit mono PCM at *pipeline_sample_rate*, - trims leading/trailing silence, caches the result on disk, and returns it. + The returned callable accepts **one** of two keyword arguments: + - ``recording_pk`` – the immutable integer primary key (used by + dropdown-based selections: greeting, edges, tool configs). + - ``recording_id`` – the human-readable string ID (used by + prompt-based ``RECORDING_ID: xxx`` references). + + Flow: + 1. Checks the filesystem cache (keyed by org + pk + sample rate). + 2. On miss, looks up the recording in the DB, downloads the audio + from S3/MinIO, converts to 16-bit mono PCM, trims silence, and + caches the result on disk. Args: organization_id: Organization owning the recordings. pipeline_sample_rate: Target PCM sample rate for the pipeline. - - Returns: - ``async (recording_id: str) -> Optional[bytes]`` """ from api.db import db_client from api.services.storage import get_storage_for_backend - # Resolve storage instances once per backend at creation time, not per fetch. _storage_cache: dict[str, object] = {} + _transcript_cache: dict[str, Optional[str]] = {} def _get_storage(backend: str): if backend not in _storage_cache: _storage_cache[backend] = get_storage_for_backend(backend) return _storage_cache[backend] - async def fetch(recording_id: str) -> Optional[bytes]: - cached = _cache_path(recording_id, pipeline_sample_rate) + async def _lookup_recording( + cache_key: str, + recording_pk: Optional[int], + recording_id: Optional[str], + ): + """DB lookup with transcript caching.""" + if recording_pk is not None: + recording = await db_client.get_recording_by_id( + recording_pk, organization_id + ) + else: + recording = await db_client.get_recording_by_recording_id( + recording_id, organization_id + ) + if recording: + _transcript_cache[cache_key] = recording.transcript or None + return recording + + async def fetch( + *, + recording_pk: Optional[int] = None, + recording_id: Optional[str] = None, + ) -> Optional[RecordingAudio]: + if recording_pk is None and recording_id is None: + logger.warning("fetch called with neither recording_pk nor recording_id") + return None + + # Use pk for cache key when available, otherwise recording_id + cache_key = str(recording_pk) if recording_pk is not None else recording_id + cached = _cache_path(organization_id, cache_key, pipeline_sample_rate) # 1. Serve from filesystem cache if os.path.exists(cached): - logger.debug(f"Recording {recording_id} served from disk cache") - return read_cached_file(cached) + logger.debug(f"Recording {cache_key} served from disk cache") + audio = read_cached_file(cached) + # Transcript may already be in memory from a prior fetch; + # if not, do a lightweight DB lookup. + if cache_key not in _transcript_cache: + await _lookup_recording(cache_key, recording_pk, recording_id) + return RecordingAudio( + audio=audio, transcript=_transcript_cache.get(cache_key) + ) # 2. DB lookup - recording = await db_client.get_recording_by_recording_id( - recording_id, organization_id - ) + recording = await _lookup_recording(cache_key, recording_pk, recording_id) + if not recording: - logger.warning(f"Recording {recording_id} not found in database") + logger.warning(f"Recording {cache_key} not found in database") return None # 3. Download, convert, trim, and cache pcm_data = await _download_and_convert( recording, pipeline_sample_rate, _get_storage ) - return pcm_data + if pcm_data is None: + return None + return RecordingAudio( + audio=pcm_data, transcript=_transcript_cache.get(cache_key) + ) return fetch @@ -98,11 +149,10 @@ def create_recording_audio_fetcher( async def warm_recording_cache( - workflow_id: int, organization_id: int, pipeline_sample_rate: int, ) -> None: - """Pre-fetch all active recordings for a workflow into the disk cache. + """Pre-fetch all active recordings for an organization into the disk cache. Launched as a background ``asyncio.Task`` at pipeline startup so that recordings are ready before the first playback request. Errors are logged @@ -112,9 +162,7 @@ async def warm_recording_cache( from api.services.storage import get_storage_for_backend try: - recordings = await db_client.get_recordings_for_workflow( - workflow_id, organization_id - ) + recordings = await db_client.get_recordings(organization_id=organization_id) if not recordings: return @@ -122,15 +170,20 @@ async def warm_recording_cache( uncached = [ r for r in recordings - if not os.path.exists(_cache_path(r.recording_id, pipeline_sample_rate)) + if not os.path.exists( + _cache_path(organization_id, str(r.id), pipeline_sample_rate) + ) + and not os.path.exists( + _cache_path(organization_id, r.recording_id, pipeline_sample_rate) + ) ] if not uncached: - logger.debug(f"Recording cache already warm for workflow {workflow_id}") + logger.debug(f"Recording cache already warm for org {organization_id}") return logger.info( f"Warming recording cache: {len(uncached)}/{len(recordings)} " - f"recording(s) for workflow {workflow_id}" + f"recording(s) for org {organization_id}" ) # Resolve storage instances once per backend, not per recording @@ -156,7 +209,7 @@ async def warm_recording_cache( f"Cache warm: error processing {recording.recording_id}" ) - logger.info(f"Recording cache warm complete for workflow {workflow_id}") + logger.info(f"Recording cache warm complete for org {organization_id}") except Exception: logger.exception("Recording cache warm failed") @@ -187,7 +240,11 @@ async def _download_and_convert( pcm_data = _trim_silence(pcm_data, sample_rate) # Write to disk cache - cached = _cache_path(recording.recording_id, sample_rate) + cached = _cache_path( + recording.organization_id, + recording.recording_id, + sample_rate, + ) write_cache_file(cached, pcm_data) return pcm_data diff --git a/api/services/pipecat/recording_router_processor.py b/api/services/pipecat/recording_router_processor.py index d1291a9..23ceb67 100644 --- a/api/services/pipecat/recording_router_processor.py +++ b/api/services/pipecat/recording_router_processor.py @@ -17,6 +17,7 @@ from typing import Awaitable, Callable, Optional from loguru import logger +from api.services.pipecat.recording_audio_cache import RecordingAudio from api.services.workflow.pipecat_engine_context_composer import ( RECORDING_MARKER, TTS_MARKER, @@ -48,14 +49,14 @@ class RecordingRouterProcessor(FrameProcessor): Args: audio_sample_rate: Pipeline sample rate for OutputAudioRawFrame. fetch_recording_audio: Async callback that takes a recording_id and - returns raw 16-bit mono PCM bytes, or None on failure. + returns a RecordingAudio (audio + transcript), or None on failure. """ def __init__( self, *, audio_sample_rate: int, - fetch_recording_audio: Callable[[str], Awaitable[Optional[bytes]]], + fetch_recording_audio: Callable[..., Awaitable[Optional[RecordingAudio]]], **kwargs, ): super().__init__(**kwargs) @@ -245,8 +246,8 @@ class RecordingRouterProcessor(FrameProcessor): """ logger.info(f"Playing pre-recorded audio: {recording_id}") - audio_data = await self._fetch_recording_audio(recording_id) - if not audio_data: + result = await self._fetch_recording_audio(recording_id=recording_id) + if not result: logger.warning( f"Failed to fetch recording {recording_id}, no audio will play" ) @@ -256,7 +257,7 @@ class RecordingRouterProcessor(FrameProcessor): await self.push_frame(TTSStartedFrame(context_id=context_id)) await self.push_frame( TTSAudioRawFrame( - audio=audio_data, + audio=result.audio, sample_rate=self._audio_sample_rate, num_channels=1, context_id=context_id, @@ -264,10 +265,10 @@ class RecordingRouterProcessor(FrameProcessor): ) await self.push_frame(TTSStoppedFrame(context_id=context_id)) - duration_secs = len(audio_data) / (self._audio_sample_rate * 2) + duration_secs = len(result.audio) / (self._audio_sample_rate * 2) logger.debug( f"Finished pushing recording {recording_id} " - f"({len(audio_data)} bytes, {duration_secs:.1f}s)" + f"({len(result.audio)} bytes, {duration_secs:.1f}s)" ) # ------------------------------------------------------------------ diff --git a/api/services/pipecat/run_pipeline.py b/api/services/pipecat/run_pipeline.py index 30cda60..b9ff9a3 100644 --- a/api/services/pipecat/run_pipeline.py +++ b/api/services/pipecat/run_pipeline.py @@ -698,9 +698,7 @@ async def _run_pipeline( # Check if the workflow has any active recordings so the engine can # include recording response mode instructions in all node prompts. - has_recordings = await db_client.has_active_recordings( - workflow_id, workflow.organization_id - ) + has_recordings = await db_client.has_active_recordings(workflow.organization_id) context_compaction_enabled = (workflow.workflow_configurations or {}).get( "context_compaction_enabled", False @@ -831,6 +829,14 @@ async def _run_pipeline( voicemail_detector = None recording_router = None + # Create recording audio fetcher (used by recording router, audio greetings, + # and audio transition speech) + fetch_audio = create_recording_audio_fetcher( + organization_id=workflow.organization_id, + pipeline_sample_rate=audio_config.pipeline_sample_rate, + ) + engine.set_fetch_recording_audio(fetch_audio) + if not is_realtime: # Create voicemail detector if enabled in workflow configurations voicemail_config = (workflow.workflow_configurations or {}).get( @@ -871,10 +877,6 @@ async def _run_pipeline( # Create recording router if workflow has active recordings if has_recordings: - fetch_audio = create_recording_audio_fetcher( - organization_id=workflow.organization_id, - pipeline_sample_rate=audio_config.pipeline_sample_rate, - ) recording_router = RecordingRouterProcessor( audio_sample_rate=audio_config.pipeline_sample_rate, fetch_recording_audio=fetch_audio, @@ -883,7 +885,6 @@ async def _run_pipeline( # before the first playback request. asyncio.create_task( warm_recording_cache( - workflow_id=workflow_id, organization_id=workflow.organization_id, pipeline_sample_rate=audio_config.pipeline_sample_rate, ) @@ -918,8 +919,9 @@ async def _run_pipeline( # Create pipeline task with audio configuration task = create_pipeline_task(pipeline, workflow_run_id, audio_config) - # Now set the task on the engine + # Now set the task and transport output on the engine engine.set_task(task) + engine.set_transport_output(transport.output()) # Initialize the engine to set the initial context with # System Prompt and Tools @@ -979,6 +981,7 @@ async def _run_pipeline( pipeline_metrics_aggregator=pipeline_metrics_aggregator, audio_config=audio_config, pre_call_fetch_task=pre_call_fetch_task, + fetch_recording_audio=fetch_audio, user_provider_id=user_provider_id, ) diff --git a/api/services/pipecat/service_factory.py b/api/services/pipecat/service_factory.py index 6a6bfe3..14c73ae 100644 --- a/api/services/pipecat/service_factory.py +++ b/api/services/pipecat/service_factory.py @@ -230,7 +230,7 @@ def create_tts_service(user_config, audio_config: "AudioConfig"): api_key=user_config.tts.api_key, settings=DeepgramTTSSettings(voice=user_config.tts.voice), text_filters=[xml_function_tag_filter], - skip_aggregator_types=["recording_router"], + skip_aggregator_types=["recording_router", "recording"], silence_time_s=1.0, ) elif user_config.tts.provider == ServiceProviders.OPENAI.value: @@ -238,7 +238,7 @@ def create_tts_service(user_config, audio_config: "AudioConfig"): api_key=user_config.tts.api_key, settings=OpenAITTSSettings(model=user_config.tts.model), text_filters=[xml_function_tag_filter], - skip_aggregator_types=["recording_router"], + skip_aggregator_types=["recording_router", "recording"], silence_time_s=1.0, ) elif user_config.tts.provider == ServiceProviders.ELEVENLABS.value: @@ -258,7 +258,7 @@ def create_tts_service(user_config, audio_config: "AudioConfig"): similarity_boost=0.75, ), text_filters=[xml_function_tag_filter], - skip_aggregator_types=["recording_router"], + skip_aggregator_types=["recording_router", "recording"], silence_time_s=1.0, ) elif user_config.tts.provider == ServiceProviders.CARTESIA.value: @@ -284,7 +284,7 @@ def create_tts_service(user_config, audio_config: "AudioConfig"): ), ), text_filters=[xml_function_tag_filter], - skip_aggregator_types=["recording_router"], + skip_aggregator_types=["recording_router", "recording"], silence_time_s=1.0, ) elif user_config.tts.provider == ServiceProviders.DOGRAH.value: @@ -299,7 +299,7 @@ def create_tts_service(user_config, audio_config: "AudioConfig"): speed=user_config.tts.speed, ), text_filters=[xml_function_tag_filter], - skip_aggregator_types=["recording_router"], + skip_aggregator_types=["recording_router", "recording"], silence_time_s=1.0, ) elif user_config.tts.provider == ServiceProviders.CAMB.value: @@ -312,7 +312,7 @@ def create_tts_service(user_config, audio_config: "AudioConfig"): voice_id=voice_id, model=user_config.tts.model, text_filters=[xml_function_tag_filter], - skip_aggregator_types=["recording_router"], + skip_aggregator_types=["recording_router", "recording"], ) # Set language directly as BCP-47 code (bypasses Language enum conversion) tts._settings.language = language @@ -327,7 +327,7 @@ def create_tts_service(user_config, audio_config: "AudioConfig"): speed=user_config.tts.speed, ), text_filters=[xml_function_tag_filter], - skip_aggregator_types=["recording_router"], + skip_aggregator_types=["recording_router", "recording"], silence_time_s=1.0, ) elif user_config.tts.provider == ServiceProviders.RIME.value: @@ -352,7 +352,7 @@ def create_tts_service(user_config, audio_config: "AudioConfig"): api_key=user_config.tts.api_key, settings=RimeTTSSettings(**settings_kwargs), text_filters=[xml_function_tag_filter], - skip_aggregator_types=["recording_router"], + skip_aggregator_types=["recording_router", "recording"], silence_time_s=1.0, ) elif user_config.tts.provider == ServiceProviders.SARVAM.value: @@ -382,7 +382,7 @@ def create_tts_service(user_config, audio_config: "AudioConfig"): language=pipecat_language, ), text_filters=[xml_function_tag_filter], - skip_aggregator_types=["recording_router"], + skip_aggregator_types=["recording_router", "recording"], silence_time_s=1.0, ) else: diff --git a/api/services/workflow/dto.py b/api/services/workflow/dto.py index 31d9a35..1fc3258 100644 --- a/api/services/workflow/dto.py +++ b/api/services/workflow/dto.py @@ -54,6 +54,8 @@ class NodeDataDTO(BaseModel): extraction_variables: Optional[list[ExtractionVariableDTO]] = None add_global_prompt: bool = True greeting: Optional[str] = None + greeting_type: Optional[str] = None # 'text' or 'audio' + greeting_recording_id: Optional[str] = None wait_for_user_response: bool = False wait_for_user_response_timeout: Optional[float] = None detect_voicemail: bool = False @@ -102,6 +104,8 @@ class EdgeDataDTO(BaseModel): label: str = Field(..., min_length=1) condition: str = Field(..., min_length=1) transition_speech: Optional[str] = None + transition_speech_type: Optional[str] = None # 'text' or 'audio' + transition_speech_recording_id: Optional[str] = None class RFEdgeDTO(BaseModel): diff --git a/api/services/workflow/duplicate.py b/api/services/workflow/duplicate.py index cd67146..dc7b4c5 100644 --- a/api/services/workflow/duplicate.py +++ b/api/services/workflow/duplicate.py @@ -1,14 +1,12 @@ -"""Service for duplicating workflows including recordings.""" +"""Service for duplicating workflows.""" import copy -import json import posixpath import uuid from loguru import logger from api.db import db_client -from api.db.workflow_recording_client import generate_short_id from api.enums import StorageBackend from api.services.storage import get_storage_for_backend, storage_fs @@ -41,22 +39,14 @@ def _regenerate_trigger_uuids(workflow_definition: dict) -> dict: return updated_definition -async def _generate_unique_recording_id() -> str: - """Generate a globally unique short recording ID.""" - for _ in range(10): - rid = generate_short_id(8) - exists = await db_client.check_recording_id_exists(rid) - if not exists: - return rid - raise RuntimeError("Failed to generate unique recording ID") - - async def duplicate_workflow( workflow_id: int, organization_id: int, user_id: int, ): - """Duplicate a workflow including its definition, config, recordings, and triggers. + """Duplicate a workflow including its definition, config, and triggers. + + Recordings are org-scoped and shared, so they are not duplicated. Args: workflow_id: The source workflow ID to duplicate @@ -130,29 +120,7 @@ async def duplicate_workflow( organization_id=organization_id, ) - # 6. Copy recordings with new IDs and storage paths scoped to new workflow - recording_id_map = await _duplicate_recordings( - source_workflow_id=workflow_id, - new_workflow_id=new_workflow.id, - organization_id=organization_id, - user_id=user_id, - ) - - # 7. Replace old recording IDs with new ones in the workflow definition - if recording_id_map: - workflow_definition = _replace_recording_ids( - workflow_definition, recording_id_map - ) - new_workflow = await db_client.update_workflow( - workflow_id=new_workflow.id, - name=None, - workflow_definition=workflow_definition, - template_context_variables=None, - workflow_configurations=None, - organization_id=organization_id, - ) - - # 8. Sync triggers for the new workflow + # 6. Sync triggers for the new workflow if workflow_definition: trigger_paths = _extract_trigger_paths(workflow_definition) if trigger_paths: @@ -165,94 +133,6 @@ async def duplicate_workflow( return new_workflow -async def _duplicate_recordings( - source_workflow_id: int, - new_workflow_id: int, - organization_id: int, - user_id: int, -) -> dict[str, str]: - """Duplicate all recordings for a workflow. - - Copies each recording file to a new storage path scoped under the new - workflow ID, and creates new DB records pointing to the copied files. - - Returns: - Mapping of old_recording_id -> new_recording_id - """ - recordings = await db_client.get_recordings_for_workflow( - workflow_id=source_workflow_id, - organization_id=organization_id, - ) - - if not recordings: - return {} - - recording_id_map: dict[str, str] = {} - - for rec in recordings: - try: - new_recording_id = await _generate_unique_recording_id() - - # Build new storage key: recordings/{org_id}/{new_workflow_id}/{new_recording_id}/{filename} - filename = posixpath.basename(rec.storage_key) - new_storage_key = ( - f"recordings/{organization_id}" - f"/{new_workflow_id}/{new_recording_id}" - f"/{filename}" - ) - - copied = await _copy_storage_object( - rec.storage_key, new_storage_key, rec.storage_backend - ) - if not copied: - logger.warning( - f"Failed to copy recording file {rec.recording_id}, skipping" - ) - continue - - await db_client.create_recording( - recording_id=new_recording_id, - workflow_id=new_workflow_id, - organization_id=organization_id, - tts_provider=rec.tts_provider, - tts_model=rec.tts_model, - tts_voice_id=rec.tts_voice_id, - transcript=rec.transcript, - storage_key=new_storage_key, - storage_backend=rec.storage_backend, - created_by=user_id, - metadata=copy.deepcopy(rec.recording_metadata), - ) - - recording_id_map[rec.recording_id] = new_recording_id - logger.info( - f"Duplicated recording {rec.recording_id} -> {new_recording_id}" - ) - - except Exception as e: - logger.error(f"Error duplicating recording {rec.recording_id}: {e}") - continue - - return recording_id_map - - -def _replace_recording_ids( - workflow_definition: dict, - recording_id_map: dict[str, str], -) -> dict: - """Replace old recording IDs with new ones throughout the workflow definition. - - Uses JSON serialization to do a thorough find-and-replace across all - nested fields (node prompts, data, etc.). - """ - definition_str = json.dumps(workflow_definition) - - for old_id, new_id in recording_id_map.items(): - definition_str = definition_str.replace(old_id, new_id) - - return json.loads(definition_str) - - async def _copy_storage_object( source_key: str, dest_key: str, storage_backend: str ) -> bool: diff --git a/api/services/workflow/pipecat_engine.py b/api/services/workflow/pipecat_engine.py index 50f3ffd..784de4d 100644 --- a/api/services/workflow/pipecat_engine.py +++ b/api/services/workflow/pipecat_engine.py @@ -1,5 +1,6 @@ from typing import TYPE_CHECKING, Awaitable, Callable, Optional, Union +from api.services.pipecat.audio_playback import play_audio from api.services.workflow.disposition_mapper import ( apply_disposition_mapping, get_organization_id_from_workflow_run, @@ -114,6 +115,13 @@ class PipecatEngine: # Audio configuration (set via set_audio_config from _run_pipeline) self._audio_config = None + # Transport output processor for injecting audio directly into the + # output, bypassing STT (set via set_transport_output from _run_pipeline) + self._transport_output = None + + # Recording audio fetcher (set via set_fetch_recording_audio from _run_pipeline) + self._fetch_recording_audio = None + # True when the workflow has active recordings; enables recording # response mode instructions on all nodes for in-context learning. self._has_recordings: bool = has_recordings @@ -191,6 +199,8 @@ class PipecatEngine: name: str, transition_to_node: str, transition_speech: Optional[str] = None, + transition_speech_type: Optional[str] = None, + transition_speech_recording_id: Optional[str] = None, ): async def transition_func(function_call_params: FunctionCallParams) -> None: """Inner function that handles the node change tool calls""" @@ -204,8 +214,34 @@ class PipecatEngine: # Perform variable extraction before transitioning to new node await self._perform_variable_extraction_if_needed(self._current_node) - # Queue transition speech before switching nodes - if transition_speech: + # Queue transition speech/audio before switching nodes + speech_type = transition_speech_type or "text" + if ( + speech_type == "audio" + and transition_speech_recording_id + and self._fetch_recording_audio + ): + logger.info( + f"Playing transition audio: {transition_speech_recording_id}" + ) + self._queued_speech_mute_state = "waiting" + result = await self._fetch_recording_audio( + recording_pk=int(transition_speech_recording_id) + ) + if result: + await play_audio( + result.audio, + sample_rate=self._audio_config.pipeline_sample_rate + if self._audio_config + else 16000, + queue_frame=self._transport_output.queue_frame, + transcript=result.transcript, + ) + else: + logger.warning( + f"Failed to fetch transition audio {transition_speech_recording_id}" + ) + elif transition_speech: logger.info(f"Playing transition speech: {transition_speech}") self._queued_speech_mute_state = "waiting" await self.task.queue_frame( @@ -259,6 +295,8 @@ class PipecatEngine: name: str, transition_to_node: str, transition_speech: Optional[str] = None, + transition_speech_type: Optional[str] = None, + transition_speech_recording_id: Optional[str] = None, ): logger.debug( f"Registering function {name} to transition to node {transition_to_node} with LLM" @@ -266,7 +304,11 @@ class PipecatEngine: # Create transition function transition_func = await self._create_transition_func( - name, transition_to_node, transition_speech + name, + transition_to_node, + transition_speech, + transition_speech_type, + transition_speech_recording_id, ) # Register function with LLM @@ -442,6 +484,8 @@ class PipecatEngine: outgoing_edge.get_function_name(), outgoing_edge.target, outgoing_edge.transition_speech, + outgoing_edge.data.transition_speech_type, + outgoing_edge.data.transition_speech_recording_id, ) # Register custom tool handlers for this node @@ -533,11 +577,27 @@ class PipecatEngine: # Setup LLM Context with Prompts and Functions await self._setup_llm_context(node) - def get_start_greeting(self) -> Optional[str]: - """Return the rendered greeting for the start node, or None if not configured.""" + def get_start_greeting(self) -> Optional[tuple[str, Optional[str]]]: + """Return the greeting info for the start node, or None if not configured. + + Returns: + A tuple of (greeting_type, value) where: + - ("text", rendered_text) for text greetings spoken via TTS + - ("audio", recording_id) for pre-recorded audio greetings + Or None if no greeting is configured. + """ start_node = self.workflow.nodes.get(self.workflow.start_node_id) - if start_node and start_node.greeting: - return self._format_prompt(start_node.greeting) + if not start_node: + return None + + greeting_type = start_node.greeting_type or "text" + + if greeting_type == "audio" and start_node.greeting_recording_id: + return ("audio", start_node.greeting_recording_id) + + if start_node.greeting: + return ("text", self._format_prompt(start_node.greeting)) + return None async def _handle_end_node(self, node: Node) -> None: @@ -698,6 +758,18 @@ class PipecatEngine: """Set the audio configuration for the pipeline.""" self._audio_config = audio_config + def set_transport_output(self, transport_output) -> None: + """Set the transport output processor for direct audio playback. + + Audio queued here bypasses STT and the rest of the pipeline, + going straight to the caller. + """ + self._transport_output = transport_output + + def set_fetch_recording_audio(self, fetch_fn) -> None: + """Set the recording audio fetcher callback.""" + self._fetch_recording_audio = fetch_fn + def set_mute_pipeline(self, mute: bool) -> None: """Set the pipeline mute state. diff --git a/api/services/workflow/pipecat_engine_callbacks.py b/api/services/workflow/pipecat_engine_callbacks.py index 8fb1511..83990bf 100644 --- a/api/services/workflow/pipecat_engine_callbacks.py +++ b/api/services/workflow/pipecat_engine_callbacks.py @@ -168,7 +168,6 @@ def create_aggregation_correction_callback(engine: "PipecatEngine"): reference = engine._current_llm_generation_reference_text if not reference: - logger.warning("No reference text available for aggregation correction") return corrupted # Apply the correction algorithm diff --git a/api/services/workflow/pipecat_engine_custom_tools.py b/api/services/workflow/pipecat_engine_custom_tools.py index eecb338..0ee12fd 100644 --- a/api/services/workflow/pipecat_engine_custom_tools.py +++ b/api/services/workflow/pipecat_engine_custom_tools.py @@ -16,6 +16,7 @@ from loguru import logger from api.db import db_client from api.enums import ToolCategory, WorkflowRunMode +from api.services.pipecat.audio_playback import play_audio, play_audio_loop from api.services.telephony.call_transfer_manager import get_call_transfer_manager from api.services.telephony.factory import get_telephony_provider from api.services.telephony.transfer_event_protocol import TransferContext @@ -27,7 +28,6 @@ from api.services.workflow.tools.custom_tool import ( execute_http_tool, tool_to_function_schema, ) -from api.utils.hold_audio import play_hold_audio_loop from pipecat.adapters.schemas.function_schema import FunctionSchema from pipecat.frames.frames import ( FunctionCallResultProperties, @@ -77,6 +77,45 @@ class CustomToolManager: self._engine = engine self._organization_id: Optional[int] = None + async def _play_config_message( + self, config: dict, *, append_to_context: bool = False + ) -> bool: + """Play a message from tool config — text or pre-recorded audio. + + Returns True if a message was queued, False otherwise. + """ + message_type = config.get("messageType", "none") + + if message_type == "audio": + recording_pk = config.get("audioRecordingId") + if recording_pk and self._engine._fetch_recording_audio: + result = await self._engine._fetch_recording_audio( + recording_pk=int(recording_pk) + ) + if result: + await play_audio( + result.audio, + sample_rate=self._engine._audio_config.pipeline_sample_rate + if self._engine._audio_config + else 16000, + queue_frame=self._engine._transport_output.queue_frame, + transcript=result.transcript, + ) + return True + else: + logger.warning(f"Failed to fetch recording pk={recording_pk}") + return False + + if message_type == "custom": + custom_message = config.get("customMessage", "") + if custom_message: + await self._engine.task.queue_frame( + TTSSpeakFrame(custom_message, append_to_context=append_to_context) + ) + return True + + return False + async def get_organization_id(self) -> Optional[int]: """Get and cache the organization ID from workflow run.""" if self._organization_id is None: @@ -250,9 +289,30 @@ class CustomToolManager: try: # Queue custom message before executing the API call + # Queue custom message (text or audio) before executing the API call config = tool.definition.get("config", {}) if tool.definition else {} + custom_msg_type = config.get("customMessageType", "text") custom_message = config.get("customMessage", "") - if custom_message: + if custom_msg_type == "audio": + recording_pk = config.get("customMessageRecordingId") + if recording_pk and self._engine._fetch_recording_audio: + logger.info( + f"Playing audio message before HTTP tool: pk={recording_pk}" + ) + self._engine._queued_speech_mute_state = "waiting" + result = await self._engine._fetch_recording_audio( + recording_pk=int(recording_pk) + ) + if result: + await play_audio( + result.audio, + sample_rate=self._engine._audio_config.pipeline_sample_rate + if self._engine._audio_config + else 16000, + queue_frame=self._engine._transport_output.queue_frame, + transcript=result.transcript, + ) + elif custom_message: logger.info( f"Playing custom message before HTTP tool: {custom_message}" ) @@ -299,8 +359,6 @@ class CustomToolManager: try: # Get the end call configuration config = tool.definition.get("config", {}) - message_type = config.get("messageType", "none") - custom_message = config.get("customMessage", "") # Handle end call reason if enabled end_call_reason_enabled = config.get("endCallReason", False) @@ -322,10 +380,8 @@ class CustomToolManager: properties=properties, ) - if message_type == "custom" and custom_message: - # Queue the custom message to be spoken - logger.info(f"Playing custom goodbye message: {custom_message}") - await self._engine.task.queue_frame(TTSSpeakFrame(custom_message)) + played = await self._play_config_message(config) + if played: # End the call after the message (not immediately) await self._engine.end_call_with_reason( EndTaskReason.END_CALL_TOOL_REASON.value, @@ -370,8 +426,6 @@ class CustomToolManager: # Get the transfer call configuration config = tool.definition.get("config", {}) destination = config.get("destination", "") - message_type = config.get("messageType", "none") - custom_message = config.get("customMessage", "") timeout_seconds = config.get( "timeout", 30 ) # Default 30 seconds if not configured @@ -443,10 +497,9 @@ class CustomToolManager: ) return - if message_type == "custom" and custom_message: - logger.info(f"Playing pre-transfer message: {custom_message}") + played = await self._play_config_message(config) + if played: self._engine._queued_speech_mute_state = "waiting" - await self._engine.task.queue_frame(TTSSpeakFrame(custom_message)) # Get organization ID for provider configuration organization_id = await self.get_organization_id() @@ -537,10 +590,10 @@ class CustomToolManager: # Start hold music as background task hold_music_task = asyncio.create_task( - play_hold_audio_loop( - self._engine.task, - hold_music_stop_event, - sample_rate, + play_audio_loop( + stop_event=hold_music_stop_event, + sample_rate=sample_rate, + queue_frame=self._engine._transport_output.queue_frame, ) ) diff --git a/api/services/workflow/workflow.py b/api/services/workflow/workflow.py index fe5a9ec..e85ae68 100644 --- a/api/services/workflow/workflow.py +++ b/api/services/workflow/workflow.py @@ -77,6 +77,8 @@ class Node: self.extraction_variables = data.extraction_variables self.add_global_prompt = data.add_global_prompt self.greeting = data.greeting + self.greeting_type = data.greeting_type + self.greeting_recording_id = data.greeting_recording_id self.detect_voicemail = data.detect_voicemail self.delayed_start = data.delayed_start self.delayed_start_duration = data.delayed_start_duration diff --git a/api/tests/test_recording_router_processor.py b/api/tests/test_recording_router_processor.py index 24b76c2..5ef2057 100644 --- a/api/tests/test_recording_router_processor.py +++ b/api/tests/test_recording_router_processor.py @@ -13,6 +13,7 @@ from typing import Optional import pytest +from api.services.pipecat.recording_audio_cache import RecordingAudio from api.services.pipecat.recording_router_processor import ( RecordingRouterProcessor, ) @@ -37,9 +38,9 @@ from pipecat.tests import run_test FAKE_AUDIO = b"\x00\x01" * 8000 # 1 second of 16-bit mono @ 16 kHz -async def _fake_fetch(recording_id: str) -> Optional[bytes]: +async def _fake_fetch(recording_id: str) -> Optional[RecordingAudio]: """Stub that returns fake PCM audio for any recording_id.""" - return FAKE_AUDIO + return RecordingAudio(audio=FAKE_AUDIO) def _make_processor(**kwargs) -> RecordingRouterProcessor: @@ -189,7 +190,7 @@ class TestMixedMarkerSuppression: async def tracking_fetch(recording_id: str): fetched_ids.append(recording_id) - return FAKE_AUDIO + return RecordingAudio(audio=FAKE_AUDIO) processor = _make_processor(fetch=tracking_fetch) diff --git a/api/tests/test_text_and_audio_playback.py b/api/tests/test_text_and_audio_playback.py new file mode 100644 index 0000000..3a8b1a6 --- /dev/null +++ b/api/tests/test_text_and_audio_playback.py @@ -0,0 +1,603 @@ +"""Tests for text and audio playback in greetings, transitions, and tool messages. + +Verifies that: +- Text mode produces TTSSpeakFrame +- Audio mode produces TTSStartedFrame -> TTSAudioRawFrame -> TTSStoppedFrame +- Covers: start node greetings, edge transition speech, tool config messages +""" + +import asyncio +from typing import Any, Dict, List +from unittest.mock import AsyncMock, Mock, patch + +import pytest + +from api.services.pipecat.recording_audio_cache import RecordingAudio +from api.services.workflow.dto import ( + EdgeDataDTO, + NodeDataDTO, + NodeType, + Position, + ReactFlowDTO, + RFEdgeDTO, + RFNodeDTO, +) +from api.services.workflow.pipecat_engine import PipecatEngine +from api.services.workflow.pipecat_engine_custom_tools import CustomToolManager +from api.services.workflow.workflow import WorkflowGraph +from pipecat.frames.frames import ( + Frame, + LLMContextFrame, + TTSAudioRawFrame, + TTSSpeakFrame, + TTSStartedFrame, + TTSStoppedFrame, +) +from pipecat.pipeline.pipeline import Pipeline +from pipecat.pipeline.runner import PipelineRunner +from pipecat.pipeline.task import PipelineParams, PipelineTask +from pipecat.processors.aggregators.llm_context import LLMContext +from pipecat.processors.aggregators.llm_response_universal import ( + LLMAssistantAggregatorParams, + LLMContextAggregatorPair, +) +from pipecat.tests import MockLLMService, MockTTSService +from pipecat.tests.mock_transport import MockTransport +from pipecat.transports.base_transport import TransportParams + +# ─── Constants ────────────────────────────────────────────────── + +START_PROMPT = "Start Call System Prompt" +END_PROMPT = "End Call System Prompt" +TEXT_GREETING = "Hello, welcome to our service!" +TEXT_TRANSITION = "Thank you for calling, goodbye!" +AUDIO_GREETING_ID = "rec-greeting-001" +AUDIO_TRANSITION_ID = "101" +FAKE_PCM_AUDIO = b"\x00\x01" * 1000 # Fake 16-bit mono PCM data + + +# ─── Fixtures ─────────────────────────────────────────────────── + + +@pytest.fixture +def text_workflow() -> WorkflowGraph: + """Start->End workflow with text greeting and text transition speech.""" + dto = ReactFlowDTO( + nodes=[ + RFNodeDTO( + id="start", + type=NodeType.startNode, + position=Position(x=0, y=0), + data=NodeDataDTO( + name="Start Call", + prompt=START_PROMPT, + is_start=True, + allow_interrupt=False, + add_global_prompt=False, + greeting=TEXT_GREETING, + greeting_type="text", + extraction_enabled=False, + ), + ), + RFNodeDTO( + id="end", + type=NodeType.endNode, + position=Position(x=0, y=200), + data=NodeDataDTO( + name="End Call", + prompt=END_PROMPT, + is_end=True, + allow_interrupt=False, + add_global_prompt=False, + extraction_enabled=False, + ), + ), + ], + edges=[ + RFEdgeDTO( + id="start-end", + source="start", + target="end", + data=EdgeDataDTO( + label="End Call", + condition="When the user says end the call", + transition_speech=TEXT_TRANSITION, + transition_speech_type="text", + ), + ), + ], + ) + return WorkflowGraph(dto) + + +@pytest.fixture +def audio_workflow() -> WorkflowGraph: + """Start->End workflow with audio greeting and audio transition speech.""" + dto = ReactFlowDTO( + nodes=[ + RFNodeDTO( + id="start", + type=NodeType.startNode, + position=Position(x=0, y=0), + data=NodeDataDTO( + name="Start Call", + prompt=START_PROMPT, + is_start=True, + allow_interrupt=False, + add_global_prompt=False, + greeting_type="audio", + greeting_recording_id=AUDIO_GREETING_ID, + extraction_enabled=False, + ), + ), + RFNodeDTO( + id="end", + type=NodeType.endNode, + position=Position(x=0, y=200), + data=NodeDataDTO( + name="End Call", + prompt=END_PROMPT, + is_end=True, + allow_interrupt=False, + add_global_prompt=False, + extraction_enabled=False, + ), + ), + ], + edges=[ + RFEdgeDTO( + id="start-end", + source="start", + target="end", + data=EdgeDataDTO( + label="End Call", + condition="When the user says end the call", + transition_speech_type="audio", + transition_speech_recording_id=AUDIO_TRANSITION_ID, + ), + ), + ], + ) + return WorkflowGraph(dto) + + +# ─── Pipeline Helper ──────────────────────────────────────────── + + +async def run_pipeline_and_capture_frames( + workflow: WorkflowGraph, + functions: List[Dict[str, Any]], + fetch_recording_audio=None, + num_text_steps: int = 1, +) -> tuple[MockLLMService, LLMContext, list[Frame]]: + """Run a pipeline with mock tool calls and capture frames queued via task.queue_frame. + + Returns: + Tuple of (llm, context, list of captured frames). + """ + first_step_chunks = MockLLMService.create_multiple_function_call_chunks(functions) + mock_steps = MockLLMService.create_multi_step_responses( + first_step_chunks, num_text_steps=num_text_steps, step_prefix="Response" + ) + + llm = MockLLMService(mock_steps=mock_steps, chunk_delay=0.001) + tts = MockTTSService(mock_audio_duration_ms=40, frame_delay=0) + mock_transport = MockTransport( + params=TransportParams( + audio_in_enabled=True, + audio_out_enabled=True, + audio_in_sample_rate=16000, + audio_out_sample_rate=16000, + ), + ) + + context = LLMContext() + assistant_params = LLMAssistantAggregatorParams(expect_stripped_words=True) + context_aggregator = LLMContextAggregatorPair( + context, assistant_params=assistant_params + ) + + engine = PipecatEngine( + llm=llm, + context=context, + workflow=workflow, + call_context_vars={"customer_name": "Test User"}, + workflow_run_id=1, + ) + + transport_output = mock_transport.output() + + if fetch_recording_audio: + engine.set_fetch_recording_audio(fetch_recording_audio) + engine.set_transport_output(transport_output) + + pipeline = Pipeline([llm, tts, transport_output, context_aggregator.assistant()]) + task = PipelineTask(pipeline, params=PipelineParams(), enable_rtvi=False) + engine.set_task(task) + + # Spy on task.queue_frame and transport_output.queue_frame to capture + # all frames queued by the engine (audio transitions go via transport output) + queued_frames: list[Frame] = [] + original_queue_frame = task.queue_frame + + async def capturing_queue_frame(frame): + queued_frames.append(frame) + await original_queue_frame(frame) + + task.queue_frame = capturing_queue_frame + + if fetch_recording_audio: + original_transport_queue = transport_output.queue_frame + + async def _spy_transport_queue(frame, *args, **kwargs): + queued_frames.append(frame) + await original_transport_queue(frame, *args, **kwargs) + + transport_output.queue_frame = _spy_transport_queue + + with ( + patch( + "api.services.workflow.pipecat_engine.get_organization_id_from_workflow_run", + new_callable=AsyncMock, + return_value=1, + ), + patch( + "api.services.workflow.pipecat_engine.apply_disposition_mapping", + new_callable=AsyncMock, + return_value="completed", + ), + ): + runner = PipelineRunner() + + async def run(): + await runner.run(task) + + async def initialize(): + await asyncio.sleep(0.01) + await engine.initialize() + await engine.set_node(engine.workflow.start_node_id) + await engine.llm.queue_frame(LLMContextFrame(engine.context)) + + await asyncio.gather(run(), initialize()) + + return llm, context, queued_frames + + +# ─── Tests: Start Greeting ────────────────────────────────────── + + +class TestStartGreeting: + """Unit tests for PipecatEngine.get_start_greeting().""" + + def test_text_greeting_returns_text_tuple(self, text_workflow: WorkflowGraph): + """Text greeting config should return ('text', rendered_text).""" + engine = PipecatEngine( + workflow=text_workflow, + call_context_vars={}, + workflow_run_id=1, + ) + result = engine.get_start_greeting() + assert result == ("text", TEXT_GREETING) + + def test_audio_greeting_returns_audio_tuple(self, audio_workflow: WorkflowGraph): + """Audio greeting config should return ('audio', recording_id).""" + engine = PipecatEngine( + workflow=audio_workflow, + call_context_vars={}, + workflow_run_id=1, + ) + result = engine.get_start_greeting() + assert result == ("audio", AUDIO_GREETING_ID) + + def test_no_greeting_returns_none(self): + """No greeting configured should return None.""" + dto = ReactFlowDTO( + nodes=[ + RFNodeDTO( + id="start", + type=NodeType.startNode, + position=Position(x=0, y=0), + data=NodeDataDTO( + name="Start", + prompt="Prompt", + is_start=True, + add_global_prompt=False, + extraction_enabled=False, + ), + ), + RFNodeDTO( + id="end", + type=NodeType.endNode, + position=Position(x=0, y=200), + data=NodeDataDTO( + name="End", + prompt="End", + is_end=True, + add_global_prompt=False, + extraction_enabled=False, + ), + ), + ], + edges=[ + RFEdgeDTO( + id="e", + source="start", + target="end", + data=EdgeDataDTO(label="End", condition="End"), + ), + ], + ) + engine = PipecatEngine( + workflow=WorkflowGraph(dto), + call_context_vars={}, + workflow_run_id=1, + ) + assert engine.get_start_greeting() is None + + def test_text_greeting_renders_template_variables(self): + """Text greeting with {{variable}} placeholders should be rendered.""" + dto = ReactFlowDTO( + nodes=[ + RFNodeDTO( + id="start", + type=NodeType.startNode, + position=Position(x=0, y=0), + data=NodeDataDTO( + name="Start", + prompt="Prompt", + is_start=True, + add_global_prompt=False, + greeting="Hello {{customer_name}}!", + greeting_type="text", + extraction_enabled=False, + ), + ), + RFNodeDTO( + id="end", + type=NodeType.endNode, + position=Position(x=0, y=200), + data=NodeDataDTO( + name="End", + prompt="End", + is_end=True, + add_global_prompt=False, + extraction_enabled=False, + ), + ), + ], + edges=[ + RFEdgeDTO( + id="e", + source="start", + target="end", + data=EdgeDataDTO(label="End", condition="End"), + ), + ], + ) + engine = PipecatEngine( + workflow=WorkflowGraph(dto), + call_context_vars={"customer_name": "Alice"}, + workflow_run_id=1, + ) + result = engine.get_start_greeting() + assert result == ("text", "Hello Alice!") + + +# ─── Tests: Transition Speech (Pipeline) ──────────────────────── + + +class TestTransitionSpeech: + """Pipeline tests for edge transition speech (text and audio).""" + + @pytest.mark.asyncio + async def test_text_transition_queues_tts_speak_frame( + self, text_workflow: WorkflowGraph + ): + """Text transition speech should queue a TTSSpeakFrame with the message.""" + functions = [ + { + "name": "end_call", + "arguments": {}, + "tool_call_id": "call_transition", + }, + ] + + llm, context, queued_frames = await run_pipeline_and_capture_frames( + workflow=text_workflow, + functions=functions, + num_text_steps=2, + ) + + # Pipeline completes: 1st gen on StartNode, 2nd gen on EndNode + assert llm.get_current_step() == 2 + + # Verify TTSSpeakFrame was queued with the transition speech text + tts_speak_frames = [f for f in queued_frames if isinstance(f, TTSSpeakFrame)] + transition_frames = [f for f in tts_speak_frames if f.text == TEXT_TRANSITION] + assert len(transition_frames) == 1, ( + f"Expected one TTSSpeakFrame with text '{TEXT_TRANSITION}', " + f"got: {[f.text for f in tts_speak_frames]}" + ) + + # No raw audio frames should be queued for text transition + audio_raw = [f for f in queued_frames if isinstance(f, TTSAudioRawFrame)] + assert len(audio_raw) == 0 + + @pytest.mark.asyncio + async def test_audio_transition_queues_audio_frames( + self, audio_workflow: WorkflowGraph + ): + """Audio transition speech should queue TTSStarted + TTSAudioRaw + TTSStopped.""" + functions = [ + { + "name": "end_call", + "arguments": {}, + "tool_call_id": "call_transition", + }, + ] + + mock_fetch = AsyncMock(return_value=RecordingAudio(audio=FAKE_PCM_AUDIO)) + + llm, context, queued_frames = await run_pipeline_and_capture_frames( + workflow=audio_workflow, + functions=functions, + fetch_recording_audio=mock_fetch, + num_text_steps=2, + ) + + # Pipeline completes + assert llm.get_current_step() == 2 + + # Verify fetch was called with the correct recording ID + mock_fetch.assert_called_once_with(recording_pk=int(AUDIO_TRANSITION_ID)) + + # Verify the three-frame audio sequence was queued + started = [f for f in queued_frames if isinstance(f, TTSStartedFrame)] + audio = [f for f in queued_frames if isinstance(f, TTSAudioRawFrame)] + stopped = [f for f in queued_frames if isinstance(f, TTSStoppedFrame)] + + assert len(started) >= 1, ( + f"Expected TTSStartedFrame. " + f"Frame types: {[type(f).__name__ for f in queued_frames]}" + ) + assert len(audio) >= 1, "Expected TTSAudioRawFrame" + assert len(stopped) >= 1, "Expected TTSStoppedFrame" + + # Verify audio content + assert audio[0].audio == FAKE_PCM_AUDIO + assert audio[0].sample_rate == 16000 + assert audio[0].num_channels == 1 + + # Verify context_id consistency across the three frames + ctx_id = started[0].context_id + assert ctx_id is not None + assert audio[0].context_id == ctx_id + assert stopped[0].context_id == ctx_id + + # No TTSSpeakFrame should be queued for audio transition + speak = [f for f in queued_frames if isinstance(f, TTSSpeakFrame)] + assert len(speak) == 0 + + +# ─── Tests: Tool Config Messages ──────────────────────────────── + + +class TestPlayConfigMessage: + """Unit tests for CustomToolManager._play_config_message.""" + + @pytest.fixture + def mock_engine(self): + """Create a mock engine with frame capture on task.queue_frame.""" + engine = Mock() + engine._workflow_run_id = 1 + engine._call_context_vars = {} + engine._fetch_recording_audio = None + engine._audio_config = None + engine.task = Mock() + engine.llm = Mock() + + # Capture frames queued via task.queue_frame + engine._queued_frames = [] + + async def mock_queue_frame(frame): + engine._queued_frames.append(frame) + + engine.task.queue_frame = mock_queue_frame + + # Also capture frames queued via transport_output.queue_frame (audio playback) + engine._transport_output = Mock() + engine._transport_output.queue_frame = mock_queue_frame + return engine + + @pytest.mark.asyncio + async def test_custom_text_queues_tts_speak_frame(self, mock_engine): + """messageType='custom' queues TTSSpeakFrame with the message text.""" + manager = CustomToolManager(mock_engine) + config = {"messageType": "custom", "customMessage": "Ending your call now."} + + result = await manager._play_config_message(config) + + assert result is True + frames = mock_engine._queued_frames + assert len(frames) == 1 + assert isinstance(frames[0], TTSSpeakFrame) + assert frames[0].text == "Ending your call now." + + @pytest.mark.asyncio + async def test_audio_queues_started_raw_stopped_frames(self, mock_engine): + """messageType='audio' queues TTSStarted + TTSAudioRaw + TTSStopped.""" + mock_fetch = AsyncMock(return_value=RecordingAudio(audio=FAKE_PCM_AUDIO)) + mock_engine._fetch_recording_audio = mock_fetch + + manager = CustomToolManager(mock_engine) + config = {"messageType": "audio", "audioRecordingId": "201"} + + result = await manager._play_config_message(config) + + assert result is True + mock_fetch.assert_called_once_with(recording_pk=201) + + frames = mock_engine._queued_frames + assert len(frames) == 3 + assert isinstance(frames[0], TTSStartedFrame) + assert isinstance(frames[1], TTSAudioRawFrame) + assert isinstance(frames[2], TTSStoppedFrame) + + # Verify audio content + assert frames[1].audio == FAKE_PCM_AUDIO + assert frames[1].sample_rate == 16000 + assert frames[1].num_channels == 1 + + # Context IDs should match across all three frames + ctx_id = frames[0].context_id + assert ctx_id is not None + assert frames[1].context_id == ctx_id + assert frames[2].context_id == ctx_id + + @pytest.mark.asyncio + async def test_none_message_type_returns_false(self, mock_engine): + """messageType='none' returns False without queuing frames.""" + manager = CustomToolManager(mock_engine) + result = await manager._play_config_message({"messageType": "none"}) + + assert result is False + assert len(mock_engine._queued_frames) == 0 + + @pytest.mark.asyncio + async def test_audio_without_fetch_callback_returns_false(self, mock_engine): + """Audio without fetch_recording_audio callback returns False.""" + mock_engine._fetch_recording_audio = None + + manager = CustomToolManager(mock_engine) + config = {"messageType": "audio", "audioRecordingId": "301"} + + result = await manager._play_config_message(config) + + assert result is False + assert len(mock_engine._queued_frames) == 0 + + @pytest.mark.asyncio + async def test_audio_with_failed_fetch_returns_false(self, mock_engine): + """Audio with fetch returning None returns False.""" + mock_fetch = AsyncMock(return_value=None) + mock_engine._fetch_recording_audio = mock_fetch + + manager = CustomToolManager(mock_engine) + config = {"messageType": "audio", "audioRecordingId": "301"} + + result = await manager._play_config_message(config) + + assert result is False + mock_fetch.assert_called_once_with(recording_pk=301) + assert len(mock_engine._queued_frames) == 0 + + @pytest.mark.asyncio + async def test_custom_empty_message_returns_false(self, mock_engine): + """messageType='custom' with empty message returns False.""" + manager = CustomToolManager(mock_engine) + config = {"messageType": "custom", "customMessage": ""} + + result = await manager._play_config_message(config) + + assert result is False + assert len(mock_engine._queued_frames) == 0 diff --git a/api/utils/hold_audio.py b/api/utils/hold_audio.py deleted file mode 100644 index c914082..0000000 --- a/api/utils/hold_audio.py +++ /dev/null @@ -1,151 +0,0 @@ -""" -Hold audio utility for loading, caching, and playing hold music files. - -This module provides functionality to load hold music audio files at specific sample rates -with caching to improve performance during multiple calls, and a reusable loop that queues -audio frames until a stop event is set. -""" - -import asyncio -from typing import Dict, Optional, Tuple - -import numpy as np -from loguru import logger - -from pipecat.frames.frames import OutputAudioRawFrame - -try: - import soundfile as sf -except ModuleNotFoundError as e: - logger.error(f"Exception: {e}") - logger.error("In order to use hold audio, you need to `pip install soundfile`.") - raise Exception(f"Missing module: {e}") - - -# Global cache for loaded hold music data -_hold_audio_cache: Dict[Tuple[str, int], np.ndarray] = {} - - -def load_hold_audio(file_path: str, sample_rate: int) -> Optional[bytes]: - """Load hold music audio file at the specified sample rate with caching. - - Args: - file_path: Path to the hold music audio file - sample_rate: Target sample rate (8000 or 16000 Hz supported) - - Returns: - Audio data as bytes (PCM16) or None if loading failed - """ - cache_key = (file_path, sample_rate) - - # Check cache first - if cache_key in _hold_audio_cache: - logger.debug(f"Using cached hold audio for {file_path} at {sample_rate}Hz") - audio_data = _hold_audio_cache[cache_key] - return audio_data.tobytes() - - try: - logger.info(f"Loading hold audio from {file_path} at {sample_rate}Hz") - - # Load audio file - sound, file_sample_rate = sf.read(file_path, dtype="int16") - logger.info( - f"Audio file loaded - file sample_rate: {file_sample_rate}, target: {sample_rate}" - ) - - # Ensure mono audio (take first channel if stereo) - if len(sound.shape) > 1: - sound = sound[:, 0] - - # Resample if needed - if file_sample_rate != sample_rate: - logger.warning( - f"Hold music file has sample rate {file_sample_rate}, expected {sample_rate}" - ) - # For now, we'll use the audio as-is and let the transport handle resampling - # In a production system, you might want to use librosa or scipy for proper resampling - - # Convert to int16 and cache - audio_data = sound.astype(np.int16) - _hold_audio_cache[cache_key] = audio_data - - logger.info( - f"Hold audio loaded successfully: {len(audio_data)} samples at {sample_rate}Hz" - ) - return audio_data.tobytes() - - except Exception as e: - logger.error(f"Failed to load hold audio file {file_path}: {e}") - return None - - -def clear_hold_audio_cache(): - """Clear the hold audio cache to free memory.""" - global _hold_audio_cache - _hold_audio_cache.clear() - logger.info("Hold audio cache cleared") - - -def get_cache_info() -> Dict[str, int]: - """Get information about the current cache state. - - Returns: - Dictionary with cache statistics - """ - return { - "cached_files": len(_hold_audio_cache), - "total_cache_size": sum(len(data) for data in _hold_audio_cache.values()), - } - - -async def play_hold_audio_loop( - task, - stop_event: asyncio.Event, - sample_rate: int = 16000, - hold_music_file: Optional[str] = None, -) -> None: - """Play hold/ring-back audio in a loop until *stop_event* is set. - - This is a shared helper used by call-transfer hold music and the - pre-call data fetch ringer. The caller is responsible for creating - the ``asyncio.Event`` and setting it when playback should stop. - - Args: - task: A ``PipelineTask`` (or anything with ``queue_frame``). - stop_event: Set this event to terminate the loop. - sample_rate: Target sample rate for audio playback. - hold_music_file: Path to a WAV file. When *None* the default - ``transfer_hold_ring_{sample_rate}.wav`` asset is used. - """ - if hold_music_file is None: - from api.constants import APP_ROOT_DIR - - hold_music_file = str( - APP_ROOT_DIR / "assets" / f"transfer_hold_ring_{sample_rate}.wav" - ) - - hold_audio_data = load_hold_audio(hold_music_file, sample_rate) - if not hold_audio_data: - logger.warning(f"Hold audio loop: failed to load {hold_music_file}, skipping") - return - - num_samples = len(hold_audio_data) // 2 # 16-bit PCM = 2 bytes per sample - duration = num_samples / sample_rate - - logger.debug(f"Hold audio loop: playing at {sample_rate}Hz") - try: - while not stop_event.is_set(): - frame = OutputAudioRawFrame( - audio=hold_audio_data, - sample_rate=sample_rate, - num_channels=1, - ) - await task.queue_frame(frame) - try: - await asyncio.wait_for(stop_event.wait(), timeout=duration + 1.5) - break - except asyncio.TimeoutError: - pass - except Exception as e: - logger.error(f"Hold audio loop: error: {e}") - logger.debug("Hold audio loop: stopped") diff --git a/docs/voice-agent/pre-recorded-audio.mdx b/docs/voice-agent/pre-recorded-audio.mdx index aca6dec..d0fe0aa 100644 --- a/docs/voice-agent/pre-recorded-audio.mdx +++ b/docs/voice-agent/pre-recorded-audio.mdx @@ -6,15 +6,6 @@ tag: "NEW" Custom recordings allow you to build **hybrid voice agents** that use your own pre-recorded audio for key parts of the conversation, while falling back to LLM-generated speech (via a cloned voice) for dynamic responses. This gives you the best of both worlds — the emotional depth of real human speech and the flexibility of AI-generated dialogue. - - - ## Why use custom recordings? - **Reduced TTS cost** — Pre-recorded audio is played directly, so you are not charged for TTS synthesis on those segments. @@ -50,23 +41,20 @@ You can use any TTS provider that supports voice cloning. The steps will vary by ## Step 3: Upload recordings -Navigate to your agent in the workflow builder and open the **Recordings** panel. You can either upload pre-recorded audio files or record directly in the browser. +Navigate to the **Recordings** page in the Dograh dashboard. Recordings are shared across all agents in your organization. You can either upload pre-recorded audio files or record directly in the browser. For each recording: -1. Click **Record** (or upload a file). -2. Speak the exact phrase you want the agent to use. -3. Give the recording a descriptive name (e.g., `greeting`, `invitation`, `venue`). -4. Verify the transcription is correct — edit it if needed. -5. Click **Upload**. +1. Click **Upload Recording**. +2. Choose an audio file or click **Record** to record in the browser. +3. Verify the transcription is correct — edit it if needed. +4. Click **Upload**. - -Recordings are scoped to a specific **provider and Voice ID**. If you change either, you will need to re-upload your recordings to ensure consistency between the recorded audio and the cloned voice used for dynamic responses. - +You can rename a recording's ID at any time by clicking the edit icon next to it in the recordings list. ## Step 4: Build the workflow -Open your agent's workflow and write the conversation flow in natural language. To insert a recording, type **`@`** in the prompt editor — this will show a list of all available recordings scoped to your current Voice ID. +Open your agent's workflow and write the conversation flow in natural language. To insert a recording, type **`@`** in the prompt editor — this will show a list of all available recordings in your organization. For any user question that falls outside your recordings, the agent automatically generates a dynamic response using the LLM, which is then synthesized using your cloned voice via TTS. diff --git a/pipecat b/pipecat index 002c095..5a2e4c8 160000 --- a/pipecat +++ b/pipecat @@ -1 +1 @@ -Subproject commit 002c095b2f15a11d4a6ffd85b86592821aa5cd62 +Subproject commit 5a2e4c89118264bfdfe91db059b01e226609f060 diff --git a/ui/src/app/campaigns/CsvUploadSelector.tsx b/ui/src/app/campaigns/CsvUploadSelector.tsx index d829eb1..76fcafe 100644 --- a/ui/src/app/campaigns/CsvUploadSelector.tsx +++ b/ui/src/app/campaigns/CsvUploadSelector.tsx @@ -3,25 +3,19 @@ import { useRef, useState } from 'react'; import { toast } from 'sonner'; +import { getPresignedUploadUrlApiV1S3PresignedUploadUrlPost } from '@/client/sdk.gen'; import { Button } from '@/components/ui/button'; import { Label } from '@/components/ui/label'; import logger from '@/lib/logger'; interface CsvUploadSelectorProps { - accessToken: string; onFileUploaded: (fileKey: string, fileName: string) => void; selectedFileName?: string; } -interface PresignedUploadUrlResponse { - upload_url: string; - file_key: string; - expires_in: number; -} - const MAX_FILE_SIZE = 10 * 1024 * 1024; // 10MB -export default function CsvUploadSelector({ accessToken, onFileUploaded, selectedFileName }: CsvUploadSelectorProps) { +export default function CsvUploadSelector({ onFileUploaded, selectedFileName }: CsvUploadSelectorProps) { const [uploading, setUploading] = useState(false); const [uploadProgress, setUploadProgress] = useState(0); const fileInputRef = useRef(null); @@ -48,25 +42,18 @@ export default function CsvUploadSelector({ accessToken, onFileUploaded, selecte try { // Step 1: Request presigned upload URL logger.info('Requesting presigned upload URL for:', file.name); - const presignedResponse = await fetch('/api/v1/s3/presigned-upload-url', { - method: 'POST', - headers: { - 'Authorization': `Bearer ${accessToken}`, - 'Content-Type': 'application/json', - }, - body: JSON.stringify({ + const { data: presignedData, error } = await getPresignedUploadUrlApiV1S3PresignedUploadUrlPost({ + body: { file_name: file.name, file_size: file.size, content_type: 'text/csv', - }), + }, }); - if (!presignedResponse.ok) { - const error = await presignedResponse.json(); - throw new Error(error.detail || 'Failed to get upload URL'); + if (error || !presignedData) { + throw new Error('Failed to get upload URL'); } - const presignedData: PresignedUploadUrlResponse = await presignedResponse.json(); logger.info('Received presigned URL, uploading file...'); // Step 2: Upload file directly to S3/MinIO diff --git a/ui/src/app/campaigns/new/page.tsx b/ui/src/app/campaigns/new/page.tsx index 7318a25..248d5f7 100644 --- a/ui/src/app/campaigns/new/page.tsx +++ b/ui/src/app/campaigns/new/page.tsx @@ -415,7 +415,6 @@ export default function NewCampaignPage() { /> ) : ( diff --git a/ui/src/app/files/page.tsx b/ui/src/app/files/page.tsx index 54cdbca..f84b0ec 100644 --- a/ui/src/app/files/page.tsx +++ b/ui/src/app/files/page.tsx @@ -1,11 +1,18 @@ "use client"; -import { ExternalLink } from "lucide-react"; +import { ExternalLink, Upload } from "lucide-react"; import { useEffect, useState } from "react"; +import { Button } from "@/components/ui/button"; import { Card, CardContent, CardDescription, CardHeader, CardTitle } from "@/components/ui/card"; +import { + Dialog, + DialogContent, + DialogDescription, + DialogHeader, + DialogTitle, +} from "@/components/ui/dialog"; import { Skeleton } from "@/components/ui/skeleton"; -import { Tabs, TabsContent, TabsList, TabsTrigger } from "@/components/ui/tabs"; import { useAuth } from "@/lib/auth"; import DocumentList from "./DocumentList"; @@ -14,6 +21,7 @@ import DocumentUpload from "./DocumentUpload"; export default function FilesPage() { const { user, redirectToLogin, loading } = useAuth(); const [refreshKey, setRefreshKey] = useState(0); + const [isUploadOpen, setIsUploadOpen] = useState(false); // Redirect if not authenticated useEffect(() => { @@ -23,8 +31,8 @@ export default function FilesPage() { }, [loading, user, redirectToLogin]); const handleUploadSuccess = () => { - // Trigger refresh of document list setRefreshKey(prev => prev + 1); + setIsUploadOpen(false); }; if (loading || !user) { @@ -50,44 +58,37 @@ export default function FilesPage() {

- - - All Files - Upload New - - - - - + + +
+
Your Documents - View and manage your uploaded documents + Documents shared across all agents in your organization - - - - - - +
+ +
+
+ + + +
- - - - Upload Document - - Upload a PDF or document file to add to your knowledge base - - - - - - - -
+ + + + Upload Document + + Upload a PDF or document file to add to your knowledge base + + + + + ); } diff --git a/ui/src/app/recordings/RecordingsList.tsx b/ui/src/app/recordings/RecordingsList.tsx new file mode 100644 index 0000000..dadd3e2 --- /dev/null +++ b/ui/src/app/recordings/RecordingsList.tsx @@ -0,0 +1,323 @@ +"use client"; + +import { AudioLines, Check, Pause, Pencil, Play, RefreshCw, Search, Trash2, X } from "lucide-react"; +import { useCallback, useEffect, useState } from "react"; +import { toast } from "sonner"; + +import { + deleteRecordingApiV1WorkflowRecordingsRecordingIdDelete, + listRecordingsApiV1WorkflowRecordingsGet, + updateRecordingApiV1WorkflowRecordingsIdPatch, +} from "@/client/sdk.gen"; +import type { RecordingResponseSchema } from "@/client/types.gen"; +import { Button } from "@/components/ui/button"; +import { Input } from "@/components/ui/input"; +import { Skeleton } from "@/components/ui/skeleton"; +import { useAudioPlayback } from "@/hooks/useAudioPlayback"; +import logger from "@/lib/logger"; + +export default function RecordingsList({ refreshKey }: { refreshKey?: number }) { + const [recordings, setRecordings] = useState([]); + const [isLoading, setIsLoading] = useState(true); + const [searchQuery, setSearchQuery] = useState(""); + const [error, setError] = useState(null); + + // Inline edit state + const [editingId, setEditingId] = useState(null); + const [editValue, setEditValue] = useState(""); + const [editError, setEditError] = useState(null); + + const { playingId, toggle: togglePlayback, stop: stopPlayback } = useAudioPlayback(); + + const fetchRecordings = useCallback(async () => { + try { + setIsLoading(true); + setError(null); + + const response = await listRecordingsApiV1WorkflowRecordingsGet({ + query: {}, + }); + + if (response.error || !response.data) { + throw new Error("Failed to fetch recordings"); + } + + setRecordings(response.data.recordings); + } catch (err) { + setError(err instanceof Error ? err.message : "Failed to fetch recordings"); + logger.error("Error fetching recordings:", err); + } finally { + setIsLoading(false); + } + }, []); + + useEffect(() => { + fetchRecordings(); + }, [fetchRecordings, refreshKey]); + + const handleDelete = async (recordingId: string) => { + if (!confirm("Are you sure you want to delete this recording?")) return; + + try { + const response = await deleteRecordingApiV1WorkflowRecordingsRecordingIdDelete({ + path: { recording_id: recordingId }, + }); + + if (response.error) { + throw new Error("Failed to delete recording"); + } + + toast.success("Recording deleted"); + fetchRecordings(); + } catch (err) { + toast.error(err instanceof Error ? err.message : "Failed to delete recording"); + logger.error("Error deleting recording:", err); + } + }; + + const handlePlay = async (rec: RecordingResponseSchema) => { + try { + await togglePlayback(rec.recording_id, rec.storage_key, rec.storage_backend); + } catch { + toast.error("Failed to play recording"); + } + }; + + const startEditing = (rec: RecordingResponseSchema) => { + setEditingId(rec.recording_id); + setEditValue(rec.recording_id); + setEditError(null); + }; + + const cancelEditing = () => { + setEditingId(null); + setEditValue(""); + setEditError(null); + }; + + const saveRecordingId = async (rec: RecordingResponseSchema) => { + const newId = editValue.trim(); + if (!newId) { + setEditError("ID cannot be empty"); + return; + } + if (!/^[a-zA-Z0-9_-]+$/.test(newId)) { + setEditError("Only letters, numbers, hyphens, and underscores"); + return; + } + if (newId === rec.recording_id) { + cancelEditing(); + return; + } + + setEditError(null); + try { + const response = await updateRecordingApiV1WorkflowRecordingsIdPatch({ + path: { id: rec.id }, + body: { recording_id: newId }, + }); + + if (response.error) { + const errData = response.error as { detail?: string }; + throw new Error(errData?.detail || "Failed to update recording ID"); + } + + toast.success(`Recording ID updated to "${newId}". All workflow references have been updated.`); + cancelEditing(); + fetchRecordings(); + } catch (err) { + setEditError(err instanceof Error ? err.message : "Failed to update recording ID"); + } + }; + + const formatDate = (dateString: string): string => { + const date = new Date(dateString); + return date.toLocaleDateString() + " " + date.toLocaleTimeString(); + }; + + const filteredRecordings = recordings.filter((rec) => { + if (!searchQuery) return true; + const q = searchQuery.toLowerCase(); + const filename = (rec.metadata?.original_filename as string) || ""; + return ( + filename.toLowerCase().includes(q) || + rec.transcript.toLowerCase().includes(q) || + rec.recording_id.toLowerCase().includes(q) + ); + }); + + if (isLoading && recordings.length === 0) { + return ( +
+ {[1, 2, 3].map((i) => ( +
+
+ + +
+ +
+ ))} +
+ ); + } + + if (error) { + return ( +
+ {error} +
+ ); + } + + return ( +
+ {/* Search and Refresh */} +
+
+ + setSearchQuery(e.target.value)} + className="pl-10" + /> +
+ +
+ + {/* Results count */} +
+ {filteredRecordings.length} recording{filteredRecordings.length !== 1 ? "s" : ""} + {searchQuery && ` matching "${searchQuery}"`} +
+ + {/* Recordings List */} + {filteredRecordings.length === 0 ? ( +
+ +

+ {searchQuery + ? "No recordings match your search" + : "No recordings yet"} +

+
+ ) : ( +
+ {filteredRecordings.map((rec) => { + const filename = (rec.metadata?.original_filename as string) || ""; + const isEditing = editingId === rec.recording_id; + + return ( +
+
+
+ +
+
+ {/* Recording ID (editable) */} +
+ {isEditing ? ( +
+ { setEditValue(e.target.value); setEditError(null); }} + onKeyDown={(e) => { + if (e.key === "Enter") saveRecordingId(rec); + if (e.key === "Escape") cancelEditing(); + }} + className={`h-7 text-sm font-mono w-48 ${editError ? "border-destructive" : ""}`} + maxLength={64} + autoFocus + /> + + + {editError && ( + {editError} + )} +
+ ) : ( +
+ + {rec.recording_id} + + +
+ )} +
+ {/* Filename */} + {filename && ( +

+ {filename} +

+ )} + {/* Transcript */} +

+ {rec.transcript} +

+
+ {formatDate(rec.created_at)} +
+
+
+
+ + +
+
+ ); + })} +
+ )} +
+ ); +} diff --git a/ui/src/app/recordings/RecordingsUploadDialog.tsx b/ui/src/app/recordings/RecordingsUploadDialog.tsx new file mode 100644 index 0000000..d991b7b --- /dev/null +++ b/ui/src/app/recordings/RecordingsUploadDialog.tsx @@ -0,0 +1,465 @@ +"use client"; + +import { Loader2, Mic, Square, Upload, X } from "lucide-react"; +import { useCallback, useEffect, useRef, useState } from "react"; + +import { + createRecordingsApiV1WorkflowRecordingsPost, + getUploadUrlsApiV1WorkflowRecordingsUploadUrlPost, + transcribeAudioApiV1WorkflowRecordingsTranscribePost, +} from "@/client"; +import type { RecordingUploadResponseSchema } from "@/client/types.gen"; +import { Button } from "@/components/ui/button"; +import { + Dialog, + DialogContent, + DialogDescription, + DialogHeader, + DialogTitle, +} from "@/components/ui/dialog"; +import { Input } from "@/components/ui/input"; +import { Label } from "@/components/ui/label"; +import { + Select, + SelectContent, + SelectItem, + SelectTrigger, + SelectValue, +} from "@/components/ui/select"; +import { Textarea } from "@/components/ui/textarea"; +import { LANGUAGE_DISPLAY_NAMES } from "@/constants/languages"; +interface RecordingsUploadDialogProps { + open: boolean; + onOpenChange: (open: boolean) => void; + onUploadComplete?: () => void; +} + +const MAX_FILE_SIZE = 5 * 1024 * 1024; // 5MB + +interface PendingFile { + id: string; + file: File; + transcript: string; + isTranscribing: boolean; + error?: string; +} + +let pendingFileCounter = 0; + +export const RecordingsUploadDialog = ({ + open, + onOpenChange, + onUploadComplete, +}: RecordingsUploadDialogProps) => { + const [uploading, setUploading] = useState(false); + const [pendingFiles, setPendingFiles] = useState([]); + const [error, setError] = useState(null); + const [language, setLanguage] = useState("multi"); + const [recordingStep, setRecordingStep] = useState<"idle" | "naming" | "recording">("idle"); + const [recordingFilename, setRecordingFilename] = useState(""); + const [recordingDuration, setRecordingDuration] = useState(0); + const mediaRecorderRef = useRef(null); + const audioChunksRef = useRef([]); + const recordingTimerRef = useRef | null>(null); + const fileInputRef = useRef(null); + const languageRef = useRef(language); + languageRef.current = language; + + const stopRecordingTimer = useCallback(() => { + if (recordingTimerRef.current) { + clearInterval(recordingTimerRef.current); + recordingTimerRef.current = null; + } + }, []); + + const stopRecording = useCallback(() => { + if (mediaRecorderRef.current && mediaRecorderRef.current.state !== "inactive") { + mediaRecorderRef.current.stop(); + } + }, []); + + const resetRecordingState = useCallback(() => { + setRecordingStep("idle"); + setRecordingFilename(""); + setRecordingDuration(0); + }, []); + + useEffect(() => { + if (open) { + setError(null); + setPendingFiles([]); + setLanguage("multi"); + resetRecordingState(); + } + }, [open, resetRecordingState]); + + useEffect(() => { + if (!open) { + stopRecording(); + stopRecordingTimer(); + } + }, [open, stopRecording, stopRecordingTimer]); + + const transcribeFile = async (pendingId: string, file: File) => { + setPendingFiles((prev) => + prev.map((p) => (p.id === pendingId ? { ...p, isTranscribing: true } : p)) + ); + try { + const currentLang = languageRef.current; + const result = await transcribeAudioApiV1WorkflowRecordingsTranscribePost({ + body: { file, language: currentLang }, + }); + const data = result.data as Record | undefined; + if (data?.transcript) { + setPendingFiles((prev) => + prev.map((p) => + p.id === pendingId ? { ...p, transcript: data.transcript as string, isTranscribing: false } : p + ) + ); + } else { + setPendingFiles((prev) => + prev.map((p) => (p.id === pendingId ? { ...p, isTranscribing: false } : p)) + ); + } + } catch { + setPendingFiles((prev) => + prev.map((p) => + p.id === pendingId + ? { ...p, isTranscribing: false, error: "Auto-transcription failed" } + : p + ) + ); + } + }; + + const addPendingFiles = (files: File[]) => { + const valid: PendingFile[] = []; + for (const file of files) { + if (file.size > MAX_FILE_SIZE) { + setError(`${file.name} (${(file.size / (1024 * 1024)).toFixed(1)}MB) exceeds 5MB limit — skipped.`); + continue; + } + const id = `pending-${++pendingFileCounter}`; + valid.push({ id, file, transcript: "", isTranscribing: false }); + } + if (valid.length === 0) return; + setPendingFiles((prev) => [...prev, ...valid]); + setError(null); + for (const pf of valid) { + transcribeFile(pf.id, pf.file); + } + }; + + const removePendingFile = (pendingId: string) => { + setPendingFiles((prev) => prev.filter((p) => p.id !== pendingId)); + }; + + const updateTranscript = (pendingId: string, transcript: string) => { + setPendingFiles((prev) => + prev.map((p) => (p.id === pendingId ? { ...p, transcript } : p)) + ); + }; + + const startRecording = async () => { + try { + const stream = await navigator.mediaDevices.getUserMedia({ audio: true }); + const mediaRecorder = new MediaRecorder(stream); + mediaRecorderRef.current = mediaRecorder; + audioChunksRef.current = []; + + mediaRecorder.ondataavailable = (e) => { + if (e.data.size > 0) audioChunksRef.current.push(e.data); + }; + + const filename = recordingFilename.trim() || "recording"; + mediaRecorder.onstop = () => { + stream.getTracks().forEach((t) => t.stop()); + stopRecordingTimer(); + + const blob = new Blob(audioChunksRef.current, { type: mediaRecorder.mimeType }); + if (blob.size > MAX_FILE_SIZE) { + setError(`Recording (${(blob.size / (1024 * 1024)).toFixed(1)}MB) exceeds the maximum allowed size of 5MB.`); + resetRecordingState(); + return; + } + const ext = mediaRecorder.mimeType.includes("webm") ? "webm" : "mp4"; + const file = new File([blob], `${filename}.${ext}`, { type: mediaRecorder.mimeType }); + resetRecordingState(); + addPendingFiles([file]); + }; + + mediaRecorder.start(); + setRecordingStep("recording"); + setRecordingDuration(0); + setError(null); + recordingTimerRef.current = setInterval(() => { + setRecordingDuration((d) => d + 1); + }, 1000); + } catch { + setError("Microphone access denied. Please allow microphone permissions."); + resetRecordingState(); + } + }; + + const handleFileSelect = (fileList: FileList | null) => { + if (!fileList || fileList.length === 0) return; + addPendingFiles(Array.from(fileList)); + if (fileInputRef.current) fileInputRef.current.value = ""; + }; + + const handleUpload = async () => { + const ready = pendingFiles.filter((p) => p.transcript.trim() && !p.isTranscribing); + if (ready.length === 0) return; + + setUploading(true); + setError(null); + + try { + const uploadUrlResponse = await getUploadUrlsApiV1WorkflowRecordingsUploadUrlPost({ + body: { + files: ready.map((p) => ({ + filename: p.file.name, + mime_type: p.file.type || "audio/wav", + file_size: p.file.size, + })), + }, + }); + + if (!uploadUrlResponse.data?.items) { + throw new Error("Failed to get upload URLs"); + } + + const items = uploadUrlResponse.data.items; + + await Promise.all( + items.map(async (item: RecordingUploadResponseSchema, idx: number) => { + const file = ready[idx].file; + const uploadResponse = await fetch(item.upload_url, { + method: "PUT", + body: file, + headers: { "Content-Type": file.type || "audio/wav" }, + }); + if (!uploadResponse.ok) { + throw new Error(`File upload failed for ${file.name}`); + } + }) + ); + + await createRecordingsApiV1WorkflowRecordingsPost({ + body: { + recordings: items.map((item: RecordingUploadResponseSchema, idx: number) => ({ + recording_id: item.recording_id, + transcript: ready[idx].transcript.trim(), + storage_key: item.storage_key, + metadata: { + original_filename: ready[idx].file.name, + file_size_bytes: ready[idx].file.size, + mime_type: ready[idx].file.type, + language, + }, + })), + }, + }); + + setPendingFiles([]); + setLanguage("multi"); + resetRecordingState(); + if (fileInputRef.current) fileInputRef.current.value = ""; + onUploadComplete?.(); + onOpenChange(false); + } catch (err) { + setError(err instanceof Error ? err.message : "Failed to upload recordings"); + } finally { + setUploading(false); + } + }; + + const isRecording = recordingStep === "recording"; + const anyTranscribing = pendingFiles.some((p) => p.isTranscribing); + const readyCount = pendingFiles.filter((p) => p.transcript.trim() && !p.isTranscribing).length; + const isBusy = uploading || isRecording || anyTranscribing; + + return ( + + + + Upload Recordings + + Upload or record audio files. Use{" "} + @ in + prompt fields to insert them into your agents. + + + + {error && ( +
+ {error} +
+ )} + + {/* Upload Section */} +
+ {/* Audio source: file picker or record */} +
+ +
+ handleFileSelect(e.target.files)} + className="hidden" + /> + + {recordingStep === "idle" && ( + + )} +
+
+ + {/* Recording: filename + start/stop */} + {(recordingStep === "naming" || isRecording) && ( +
+ {recordingStep === "naming" && ( + <> +
+ + setRecordingFilename(e.target.value)} + autoFocus + /> +
+
+ + +
+ + )} + {isRecording && ( +
+ + + + + + {Math.floor(recordingDuration / 60)}:{(recordingDuration % 60).toString().padStart(2, "0")} + + {recordingFilename} + +
+ )} +
+ )} + + {/* Pending files list */} + {pendingFiles.length > 0 && ( +
+ + {pendingFiles.map((pf) => ( +
+
+ + {pf.file.name} ({(pf.file.size / (1024 * 1024)).toFixed(1)}MB) + + {pf.isTranscribing && ( + + )} + +
+ {pf.error && ( +

{pf.error}

+ )} +