From 29c5be298cf6f47891a7987226e58e2d4fdd0598 Mon Sep 17 00:00:00 2001 From: Abhishek Date: Wed, 24 Jun 2026 22:07:35 +0530 Subject: [PATCH] chore: refactor status processor (#465) * chore: refactor status processor * fix: fix billing duration when billsec is None for Cloudonix --- api/enums.py | 28 +- api/services/pipecat/realtime/gemini_live.py | 7 +- .../telephony/providers/ari/provider.py | 20 +- .../telephony/providers/cloudonix/provider.py | 47 ++- .../telephony/providers/cloudonix/routes.py | 12 +- .../telephony/providers/plivo/provider.py | 24 +- .../telephony/providers/telnyx/provider.py | 22 +- .../telephony/providers/twilio/provider.py | 5 +- .../telephony/providers/vobiz/provider.py | 5 +- .../telephony/providers/vonage/provider.py | 18 +- api/services/telephony/status_processor.py | 237 ++++++------ api/services/workflow/disposition_mapper.py | 46 --- api/services/workflow/pipecat_engine.py | 36 +- api/tasks/run_integrations.py | 9 + .../integrations/_run_pipeline_helpers.py | 8 - api/tests/telephony/cloudonix/test_routes.py | 53 ++- api/tests/telephony/test_status_processor.py | 98 +++++ .../test_pipecat_engine_context_update.py | 21 +- api/tests/test_pipecat_engine_end_call.py | 357 ++++++++---------- ...cat_engine_node_switch_with_user_speech.py | 25 +- api/tests/test_pipecat_engine_tool_calls.py | 25 +- .../test_pipecat_engine_transition_mute.py | 70 ++-- ...test_pipecat_engine_variable_extraction.py | 31 +- api/tests/test_run_integrations_webhook.py | 88 +++++ api/tests/test_text_and_audio_playback.py | 5 - ...t_tts_endframe_with_audio_write_failure.py | 176 ++++----- api/tests/test_user_idle_handler.py | 21 +- .../test_user_muting_during_bot_speech.py | 221 +++++------ .../src/dograh_sdk/_generated_models.py | 4 +- 29 files changed, 910 insertions(+), 809 deletions(-) delete mode 100644 api/services/workflow/disposition_mapper.py create mode 100644 api/tests/telephony/test_status_processor.py create mode 100644 api/tests/test_run_integrations_webhook.py diff --git a/api/enums.py b/api/enums.py index 2b8ac637..3d43e1b8 100644 --- a/api/enums.py +++ b/api/enums.py @@ -17,6 +17,32 @@ class CallType(Enum): OUTBOUND = "outbound" +class TelephonyCallStatus(str, Enum): + INITIATED = "initiated" + RINGING = "ringing" + IN_PROGRESS = "in-progress" + ANSWERED = "answered" + COMPLETED = "completed" + FAILED = "failed" + BUSY = "busy" + NO_ANSWER = "no-answer" + CANCELED = "canceled" + ERROR = "error" + + @classmethod + def from_raw(cls, value: object) -> "TelephonyCallStatus | None": + if isinstance(value, cls): + return value + + if value in (None, ""): + return None + + try: + return cls(str(value).lower()) + except ValueError: + return None + + class WorkflowRunMode(Enum): ARI = "ari" PLIVO = "plivo" @@ -77,8 +103,6 @@ class WorkflowRunStatus(Enum): class OrganizationConfigurationKey(Enum): - DISPOSITION_CODE_MAPPING = "DISPOSITION_CODE_MAPPING" - DISPOSITION_MESSAGE_TEMPLATE = "DISPOSITION_MESSAGE_TEMPLATE" CONCURRENT_CALL_LIMIT = "CONCURRENT_CALL_LIMIT" TELEPHONY_CONFIGURATION = ( "TELEPHONY_CONFIGURATION" # Stores all providers + active one diff --git a/api/services/pipecat/realtime/gemini_live.py b/api/services/pipecat/realtime/gemini_live.py index 8203f9e6..0df8797d 100644 --- a/api/services/pipecat/realtime/gemini_live.py +++ b/api/services/pipecat/realtime/gemini_live.py @@ -22,6 +22,9 @@ from typing import Any from loguru import logger +from api.services.pipecat.gemini_json_schema_adapter import ( + DograhGeminiJSONSchemaAdapter, +) from pipecat.frames.frames import ( BotStoppedSpeakingFrame, Frame, @@ -35,10 +38,6 @@ from pipecat.services.google.gemini_live.llm import GeminiLiveLLMService from pipecat.services.llm_service import FunctionCallFromLLM from pipecat.utils.tracing.service_decorators import traced_gemini_live -from api.services.pipecat.gemini_json_schema_adapter import ( - DograhGeminiJSONSchemaAdapter, -) - class DograhGeminiLiveLLMService(GeminiLiveLLMService): """Gemini Live with Dograh engine integration quirks. See module docstring.""" diff --git a/api/services/telephony/providers/ari/provider.py b/api/services/telephony/providers/ari/provider.py index 7c750b11..8c811f9a 100644 --- a/api/services/telephony/providers/ari/provider.py +++ b/api/services/telephony/providers/ari/provider.py @@ -14,7 +14,7 @@ from fastapi import HTTPException from loguru import logger from api.db import db_client -from api.enums import WorkflowRunMode +from api.enums import TelephonyCallStatus, WorkflowRunMode from api.services.telephony.base import ( CallInitiationResult, NormalizedInboundData, @@ -205,12 +205,12 @@ class ARIProvider(TelephonyProvider): """ # Map ARI channel states to common status format state_map = { - "Up": "answered", - "Down": "completed", - "Ringing": "ringing", - "Ring": "ringing", - "Busy": "busy", - "Unavailable": "failed", + "Up": TelephonyCallStatus.ANSWERED, + "Down": TelephonyCallStatus.COMPLETED, + "Ringing": TelephonyCallStatus.RINGING, + "Ring": TelephonyCallStatus.RINGING, + "Busy": TelephonyCallStatus.BUSY, + "Unavailable": TelephonyCallStatus.FAILED, } channel_state = data.get("channel", {}).get("state", "") @@ -218,11 +218,11 @@ class ARIProvider(TelephonyProvider): # Determine status from event type if event_type == "StasisStart": - status = "answered" + status = TelephonyCallStatus.ANSWERED elif event_type == "StasisEnd": - status = "completed" + status = TelephonyCallStatus.COMPLETED elif event_type == "ChannelDestroyed": - status = "completed" + status = TelephonyCallStatus.COMPLETED else: status = state_map.get(channel_state, channel_state.lower()) diff --git a/api/services/telephony/providers/cloudonix/provider.py b/api/services/telephony/providers/cloudonix/provider.py index 5ea9c005..31b84fb6 100644 --- a/api/services/telephony/providers/cloudonix/provider.py +++ b/api/services/telephony/providers/cloudonix/provider.py @@ -11,7 +11,7 @@ from fastapi import HTTPException from loguru import logger from api.db import db_client -from api.enums import WorkflowRunMode +from api.enums import TelephonyCallStatus, WorkflowRunMode from api.services.telephony.base import ( CallInitiationResult, NormalizedInboundData, @@ -348,15 +348,15 @@ class CloudonixProvider(TelephonyProvider): # Map Cloudonix status values to common format # These mappings may need adjustment based on actual Cloudonix callback format status_map = { - "initiated": "initiated", - "ringing": "ringing", - "answered": "answered", - "completed": "completed", - "failed": "failed", - "busy": "busy", - "no-answer": "no-answer", - "canceled": "canceled", - "error": "error", + "initiated": TelephonyCallStatus.INITIATED, + "ringing": TelephonyCallStatus.RINGING, + "answered": TelephonyCallStatus.ANSWERED, + "completed": TelephonyCallStatus.COMPLETED, + "failed": TelephonyCallStatus.FAILED, + "busy": TelephonyCallStatus.BUSY, + "no-answer": TelephonyCallStatus.NO_ANSWER, + "canceled": TelephonyCallStatus.CANCELED, + "error": TelephonyCallStatus.ERROR, } call_status = data.get("status", "") @@ -374,6 +374,33 @@ class CloudonixProvider(TelephonyProvider): "extra": data, # Include all original data } + @staticmethod + def parse_cdr_status_callback(data: Dict[str, Any]) -> Dict[str, Any]: + """Parse Cloudonix CDR data into generic status callback format.""" + disposition_map = { + "ANSWER": TelephonyCallStatus.COMPLETED, + "BUSY": TelephonyCallStatus.BUSY, + "CANCEL": TelephonyCallStatus.CANCELED, + "FAILED": TelephonyCallStatus.FAILED, + "CONGESTION": TelephonyCallStatus.FAILED, + "NOANSWER": TelephonyCallStatus.NO_ANSWER, + } + + disposition = data.get("disposition") or "" + session = data.get("session") + billsec = data.get("billsec") + + return { + "call_id": session.get("token") if isinstance(session, dict) else "", + "status": disposition_map.get(disposition.upper(), disposition.lower()), + "from_number": data.get("from"), + "to_number": data.get("to"), + "duration": str( + billsec if billsec is not None else (data.get("duration") or 0) + ), + "extra": data, + } + async def get_webhook_response( self, workflow_id: int, user_id: int, workflow_run_id: int ) -> str: diff --git a/api/services/telephony/providers/cloudonix/routes.py b/api/services/telephony/providers/cloudonix/routes.py index facf4bdb..80e7037a 100644 --- a/api/services/telephony/providers/cloudonix/routes.py +++ b/api/services/telephony/providers/cloudonix/routes.py @@ -12,6 +12,7 @@ from pipecat.utils.run_context import set_current_run_id from api.db import db_client from api.services.telephony.factory import get_telephony_provider_for_run +from api.services.telephony.providers.cloudonix.provider import CloudonixProvider from api.services.telephony.status_processor import ( StatusCallbackRequest, _process_status_update, @@ -120,8 +121,15 @@ async def handle_cloudonix_cdr(request: Request): set_current_run_id(workflow_run_id) logger.info(f"[run {workflow_run_id}] Processing Cloudonix CDR for call {call_id}") - # Convert CDR to status update using StatusCallbackRequest - status_update = StatusCallbackRequest.from_cloudonix_cdr(cdr_data) + parsed_data = CloudonixProvider.parse_cdr_status_callback(cdr_data) + status_update = StatusCallbackRequest( + call_id=parsed_data["call_id"], + status=parsed_data["status"], + from_number=parsed_data.get("from_number"), + to_number=parsed_data.get("to_number"), + duration=parsed_data.get("duration"), + extra=parsed_data.get("extra", {}), + ) # Process the status update await _process_status_update(workflow_run_id, status_update) diff --git a/api/services/telephony/providers/plivo/provider.py b/api/services/telephony/providers/plivo/provider.py index d6c336b5..494552bc 100644 --- a/api/services/telephony/providers/plivo/provider.py +++ b/api/services/telephony/providers/plivo/provider.py @@ -15,7 +15,7 @@ from fastapi import HTTPException from loguru import logger from api.db import db_client -from api.enums import WorkflowRunMode +from api.enums import TelephonyCallStatus, WorkflowRunMode from api.services.telephony.base import ( CallInitiationResult, NormalizedInboundData, @@ -281,17 +281,17 @@ class PlivoProvider(TelephonyProvider): def parse_status_callback(self, data: Dict[str, Any]) -> Dict[str, Any]: status_map = { - "in-progress": "answered", - "ringing": "ringing", - "ring": "ringing", - "completed": "completed", - "hangup": "completed", - "stopstream": "completed", - "busy": "busy", - "no-answer": "no-answer", - "cancel": "canceled", - "cancelled": "canceled", - "timeout": "no-answer", + "in-progress": TelephonyCallStatus.ANSWERED, + "ringing": TelephonyCallStatus.RINGING, + "ring": TelephonyCallStatus.RINGING, + "completed": TelephonyCallStatus.COMPLETED, + "hangup": TelephonyCallStatus.COMPLETED, + "stopstream": TelephonyCallStatus.COMPLETED, + "busy": TelephonyCallStatus.BUSY, + "no-answer": TelephonyCallStatus.NO_ANSWER, + "cancel": TelephonyCallStatus.CANCELED, + "cancelled": TelephonyCallStatus.CANCELED, + "timeout": TelephonyCallStatus.NO_ANSWER, } call_status = (data.get("CallStatus") or data.get("Event") or "").lower() diff --git a/api/services/telephony/providers/telnyx/provider.py b/api/services/telephony/providers/telnyx/provider.py index f14e0f15..3adaef52 100644 --- a/api/services/telephony/providers/telnyx/provider.py +++ b/api/services/telephony/providers/telnyx/provider.py @@ -25,7 +25,7 @@ TELNYX_TIMESTAMP_TOLERANCE_SECONDS = 300 TELNYX_PUBLIC_KEY_BYTES = 32 TELNYX_SIGNATURE_BYTES = 64 -from api.enums import WorkflowRunMode +from api.enums import TelephonyCallStatus, WorkflowRunMode from api.services.telephony.base import ( CallInitiationResult, NormalizedInboundData, @@ -305,23 +305,25 @@ class TelnyxProvider(TelephonyProvider): } @staticmethod - def _resolve_status(event_type: str, payload: Dict[str, Any]) -> str: + def _resolve_status( + event_type: str, payload: Dict[str, Any] + ) -> TelephonyCallStatus | str: """Map a Telnyx event type (and hangup cause) to a normalized status.""" EVENT_STATUS = { - "call.initiated": "initiated", - "call.answered": "in-progress", - "call.hangup": "completed", + "call.initiated": TelephonyCallStatus.INITIATED, + "call.answered": TelephonyCallStatus.IN_PROGRESS, + "call.hangup": TelephonyCallStatus.COMPLETED, "call.machine.detection.ended": "machine-detected", "streaming.started": "streaming-started", "streaming.stopped": "streaming-stopped", } HANGUP_STATUS = { - "busy": "busy", - "no_answer": "no-answer", - "timeout": "no-answer", - "call_rejected": "failed", - "unallocated_number": "failed", + "busy": TelephonyCallStatus.BUSY, + "no_answer": TelephonyCallStatus.NO_ANSWER, + "timeout": TelephonyCallStatus.NO_ANSWER, + "call_rejected": TelephonyCallStatus.FAILED, + "unallocated_number": TelephonyCallStatus.FAILED, } status = EVENT_STATUS.get(event_type, event_type) diff --git a/api/services/telephony/providers/twilio/provider.py b/api/services/telephony/providers/twilio/provider.py index e0deee34..57f75820 100644 --- a/api/services/telephony/providers/twilio/provider.py +++ b/api/services/telephony/providers/twilio/provider.py @@ -11,7 +11,7 @@ from fastapi import HTTPException from loguru import logger from twilio.request_validator import RequestValidator -from api.enums import WorkflowRunMode +from api.enums import TelephonyCallStatus, WorkflowRunMode from api.services.telephony.base import ( CallInitiationResult, NormalizedInboundData, @@ -230,9 +230,10 @@ class TwilioProvider(TelephonyProvider): """ Parse Twilio status callback data into generic format. """ + call_status = data.get("CallStatus", "") return { "call_id": data.get("CallSid", ""), - "status": data.get("CallStatus", ""), + "status": TelephonyCallStatus.from_raw(call_status) or call_status, "from_number": data.get("From"), "to_number": data.get("To"), "direction": data.get("Direction"), diff --git a/api/services/telephony/providers/vobiz/provider.py b/api/services/telephony/providers/vobiz/provider.py index 383da3c2..0641ceb6 100644 --- a/api/services/telephony/providers/vobiz/provider.py +++ b/api/services/telephony/providers/vobiz/provider.py @@ -14,7 +14,7 @@ import aiohttp from fastapi import HTTPException from loguru import logger -from api.enums import WorkflowRunMode +from api.enums import TelephonyCallStatus, WorkflowRunMode from api.services.telephony.base import ( CallInitiationResult, NormalizedInboundData, @@ -335,9 +335,10 @@ class VobizProvider(TelephonyProvider): - call_uuid (instead of CallSid) - status, from, to, duration, etc. """ + call_status = data.get("CallStatus", "") return { "call_id": data.get("CallUUID", ""), - "status": data.get("CallStatus", ""), + "status": TelephonyCallStatus.from_raw(call_status) or call_status, "from_number": data.get("From"), "to_number": data.get("To"), "direction": data.get("Direction"), diff --git a/api/services/telephony/providers/vonage/provider.py b/api/services/telephony/providers/vonage/provider.py index 880da209..325cf93a 100644 --- a/api/services/telephony/providers/vonage/provider.py +++ b/api/services/telephony/providers/vonage/provider.py @@ -12,7 +12,7 @@ import jwt from fastapi import HTTPException, Response from loguru import logger -from api.enums import WorkflowRunMode +from api.enums import TelephonyCallStatus, WorkflowRunMode from api.services.telephony.base import ( CallInitiationResult, NormalizedInboundData, @@ -291,14 +291,14 @@ class VonageProvider(TelephonyProvider): """ # Map Vonage status to common format status_map = { - "started": "initiated", - "ringing": "ringing", - "answered": "answered", - "complete": "completed", - "failed": "failed", - "busy": "busy", - "timeout": "no-answer", - "rejected": "busy", + "started": TelephonyCallStatus.INITIATED, + "ringing": TelephonyCallStatus.RINGING, + "answered": TelephonyCallStatus.ANSWERED, + "complete": TelephonyCallStatus.COMPLETED, + "failed": TelephonyCallStatus.FAILED, + "busy": TelephonyCallStatus.BUSY, + "timeout": TelephonyCallStatus.NO_ANSWER, + "rejected": TelephonyCallStatus.BUSY, } return { diff --git a/api/services/telephony/status_processor.py b/api/services/telephony/status_processor.py index b93a0d9e..c5d39b98 100644 --- a/api/services/telephony/status_processor.py +++ b/api/services/telephony/status_processor.py @@ -12,25 +12,96 @@ from loguru import logger from pydantic import BaseModel from api.db import db_client -from api.enums import WorkflowRunState +from api.enums import TelephonyCallStatus, WorkflowRunState from api.services.campaign.campaign_call_dispatcher import campaign_call_dispatcher from api.services.campaign.campaign_event_publisher import ( get_campaign_event_publisher, ) from api.services.campaign.circuit_breaker import circuit_breaker +from api.tasks.arq import enqueue_job +from api.tasks.function_names import FunctionNames + +TERMINAL_NOT_CONNECTED_STATUSES = frozenset( + { + TelephonyCallStatus.FAILED, + TelephonyCallStatus.BUSY, + TelephonyCallStatus.NO_ANSWER, + TelephonyCallStatus.CANCELED, + TelephonyCallStatus.ERROR, + } +) +IN_FLIGHT_STATUSES = frozenset( + { + TelephonyCallStatus.INITIATED, + TelephonyCallStatus.RINGING, + TelephonyCallStatus.IN_PROGRESS, + TelephonyCallStatus.ANSWERED, + } +) +RETRYABLE_NOT_CONNECTED_STATUSES = frozenset( + {TelephonyCallStatus.BUSY, TelephonyCallStatus.NO_ANSWER} +) +FAILURE_NOT_CONNECTED_STATUSES = frozenset( + {TelephonyCallStatus.ERROR, TelephonyCallStatus.FAILED} +) + + +def _status_value(value: object) -> str: + status = TelephonyCallStatus.from_raw(value) + if status is not None: + return status.value + + return str(value or "").lower() + + +def _duration_seconds(duration: str | None) -> int | float: + if duration in (None, ""): + return 0 + + try: + parsed = float(duration) + except (TypeError, ValueError): + return 0 + + return int(parsed) if parsed.is_integer() else parsed + + +def _append_unique_tags(existing_tags: object, new_tags: list[str]) -> list[str]: + tags = existing_tags if isinstance(existing_tags, list) else [] + merged = list(tags) + for tag in new_tags: + if tag not in merged: + merged.append(tag) + return merged + + +async def _enqueue_integrations_for_unconnected_run( + workflow_run_id: int, + status: str, +) -> None: + """Fire post-call integrations (e.g. webhooks) when a call ends before the + Pipecat pipeline ever starts. + + Enqueues integrations only -- deliberately *not* + ``PROCESS_WORKFLOW_COMPLETION`` -- so an unconnected call still triggers the + configured webhooks without incurring platform-usage billing. + """ + await enqueue_job(FunctionNames.RUN_INTEGRATIONS_POST_WORKFLOW_RUN, workflow_run_id) + logger.info( + f"[run {workflow_run_id}] Enqueued post-call integrations after terminal " + f"telephony status: {status}" + ) class StatusCallbackRequest(BaseModel): """Normalized status callback shape used across all telephony providers. - Per-provider converters live as classmethods (``from_twilio``, ``from_plivo``, - ``from_vonage``, ``from_cloudonix_cdr``) so the route handler for each - provider can map raw webhook payloads into this shape and hand off to - :func:`_process_status_update`. + Provider-specific route handlers map raw webhook payloads into this shape, + then hand it off to :func:`_process_status_update`. """ call_id: str - status: str + status: TelephonyCallStatus | str from_number: Optional[str] = None to_number: Optional[str] = None direction: Optional[str] = None @@ -38,102 +109,14 @@ class StatusCallbackRequest(BaseModel): extra: dict = {} - @classmethod - def from_twilio(cls, data: dict): - """Convert Twilio callback to generic format.""" - return cls( - call_id=data.get("CallSid", ""), - status=data.get("CallStatus", ""), - from_number=data.get("From"), - to_number=data.get("To"), - direction=data.get("Direction"), - duration=data.get("CallDuration") or data.get("Duration"), - extra=data, - ) - - @classmethod - def from_plivo(cls, data: dict): - """Convert Plivo callback to generic format.""" - status_map = { - "in-progress": "answered", - "ringing": "ringing", - "ring": "ringing", - "completed": "completed", - "hangup": "completed", - "stopstream": "completed", - "busy": "busy", - "no-answer": "no-answer", - "cancel": "canceled", - "cancelled": "canceled", - "timeout": "no-answer", - } - call_status = (data.get("CallStatus") or data.get("Event") or "").lower() - return cls( - call_id=data.get("CallUUID", "") or data.get("RequestUUID", ""), - status=status_map.get(call_status, call_status), - from_number=data.get("From"), - to_number=data.get("To"), - direction=data.get("Direction"), - duration=data.get("Duration"), - extra=data, - ) - - @classmethod - def from_vonage(cls, data: dict): - """Convert Vonage event to generic format.""" - status_map = { - "started": "initiated", - "ringing": "ringing", - "answered": "answered", - "complete": "completed", - "failed": "failed", - "busy": "busy", - "timeout": "no-answer", - "rejected": "busy", - } - - return cls( - call_id=data.get("uuid", ""), - status=status_map.get(data.get("status", ""), data.get("status", "")), - from_number=data.get("from"), - to_number=data.get("to"), - direction=data.get("direction"), - duration=data.get("duration"), - extra=data, - ) - - @classmethod - def from_cloudonix_cdr(cls, data: dict): - """Convert Cloudonix CDR to generic format.""" - disposition_map = { - "ANSWER": "completed", - "BUSY": "busy", - "CANCEL": "canceled", - "FAILED": "failed", - "CONGESTION": "failed", - "NOANSWER": "no-answer", - } - - disposition = data.get("disposition") or "" - status = disposition_map.get(disposition.upper(), disposition.lower()) - session = data.get("session") - call_id = session.get("token") if isinstance(session, dict) else "" - - return cls( - call_id=call_id or "", - status=status, - from_number=data.get("from"), - to_number=data.get("to"), - duration=str(data.get("billsec") or data.get("duration") or 0), - extra=data, - ) - async def _process_status_update(workflow_run_id: int, status: StatusCallbackRequest): """Process status updates from telephony providers. Idempotent: handles repeated callbacks (e.g. from both webhook and CDR). """ + normalized_status = TelephonyCallStatus.from_raw(status.status) + status_value = _status_value(status.status) workflow_run = await db_client.get_workflow_run_by_id(workflow_run_id) if not workflow_run: logger.warning( @@ -143,7 +126,7 @@ async def _process_status_update(workflow_run_id: int, status: StatusCallbackReq telephony_callback_logs = workflow_run.logs.get("telephony_status_callbacks", []) telephony_callback_log = { - "status": status.status, + "status": status_value, "timestamp": datetime.now(UTC).isoformat(), "call_id": status.call_id, "duration": status.duration, @@ -156,7 +139,7 @@ async def _process_status_update(workflow_run_id: int, status: StatusCallbackReq logs={"telephony_status_callbacks": telephony_callback_logs}, ) - if status.status == "completed": + if normalized_status == TelephonyCallStatus.COMPLETED: logger.info( f"[run {workflow_run_id}] Call completed with duration: {status.duration}s" ) @@ -174,26 +157,29 @@ async def _process_status_update(workflow_run_id: int, status: StatusCallbackReq state=WorkflowRunState.COMPLETED.value, ) - elif status.status in ["failed", "busy", "no-answer", "canceled", "error"]: + elif normalized_status in TERMINAL_NOT_CONNECTED_STATUSES: logger.warning( - f"[run {workflow_run_id}] Call failed with status: {status.status}" + f"[run {workflow_run_id}] Call failed with status: {normalized_status.value}" ) if workflow_run.campaign_id: await campaign_call_dispatcher.release_call_slot(workflow_run_id) - is_failure = status.status in ("error", "failed") + is_failure = normalized_status in FAILURE_NOT_CONNECTED_STATUSES await circuit_breaker.record_and_evaluate( workflow_run.campaign_id, is_failure=is_failure, workflow_run_id=workflow_run_id if is_failure else None, - reason=status.status if is_failure else None, + reason=normalized_status.value if is_failure else None, ) - if status.status in ["busy", "no-answer"] and workflow_run.campaign_id: + if ( + normalized_status in RETRYABLE_NOT_CONNECTED_STATUSES + and workflow_run.campaign_id + ): publisher = await get_campaign_event_publisher() await publisher.publish_retry_needed( workflow_run_id=workflow_run_id, - reason=status.status.replace("-", "_"), + reason=normalized_status.value.replace("-", "_"), campaign_id=workflow_run.campaign_id, queued_run_id=workflow_run.queued_run_id, ) @@ -203,15 +189,42 @@ async def _process_status_update(workflow_run_id: int, status: StatusCallbackReq if workflow_run.gathered_context else [] ) - call_tags.extend(["not_connected", f"telephony_{status.status.lower()}"]) - - await db_client.update_workflow_run( - run_id=workflow_run_id, - is_completed=True, - state=WorkflowRunState.COMPLETED.value, - gathered_context={"call_tags": call_tags}, + call_tags = _append_unique_tags( + call_tags, + ["not_connected", f"telephony_{normalized_status.value}"], ) - elif status.status in ["in-progress", "initiated", "ringing"]: + + gathered_context = { + "call_tags": call_tags, + "call_disposition": normalized_status.value, + "mapped_call_disposition": normalized_status.value, + } + if status.call_id: + gathered_context["call_id"] = status.call_id + + should_run_post_call_integrations = ( + workflow_run.state == WorkflowRunState.INITIALIZED.value + and not workflow_run.is_completed + ) + + update_kwargs = { + "run_id": workflow_run_id, + "is_completed": True, + "state": WorkflowRunState.COMPLETED.value, + "gathered_context": gathered_context, + } + if should_run_post_call_integrations: + update_kwargs["usage_info"] = { + "call_duration_seconds": _duration_seconds(status.duration) + } + + await db_client.update_workflow_run(**update_kwargs) + + if should_run_post_call_integrations: + await _enqueue_integrations_for_unconnected_run( + workflow_run_id, normalized_status.value + ) + elif normalized_status in IN_FLIGHT_STATUSES: # No-op while the call is in flight. pass else: diff --git a/api/services/workflow/disposition_mapper.py b/api/services/workflow/disposition_mapper.py deleted file mode 100644 index f26c015e..00000000 --- a/api/services/workflow/disposition_mapper.py +++ /dev/null @@ -1,46 +0,0 @@ -"""Utility module for applying disposition code mapping.""" - -from loguru import logger - -from api.db import db_client -from api.enums import OrganizationConfigurationKey - - -async def apply_disposition_mapping(value: str, organization_id: int | None) -> str: - """Apply disposition code mapping if configured. - - Args: - value: The original disposition value to map - organization_id: The organization ID - - Returns: - The mapped value if found in configuration, otherwise the original value - """ - if not organization_id or not value: - return value - - try: - disposition_mapping = await db_client.get_configuration_value( - organization_id, - OrganizationConfigurationKey.DISPOSITION_CODE_MAPPING.value, - default={}, - ) - - if not disposition_mapping: - return value - - # Return mapped value if exists, otherwise original - # DISPOSITION_CODE_MAPPING looks like {"user_idle_max_duration_exceeded": "DAIR"} etc. - mapped_value = disposition_mapping.get(value, value) - - if mapped_value != value: - logger.debug( - f"Mapped disposition code from '{value}' to '{mapped_value}' " - f"for organization {organization_id}" - ) - - return mapped_value - - except Exception as e: - logger.error(f"Error applying disposition mapping: {e}") - return value diff --git a/api/services/workflow/pipecat_engine.py b/api/services/workflow/pipecat_engine.py index a0d67947..716a2800 100644 --- a/api/services/workflow/pipecat_engine.py +++ b/api/services/workflow/pipecat_engine.py @@ -19,7 +19,6 @@ from pipecat.utils.enums import EndTaskReason from api.db import db_client from api.enums import ToolCategory from api.services.pipecat.audio_playback import play_audio -from api.services.workflow.disposition_mapper import apply_disposition_mapping from api.services.workflow.workflow_graph import Node, WorkflowGraph if TYPE_CHECKING: @@ -751,38 +750,21 @@ class PipecatEngine: CancelFrame(reason=reason) if abort_immediately else EndFrame(reason=reason) ) - # Apply disposition mapping - first try call_disposition if it is, - # extracted from the call conversation then fall back to reason - call_disposition = self._gathered_context.get("call_disposition", "") - organization_id = await self._get_organization_id() + # Record the call disposition: prefer one extracted from the conversation, + # otherwise fall back to the disconnect reason. + call_disposition = self._gathered_context.get("call_disposition", "") or reason + self._gathered_context["call_disposition"] = call_disposition + self._gathered_context["mapped_call_disposition"] = call_disposition if call_disposition: - # If call_disposition exists, map it - mapped_disposition = await apply_disposition_mapping( - call_disposition, organization_id - ) - # Store the original and mapped values - self._gathered_context["extracted_call_disposition"] = call_disposition - self._gathered_context["call_disposition"] = call_disposition - self._gathered_context["mapped_call_disposition"] = mapped_disposition - else: - # Otherwise, map the disconnect reason - mapped_disposition = await apply_disposition_mapping( - reason, organization_id - ) - # Store the mapped disconnect reason - self._gathered_context["call_disposition"] = reason - self._gathered_context["mapped_call_disposition"] = mapped_disposition - - effective_disposition = self._gathered_context.get("call_disposition", "") - if effective_disposition: call_tags = self._gathered_context.get("call_tags", []) - if effective_disposition not in call_tags: - call_tags.append(effective_disposition) + if call_disposition not in call_tags: + call_tags.append(call_disposition) self._gathered_context["call_tags"] = call_tags logger.debug( - f"Finishing run with reason: {reason}, disposition: {mapped_disposition} queueing frame {frame_to_push}" + f"Finishing run with reason: {reason}, disposition: {call_disposition} " + f"queueing frame {frame_to_push}" ) await self.task.queue_frame(frame_to_push) diff --git a/api/tasks/run_integrations.py b/api/tasks/run_integrations.py index d99eaf0e..db23bec2 100644 --- a/api/tasks/run_integrations.py +++ b/api/tasks/run_integrations.py @@ -436,6 +436,15 @@ async def _execute_webhook_node( payload = render_template(webhook_data.payload_template or {}, render_context) + # Always surface the call disposition on the outgoing payload, even when the + # template author didn't reference it. Fill only if absent so a template that + # sets it explicitly keeps its own value. + if isinstance(payload, dict): + gathered_context = render_context.get("gathered_context") or {} + payload.setdefault( + "call_disposition", gathered_context.get("call_disposition", "") + ) + method = (webhook_data.http_method or "POST").upper() logger.info(f"Executing webhook '{webhook_name}': {method}") diff --git a/api/tests/integrations/_run_pipeline_helpers.py b/api/tests/integrations/_run_pipeline_helpers.py index 58b4ffd2..6b71c110 100644 --- a/api/tests/integrations/_run_pipeline_helpers.py +++ b/api/tests/integrations/_run_pipeline_helpers.py @@ -160,14 +160,6 @@ def patch_run_pipeline_externals( NoopFeedbackObserver, ) ) - # Disposition mapper would otherwise call out to the LLM. - stack.enter_context( - patch( - "api.services.workflow.pipecat_engine.apply_disposition_mapping", - new_callable=AsyncMock, - return_value="completed", - ) - ) # Capture the PipelineWorker so the test can drive it from outside. stack.enter_context( patch( diff --git a/api/tests/telephony/cloudonix/test_routes.py b/api/tests/telephony/cloudonix/test_routes.py index e22b0672..31cf220a 100644 --- a/api/tests/telephony/cloudonix/test_routes.py +++ b/api/tests/telephony/cloudonix/test_routes.py @@ -11,8 +11,9 @@ from unittest.mock import AsyncMock, patch import pytest from starlette.requests import Request +from api.enums import TelephonyCallStatus +from api.services.telephony.providers.cloudonix.provider import CloudonixProvider from api.services.telephony.providers.cloudonix.routes import handle_cloudonix_cdr -from api.services.telephony.status_processor import StatusCallbackRequest def _json_request(body: bytes) -> Request: @@ -79,33 +80,33 @@ async def test_cdr_route_handles_string_session(): assert result == {"status": "error", "message": "Missing call_id field"} -def test_from_cloudonix_cdr_tolerates_missing_session_and_disposition(): - """``from_cloudonix_cdr`` must not crash on a partial CDR payload.""" +def test_parse_cloudonix_cdr_tolerates_missing_session_and_disposition(): + """Cloudonix CDR parsing must not crash on a partial payload.""" # Missing both session and disposition. - req = StatusCallbackRequest.from_cloudonix_cdr({"domain": "acme.cloudonix.io"}) - assert req.call_id == "" - assert req.status == "" + req = CloudonixProvider.parse_cdr_status_callback({"domain": "acme.cloudonix.io"}) + assert req["call_id"] == "" + assert req["status"] == "" # Explicit null values. - req = StatusCallbackRequest.from_cloudonix_cdr( + req = CloudonixProvider.parse_cdr_status_callback( {"session": None, "disposition": None} ) - assert req.call_id == "" - assert req.status == "" + assert req["call_id"] == "" + assert req["status"] == "" -def test_from_cloudonix_cdr_tolerates_string_session(): - """``from_cloudonix_cdr`` treats a non-object session as missing call_id.""" - req = StatusCallbackRequest.from_cloudonix_cdr( +def test_parse_cloudonix_cdr_tolerates_string_session(): + """Cloudonix CDR parsing treats a non-object session as missing call_id.""" + req = CloudonixProvider.parse_cdr_status_callback( {"session": "abc", "disposition": "ANSWER"} ) - assert req.call_id == "" - assert req.status == "completed" + assert req["call_id"] == "" + assert req["status"] == TelephonyCallStatus.COMPLETED -def test_from_cloudonix_cdr_maps_disposition_and_session_token(): +def test_parse_cloudonix_cdr_maps_disposition_and_session_token(): """Normal, well-formed CDR payloads still map correctly.""" - req = StatusCallbackRequest.from_cloudonix_cdr( + req = CloudonixProvider.parse_cdr_status_callback( { "session": {"token": "abc123"}, "disposition": "BUSY", @@ -114,6 +115,20 @@ def test_from_cloudonix_cdr_maps_disposition_and_session_token(): "billsec": 12, } ) - assert req.call_id == "abc123" - assert req.status == "busy" - assert req.duration == "12" + assert req["call_id"] == "abc123" + assert req["status"] == TelephonyCallStatus.BUSY + assert req["duration"] == "12" + + +def test_parse_cloudonix_cdr_preserves_zero_billsec(): + """A zero billed duration must not fall back to total call duration.""" + req = CloudonixProvider.parse_cdr_status_callback( + { + "session": {"token": "abc123"}, + "disposition": "ANSWER", + "billsec": 0, + "duration": 42, + } + ) + + assert req["duration"] == "0" diff --git a/api/tests/telephony/test_status_processor.py b/api/tests/telephony/test_status_processor.py new file mode 100644 index 00000000..e0bbfd4a --- /dev/null +++ b/api/tests/telephony/test_status_processor.py @@ -0,0 +1,98 @@ +from types import SimpleNamespace +from unittest.mock import AsyncMock, patch + +import pytest + +from api.enums import TelephonyCallStatus, WorkflowRunState +from api.services.telephony.status_processor import ( + StatusCallbackRequest, + _process_status_update, +) +from api.tasks.function_names import FunctionNames + + +@pytest.mark.asyncio +async def test_initialized_no_answer_enqueues_workflow_completion(): + workflow_run = SimpleNamespace( + id=123, + campaign_id=None, + queued_run_id=None, + state=WorkflowRunState.INITIALIZED.value, + is_completed=False, + logs={"telephony_status_callbacks": []}, + gathered_context={"call_tags": ["existing"]}, + ) + status = StatusCallbackRequest( + call_id="call-123", + status="No-Answer", + ) + + with ( + patch("api.services.telephony.status_processor.db_client") as mock_db, + patch( + "api.services.telephony.status_processor.enqueue_job", + new_callable=AsyncMock, + ) as mock_enqueue, + ): + mock_db.get_workflow_run_by_id = AsyncMock(return_value=workflow_run) + mock_db.update_workflow_run = AsyncMock() + + await _process_status_update(123, status) + + log_update = mock_db.update_workflow_run.await_args_list[0].kwargs + callback_log = log_update["logs"]["telephony_status_callbacks"][0] + assert callback_log["status"] == "no-answer" + assert callback_log["call_id"] == "call-123" + + completion_update = mock_db.update_workflow_run.await_args_list[1].kwargs + assert completion_update["run_id"] == 123 + assert completion_update["is_completed"] is True + assert completion_update["state"] == WorkflowRunState.COMPLETED.value + assert completion_update["usage_info"] == {"call_duration_seconds": 0} + assert completion_update["gathered_context"] == { + "call_tags": ["existing", "not_connected", "telephony_no-answer"], + "call_disposition": "no-answer", + "mapped_call_disposition": "no-answer", + "call_id": "call-123", + } + mock_enqueue.assert_awaited_once_with( + FunctionNames.RUN_INTEGRATIONS_POST_WORKFLOW_RUN, 123 + ) + + +@pytest.mark.asyncio +async def test_running_terminal_status_does_not_enqueue_workflow_completion(): + workflow_run = SimpleNamespace( + id=456, + campaign_id=None, + queued_run_id=None, + state=WorkflowRunState.RUNNING.value, + is_completed=False, + logs={"telephony_status_callbacks": []}, + gathered_context={"call_tags": ["not_connected"]}, + ) + status = StatusCallbackRequest( + call_id="call-456", + status=TelephonyCallStatus.FAILED, + duration="7", + ) + + with ( + patch("api.services.telephony.status_processor.db_client") as mock_db, + patch( + "api.services.telephony.status_processor.enqueue_job", + new_callable=AsyncMock, + ) as mock_enqueue, + ): + mock_db.get_workflow_run_by_id = AsyncMock(return_value=workflow_run) + mock_db.update_workflow_run = AsyncMock() + + await _process_status_update(456, status) + + completion_update = mock_db.update_workflow_run.await_args_list[1].kwargs + assert "usage_info" not in completion_update + assert completion_update["gathered_context"]["call_tags"] == [ + "not_connected", + "telephony_failed", + ] + mock_enqueue.assert_not_awaited() diff --git a/api/tests/test_pipecat_engine_context_update.py b/api/tests/test_pipecat_engine_context_update.py index b73c65ba..63afbbb9 100644 --- a/api/tests/test_pipecat_engine_context_update.py +++ b/api/tests/test_pipecat_engine_context_update.py @@ -126,22 +126,17 @@ async def run_pipeline_and_capture_context( new_callable=AsyncMock, return_value=1, ): - with patch( - "api.services.workflow.pipecat_engine.apply_disposition_mapping", - new_callable=AsyncMock, - return_value="completed", - ): - async def run_pipeline(): - await run_pipeline_worker(task) + async def run_pipeline(): + await run_pipeline_worker(task) - async def initialize_engine(): - await asyncio.sleep(0.01) - await engine.initialize() - await engine.set_node(engine.workflow.start_node_id) - await engine.llm.queue_frame(LLMContextFrame(engine.context)) + async def initialize_engine(): + await asyncio.sleep(0.01) + await engine.initialize() + await engine.set_node(engine.workflow.start_node_id) + await engine.llm.queue_frame(LLMContextFrame(engine.context)) - await asyncio.gather(run_pipeline(), initialize_engine()) + await asyncio.gather(run_pipeline(), initialize_engine()) return llm, context diff --git a/api/tests/test_pipecat_engine_end_call.py b/api/tests/test_pipecat_engine_end_call.py index bc4fea8e..b927369b 100644 --- a/api/tests/test_pipecat_engine_end_call.py +++ b/api/tests/test_pipecat_engine_end_call.py @@ -268,28 +268,23 @@ class TestEndCallViaNodeTransition: new_callable=AsyncMock, return_value=1, ): - with patch( - "api.services.workflow.pipecat_engine.apply_disposition_mapping", + with patch.object( + VariableExtractionManager, + "_perform_extraction", new_callable=AsyncMock, - return_value="completed", + return_value={"user_intent": "end call"}, ): - with patch.object( - VariableExtractionManager, - "_perform_extraction", - new_callable=AsyncMock, - return_value={"user_intent": "end call"}, - ): - async def run_pipeline(): - await run_pipeline_worker(task) + async def run_pipeline(): + await run_pipeline_worker(task) - async def initialize_engine(): - await asyncio.sleep(0.01) - await engine.initialize() - await engine.set_node(engine.workflow.start_node_id) - await engine.llm.queue_frame(LLMContextFrame(engine.context)) + async def initialize_engine(): + await asyncio.sleep(0.01) + await engine.initialize() + await engine.set_node(engine.workflow.start_node_id) + await engine.llm.queue_frame(LLMContextFrame(engine.context)) - await asyncio.gather(run_pipeline(), initialize_engine()) + await asyncio.gather(run_pipeline(), initialize_engine()) # Verify end_call_with_reason was called assert len(test_helper.end_call_reasons) >= 1, ( @@ -371,28 +366,23 @@ class TestEndCallViaNodeTransition: new_callable=AsyncMock, return_value=1, ): - with patch( - "api.services.workflow.pipecat_engine.apply_disposition_mapping", + with patch.object( + VariableExtractionManager, + "_perform_extraction", new_callable=AsyncMock, - return_value="completed", + return_value={"greeting_type": "formal", "user_name": "John"}, ): - with patch.object( - VariableExtractionManager, - "_perform_extraction", - new_callable=AsyncMock, - return_value={"greeting_type": "formal", "user_name": "John"}, - ): - async def run_pipeline(): - await run_pipeline_worker(task) + async def run_pipeline(): + await run_pipeline_worker(task) - async def initialize_engine(): - await asyncio.sleep(0.01) - await engine.initialize() - await engine.set_node(engine.workflow.start_node_id) - await engine.llm.queue_frame(LLMContextFrame(engine.context)) + async def initialize_engine(): + await asyncio.sleep(0.01) + await engine.initialize() + await engine.set_node(engine.workflow.start_node_id) + await engine.llm.queue_frame(LLMContextFrame(engine.context)) - await asyncio.gather(run_pipeline(), initialize_engine()) + await asyncio.gather(run_pipeline(), initialize_engine()) # Should have 3 LLM generations assert llm.get_current_step() == 3 @@ -469,28 +459,23 @@ class TestEndCallViaCustomTool: new_callable=AsyncMock, return_value=1, ): - with patch( - "api.services.workflow.pipecat_engine.apply_disposition_mapping", + with patch.object( + VariableExtractionManager, + "_perform_extraction", new_callable=AsyncMock, - return_value="end_call_tool", + return_value={"user_intent": "end"}, ): - with patch.object( - VariableExtractionManager, - "_perform_extraction", - new_callable=AsyncMock, - return_value={"user_intent": "end"}, - ): - async def run_pipeline(): - await run_pipeline_worker(task) + async def run_pipeline(): + await run_pipeline_worker(task) - async def initialize_engine(): - await asyncio.sleep(0.01) - await engine.initialize() - await engine.set_node(engine.workflow.start_node_id) - await engine.llm.queue_frame(LLMContextFrame(engine.context)) + async def initialize_engine(): + await asyncio.sleep(0.01) + await engine.initialize() + await engine.set_node(engine.workflow.start_node_id) + await engine.llm.queue_frame(LLMContextFrame(engine.context)) - await asyncio.gather(run_pipeline(), initialize_engine()) + await asyncio.gather(run_pipeline(), initialize_engine()) # Verify end_call_with_reason was called with END_CALL_TOOL_REASON assert len(test_helper.end_call_reasons) >= 1, ( @@ -560,28 +545,23 @@ class TestEndCallViaCustomTool: new_callable=AsyncMock, return_value=1, ): - with patch( - "api.services.workflow.pipecat_engine.apply_disposition_mapping", + with patch.object( + VariableExtractionManager, + "_perform_extraction", new_callable=AsyncMock, - return_value="end_call_tool", + return_value={"user_intent": "end"}, ): - with patch.object( - VariableExtractionManager, - "_perform_extraction", - new_callable=AsyncMock, - return_value={"user_intent": "end"}, - ): - async def run_pipeline(): - await run_pipeline_worker(task) + async def run_pipeline(): + await run_pipeline_worker(task) - async def initialize_engine(): - await asyncio.sleep(0.01) - await engine.initialize() - await engine.set_node(engine.workflow.start_node_id) - await engine.llm.queue_frame(LLMContextFrame(engine.context)) + async def initialize_engine(): + await asyncio.sleep(0.01) + await engine.initialize() + await engine.set_node(engine.workflow.start_node_id) + await engine.llm.queue_frame(LLMContextFrame(engine.context)) - await asyncio.gather(run_pipeline(), initialize_engine()) + await asyncio.gather(run_pipeline(), initialize_engine()) # Verify end_call_with_reason was called assert len(test_helper.end_call_reasons) >= 1, ( @@ -637,37 +617,32 @@ class TestEndCallViaClientDisconnect: new_callable=AsyncMock, return_value=1, ): - with patch( - "api.services.workflow.pipecat_engine.apply_disposition_mapping", + with patch.object( + VariableExtractionManager, + "_perform_extraction", new_callable=AsyncMock, - return_value="user_hangup", + return_value={"user_intent": "disconnected"}, ): - with patch.object( - VariableExtractionManager, - "_perform_extraction", - new_callable=AsyncMock, - return_value={"user_intent": "disconnected"}, - ): - async def run_pipeline(): - await run_pipeline_worker(task) + async def run_pipeline(): + await run_pipeline_worker(task) - async def initialize_and_disconnect(): - await asyncio.sleep(0.01) - await engine.initialize() - await engine.set_node(engine.workflow.start_node_id) - await engine.llm.queue_frame(LLMContextFrame(engine.context)) + async def initialize_and_disconnect(): + await asyncio.sleep(0.01) + await engine.initialize() + await engine.set_node(engine.workflow.start_node_id) + await engine.llm.queue_frame(LLMContextFrame(engine.context)) - # Wait for initial generation to complete - await asyncio.sleep(0.1) + # Wait for initial generation to complete + await asyncio.sleep(0.1) - # Simulate client disconnect by calling end_call_with_reason directly - # This is what on_client_disconnected does - await engine.end_call_with_reason( - EndTaskReason.USER_HANGUP.value, abort_immediately=True - ) + # Simulate client disconnect by calling end_call_with_reason directly + # This is what on_client_disconnected does + await engine.end_call_with_reason( + EndTaskReason.USER_HANGUP.value, abort_immediately=True + ) - await asyncio.gather(run_pipeline(), initialize_and_disconnect()) + await asyncio.gather(run_pipeline(), initialize_and_disconnect()) # Verify end_call_with_reason was called with USER_HANGUP assert EndTaskReason.USER_HANGUP.value in test_helper.end_call_reasons, ( @@ -727,46 +702,41 @@ class TestEndCallRaceConditions: new_callable=AsyncMock, return_value=1, ): - with patch( - "api.services.workflow.pipecat_engine.apply_disposition_mapping", + with patch.object( + VariableExtractionManager, + "_perform_extraction", new_callable=AsyncMock, - return_value="first_reason", + return_value={"user_intent": "end"}, ): - with patch.object( - VariableExtractionManager, - "_perform_extraction", - new_callable=AsyncMock, - return_value={"user_intent": "end"}, - ): - async def run_pipeline(): - await run_pipeline_worker(task) + async def run_pipeline(): + await run_pipeline_worker(task) - async def initialize_and_race(): - await asyncio.sleep(0.01) - await engine.initialize() - await engine.set_node(engine.workflow.start_node_id) - await engine.llm.queue_frame(LLMContextFrame(engine.context)) + async def initialize_and_race(): + await asyncio.sleep(0.01) + await engine.initialize() + await engine.set_node(engine.workflow.start_node_id) + await engine.llm.queue_frame(LLMContextFrame(engine.context)) - # Wait for initial generation - await asyncio.sleep(0.1) + # Wait for initial generation + await asyncio.sleep(0.1) - # Try to end call multiple times concurrently - await asyncio.gather( - engine.end_call_with_reason( - EndTaskReason.USER_HANGUP.value, abort_immediately=True - ), - engine.end_call_with_reason( - EndTaskReason.END_CALL_TOOL_REASON.value, - abort_immediately=True, - ), - engine.end_call_with_reason( - EndTaskReason.USER_QUALIFIED.value, - abort_immediately=False, - ), - ) + # Try to end call multiple times concurrently + await asyncio.gather( + engine.end_call_with_reason( + EndTaskReason.USER_HANGUP.value, abort_immediately=True + ), + engine.end_call_with_reason( + EndTaskReason.END_CALL_TOOL_REASON.value, + abort_immediately=True, + ), + engine.end_call_with_reason( + EndTaskReason.USER_QUALIFIED.value, + abort_immediately=False, + ), + ) - await asyncio.gather(run_pipeline(), initialize_and_race()) + await asyncio.gather(run_pipeline(), initialize_and_race()) # Due to the _call_disposed guard, only one end_call should fully execute # The tracked end_call_reasons will show all attempted calls @@ -838,41 +808,34 @@ class TestEndCallRaceConditions: new_callable=AsyncMock, return_value=1, ): - with patch( - "api.services.workflow.pipecat_engine.apply_disposition_mapping", + with patch.object( + VariableExtractionManager, + "_perform_extraction", new_callable=AsyncMock, - return_value="end_reason", + return_value={"user_intent": "end"}, ): - with patch.object( - VariableExtractionManager, - "_perform_extraction", - new_callable=AsyncMock, - return_value={"user_intent": "end"}, - ): - async def run_pipeline(): - await run_pipeline_worker(task) + async def run_pipeline(): + await run_pipeline_worker(task) - async def initialize_and_race_disconnect(): - nonlocal disconnect_called - await asyncio.sleep(0.01) - await engine.initialize() - await engine.set_node(engine.workflow.start_node_id) - await engine.llm.queue_frame(LLMContextFrame(engine.context)) + async def initialize_and_race_disconnect(): + nonlocal disconnect_called + await asyncio.sleep(0.01) + await engine.initialize() + await engine.set_node(engine.workflow.start_node_id) + await engine.llm.queue_frame(LLMContextFrame(engine.context)) - # Wait for the end_call tool to be called - await asyncio.sleep(0.15) + # Wait for the end_call tool to be called + await asyncio.sleep(0.15) - # Simulate client disconnect racing with end_call tool - disconnect_called = True - await engine.end_call_with_reason( - EndTaskReason.USER_HANGUP.value, abort_immediately=True - ) - - await asyncio.gather( - run_pipeline(), initialize_and_race_disconnect() + # Simulate client disconnect racing with end_call tool + disconnect_called = True + await engine.end_call_with_reason( + EndTaskReason.USER_HANGUP.value, abort_immediately=True ) + await asyncio.gather(run_pipeline(), initialize_and_race_disconnect()) + # Verify disconnect was attempted assert disconnect_called, "Disconnect should have been called" @@ -933,40 +896,35 @@ class TestEndCallExtractionBehavior: new_callable=AsyncMock, return_value=1, ): - with patch( - "api.services.workflow.pipecat_engine.apply_disposition_mapping", - new_callable=AsyncMock, - return_value="completed", + with patch.object( + VariableExtractionManager, + "_perform_extraction", + side_effect=mock_extraction, ): - with patch.object( - VariableExtractionManager, - "_perform_extraction", - side_effect=mock_extraction, - ): - async def run_pipeline(): - await run_pipeline_worker(task) + async def run_pipeline(): + await run_pipeline_worker(task) - async def initialize_and_end(): - await asyncio.sleep(0.01) - await engine.initialize() - await engine.set_node(engine.workflow.start_node_id) - await engine.llm.queue_frame(LLMContextFrame(engine.context)) + async def initialize_and_end(): + await asyncio.sleep(0.01) + await engine.initialize() + await engine.set_node(engine.workflow.start_node_id) + await engine.llm.queue_frame(LLMContextFrame(engine.context)) - # Wait for initial generation - await asyncio.sleep(0.1) + # Wait for initial generation + await asyncio.sleep(0.1) - # End the call - await engine.end_call_with_reason( - EndTaskReason.USER_HANGUP.value, abort_immediately=True - ) + # End the call + await engine.end_call_with_reason( + EndTaskReason.USER_HANGUP.value, abort_immediately=True + ) - # Verify extraction was awaited (synchronous) - assert extraction_completed.is_set(), ( - "Extraction should have completed before end_call returned" - ) + # Verify extraction was awaited (synchronous) + assert extraction_completed.is_set(), ( + "Extraction should have completed before end_call returned" + ) - await asyncio.gather(run_pipeline(), initialize_and_end()) + await asyncio.gather(run_pipeline(), initialize_and_end()) # Verify synchronous extraction was used sync_extractions = [ @@ -1058,35 +1016,30 @@ class TestEndCallExtractionBehavior: new_callable=AsyncMock, return_value=1, ): - with patch( - "api.services.workflow.pipecat_engine.apply_disposition_mapping", - new_callable=AsyncMock, - return_value="completed", + with patch.object( + VariableExtractionManager, + "_perform_extraction", + extraction_mock, ): - with patch.object( - VariableExtractionManager, - "_perform_extraction", - extraction_mock, - ): - async def run_pipeline(): - await run_pipeline_worker(task) + async def run_pipeline(): + await run_pipeline_worker(task) - async def initialize_and_end(): - await asyncio.sleep(0.01) - await engine.initialize() - await engine.set_node(engine.workflow.start_node_id) - await engine.llm.queue_frame(LLMContextFrame(engine.context)) + async def initialize_and_end(): + await asyncio.sleep(0.01) + await engine.initialize() + await engine.set_node(engine.workflow.start_node_id) + await engine.llm.queue_frame(LLMContextFrame(engine.context)) - # Wait for initial generation - await asyncio.sleep(0.1) + # Wait for initial generation + await asyncio.sleep(0.1) - # End the call - await engine.end_call_with_reason( - EndTaskReason.USER_HANGUP.value, abort_immediately=True - ) + # End the call + await engine.end_call_with_reason( + EndTaskReason.USER_HANGUP.value, abort_immediately=True + ) - await asyncio.gather(run_pipeline(), initialize_and_end()) + await asyncio.gather(run_pipeline(), initialize_and_end()) # Extraction should have been called but the inner _perform_extraction # should not have been called because extraction_enabled=False diff --git a/api/tests/test_pipecat_engine_node_switch_with_user_speech.py b/api/tests/test_pipecat_engine_node_switch_with_user_speech.py index d815a694..aeebfe76 100644 --- a/api/tests/test_pipecat_engine_node_switch_with_user_speech.py +++ b/api/tests/test_pipecat_engine_node_switch_with_user_speech.py @@ -281,24 +281,19 @@ class TestNodeSwitchWithUserSpeech: new_callable=AsyncMock, return_value=1, ): - with patch( - "api.services.workflow.pipecat_engine.apply_disposition_mapping", - new_callable=AsyncMock, - return_value="completed", - ): - async def run_pipeline(): - await run_pipeline_worker(task) + async def run_pipeline(): + await run_pipeline_worker(task) - async def initialize_engine(): - await asyncio.sleep(0.01) - await engine.initialize() - await engine.set_node(engine.workflow.start_node_id) - # Start the LLM generation - user speech will be injected - # automatically when FunctionCallResultFrame #1 is seen - await engine.llm.queue_frame(LLMContextFrame(engine.context)) + async def initialize_engine(): + await asyncio.sleep(0.01) + await engine.initialize() + await engine.set_node(engine.workflow.start_node_id) + # Start the LLM generation - user speech will be injected + # automatically when FunctionCallResultFrame #1 is seen + await engine.llm.queue_frame(LLMContextFrame(engine.context)) - await asyncio.gather(run_pipeline(), initialize_engine()) + await asyncio.gather(run_pipeline(), initialize_engine()) # Total 4 generations out of which 1 was cancelled due to interruption assert llm.get_current_step() == 4 diff --git a/api/tests/test_pipecat_engine_tool_calls.py b/api/tests/test_pipecat_engine_tool_calls.py index 5c71c09d..92d3c54d 100644 --- a/api/tests/test_pipecat_engine_tool_calls.py +++ b/api/tests/test_pipecat_engine_tool_calls.py @@ -117,24 +117,19 @@ async def run_pipeline_with_tool_calls( new_callable=AsyncMock, return_value=1, ): - with patch( - "api.services.workflow.pipecat_engine.apply_disposition_mapping", - new_callable=AsyncMock, - return_value="completed", - ): - async def run_pipeline(): - await run_pipeline_worker(task) + async def run_pipeline(): + await run_pipeline_worker(task) - async def initialize_engine(): - # Small delay to let runner start - await asyncio.sleep(0.01) - await engine.initialize() - await engine.set_node(engine.workflow.start_node_id) - await engine.llm.queue_frame(LLMContextFrame(engine.context)) + async def initialize_engine(): + # Small delay to let runner start + await asyncio.sleep(0.01) + await engine.initialize() + await engine.set_node(engine.workflow.start_node_id) + await engine.llm.queue_frame(LLMContextFrame(engine.context)) - # Run both concurrently - await asyncio.gather(run_pipeline(), initialize_engine()) + # Run both concurrently + await asyncio.gather(run_pipeline(), initialize_engine()) return llm, context diff --git a/api/tests/test_pipecat_engine_transition_mute.py b/api/tests/test_pipecat_engine_transition_mute.py index 1bb37774..9a6636f3 100644 --- a/api/tests/test_pipecat_engine_transition_mute.py +++ b/api/tests/test_pipecat_engine_transition_mute.py @@ -171,31 +171,26 @@ class TestTransitionFunctionMutesUser: new_callable=AsyncMock, return_value=1, ): - with patch( - "api.services.workflow.pipecat_engine.apply_disposition_mapping", + with patch.object( + VariableExtractionManager, + "_perform_extraction", new_callable=AsyncMock, - return_value="completed", + return_value={"user_intent": "end call"}, ): - with patch.object( - VariableExtractionManager, - "_perform_extraction", - new_callable=AsyncMock, - return_value={"user_intent": "end call"}, - ): - async def run_pipeline(): - await run_pipeline_worker(task) + async def run_pipeline(): + await run_pipeline_worker(task) - async def initialize_engine(): - await asyncio.sleep(0.01) - await engine.initialize() - await engine.set_node(engine.workflow.start_node_id) - await engine.llm.queue_frame(LLMContextFrame(engine.context)) + async def initialize_engine(): + await asyncio.sleep(0.01) + await engine.initialize() + await engine.set_node(engine.workflow.start_node_id) + await engine.llm.queue_frame(LLMContextFrame(engine.context)) - await asyncio.wait_for( - asyncio.gather(run_pipeline(), initialize_engine()), - timeout=10.0, - ) + await asyncio.wait_for( + asyncio.gather(run_pipeline(), initialize_engine()), + timeout=10.0, + ) assert len(captured_states) == 1, ( f"Expected the transition function to be invoked exactly once, " @@ -245,31 +240,26 @@ class TestTransitionFunctionMutesUser: new_callable=AsyncMock, return_value=1, ): - with patch( - "api.services.workflow.pipecat_engine.apply_disposition_mapping", + with patch.object( + VariableExtractionManager, + "_perform_extraction", new_callable=AsyncMock, - return_value="completed", + return_value={"user_intent": "end call"}, ): - with patch.object( - VariableExtractionManager, - "_perform_extraction", - new_callable=AsyncMock, - return_value={"user_intent": "end call"}, - ): - async def run_pipeline(): - await run_pipeline_worker(task) + async def run_pipeline(): + await run_pipeline_worker(task) - async def initialize_engine(): - await asyncio.sleep(0.01) - await engine.initialize() - await engine.set_node(engine.workflow.start_node_id) - await engine.llm.queue_frame(LLMContextFrame(engine.context)) + async def initialize_engine(): + await asyncio.sleep(0.01) + await engine.initialize() + await engine.set_node(engine.workflow.start_node_id) + await engine.llm.queue_frame(LLMContextFrame(engine.context)) - await asyncio.wait_for( - asyncio.gather(run_pipeline(), initialize_engine()), - timeout=10.0, - ) + await asyncio.wait_for( + asyncio.gather(run_pipeline(), initialize_engine()), + timeout=10.0, + ) assert function_call_mute_strategy._function_call_in_progress == set(), ( "FunctionCallUserMuteStrategy should have cleared its in-progress " diff --git a/api/tests/test_pipecat_engine_variable_extraction.py b/api/tests/test_pipecat_engine_variable_extraction.py index 9adfd867..12887a4b 100644 --- a/api/tests/test_pipecat_engine_variable_extraction.py +++ b/api/tests/test_pipecat_engine_variable_extraction.py @@ -156,29 +156,24 @@ class TestVariableExtractionDuringTransitions: new_callable=AsyncMock, return_value=1, ): - with patch( - "api.services.workflow.pipecat_engine.apply_disposition_mapping", + # Mock the actual extraction to avoid needing a real LLM + with patch.object( + VariableExtractionManager, + "_perform_extraction", new_callable=AsyncMock, - return_value="completed", + return_value={"user_name": "John Doe"}, ): - # Mock the actual extraction to avoid needing a real LLM - with patch.object( - VariableExtractionManager, - "_perform_extraction", - new_callable=AsyncMock, - return_value={"user_name": "John Doe"}, - ): - async def run_pipeline(): - await run_pipeline_worker(task) + async def run_pipeline(): + await run_pipeline_worker(task) - async def initialize_engine(): - await asyncio.sleep(0.01) - await engine.initialize() - await engine.set_node(engine.workflow.start_node_id) - await engine.llm.queue_frame(LLMContextFrame(engine.context)) + async def initialize_engine(): + await asyncio.sleep(0.01) + await engine.initialize() + await engine.set_node(engine.workflow.start_node_id) + await engine.llm.queue_frame(LLMContextFrame(engine.context)) - await asyncio.gather(run_pipeline(), initialize_engine()) + await asyncio.gather(run_pipeline(), initialize_engine()) # Should have 3 LLM generations assert llm.get_current_step() == 3 diff --git a/api/tests/test_run_integrations_webhook.py b/api/tests/test_run_integrations_webhook.py new file mode 100644 index 00000000..326e6905 --- /dev/null +++ b/api/tests/test_run_integrations_webhook.py @@ -0,0 +1,88 @@ +from unittest.mock import AsyncMock, MagicMock, patch + +import pytest + +from api.services.workflow.dto import WebhookNodeData +from api.tasks.run_integrations import _execute_webhook_node + + +def _mock_httpx_client(captured: dict): + """Build a patch target for httpx.AsyncClient that records the request kwargs.""" + response = MagicMock() + response.status_code = 200 + response.raise_for_status = MagicMock() + + async def _request(**kwargs): + captured.update(kwargs) + return response + + client = MagicMock() + client.request = AsyncMock(side_effect=_request) + + ctx = MagicMock() + ctx.__aenter__ = AsyncMock(return_value=client) + ctx.__aexit__ = AsyncMock(return_value=False) + return MagicMock(return_value=ctx) + + +@pytest.mark.asyncio +async def test_webhook_injects_disposition_when_absent(): + """call_disposition is added to the payload when the template omits it.""" + webhook = WebhookNodeData( + name="Test Webhook", + enabled=True, + endpoint_url="https://example.com/hook", + payload_template={"event": "call_done"}, + ) + render_context = {"gathered_context": {"call_disposition": "no-answer"}} + + captured: dict = {} + with patch( + "api.tasks.run_integrations.httpx.AsyncClient", _mock_httpx_client(captured) + ): + ok = await _execute_webhook_node(webhook, render_context, organization_id=1) + + assert ok is True + assert captured["json"] == { + "event": "call_done", + "call_disposition": "no-answer", + } + + +@pytest.mark.asyncio +async def test_webhook_preserves_template_disposition(): + """A disposition key set explicitly in the template is not overwritten.""" + webhook = WebhookNodeData( + name="Test Webhook", + enabled=True, + endpoint_url="https://example.com/hook", + payload_template={"call_disposition": "custom-from-template"}, + ) + render_context = {"gathered_context": {"call_disposition": "no-answer"}} + + captured: dict = {} + with patch( + "api.tasks.run_integrations.httpx.AsyncClient", _mock_httpx_client(captured) + ): + await _execute_webhook_node(webhook, render_context, organization_id=1) + + assert captured["json"]["call_disposition"] == "custom-from-template" + + +@pytest.mark.asyncio +async def test_webhook_injects_empty_disposition_when_context_missing(): + """Missing gathered_context values fall back to an empty string, not omission.""" + webhook = WebhookNodeData( + name="Test Webhook", + enabled=True, + endpoint_url="https://example.com/hook", + payload_template={}, + ) + + captured: dict = {} + with patch( + "api.tasks.run_integrations.httpx.AsyncClient", _mock_httpx_client(captured) + ): + await _execute_webhook_node(webhook, {}, organization_id=1) + + assert captured["json"] == {"call_disposition": ""} diff --git a/api/tests/test_text_and_audio_playback.py b/api/tests/test_text_and_audio_playback.py index b46dc215..03897417 100644 --- a/api/tests/test_text_and_audio_playback.py +++ b/api/tests/test_text_and_audio_playback.py @@ -241,11 +241,6 @@ async def run_pipeline_and_capture_frames( new_callable=AsyncMock, return_value=1, ), - patch( - "api.services.workflow.pipecat_engine.apply_disposition_mapping", - new_callable=AsyncMock, - return_value="completed", - ), ): async def run(): diff --git a/api/tests/test_tts_endframe_with_audio_write_failure.py b/api/tests/test_tts_endframe_with_audio_write_failure.py index cc34a797..f7cd78e2 100644 --- a/api/tests/test_tts_endframe_with_audio_write_failure.py +++ b/api/tests/test_tts_endframe_with_audio_write_failure.py @@ -208,63 +208,58 @@ class TestTTSPauseWithAudioWriteFailure: new_callable=AsyncMock, return_value=1, ): - with patch( - "api.services.workflow.pipecat_engine.apply_disposition_mapping", + with patch.object( + VariableExtractionManager, + "_perform_extraction", new_callable=AsyncMock, - return_value="completed", + return_value={}, ): - with patch.object( - VariableExtractionManager, - "_perform_extraction", - new_callable=AsyncMock, - return_value={}, - ): - async def run_pipeline(): - await run_pipeline_worker(task) + async def run_pipeline(): + await run_pipeline_worker(task) - async def initialize_and_end_call(): - await asyncio.sleep(0.01) - await engine.initialize() - await engine.set_node(engine.workflow.start_node_id) + async def initialize_and_end_call(): + await asyncio.sleep(0.01) + await engine.initialize() + await engine.set_node(engine.workflow.start_node_id) - # Start LLM generation - this will trigger TTS - await engine.llm.queue_frame(LLMContextFrame(engine.context)) + # Start LLM generation - this will trigger TTS + await engine.llm.queue_frame(LLMContextFrame(engine.context)) - # Sleep so that processing is paused in TTS Service - await asyncio.sleep(0.1) + # Sleep so that processing is paused in TTS Service + await asyncio.sleep(0.1) - await engine.end_call_with_reason( - EndTaskReason.USER_HANGUP.value, - abort_immediately=False, - ) - - # Create tasks explicitly for better control - pipeline_task = asyncio.create_task(run_pipeline()) - end_call_task = asyncio.create_task(initialize_and_end_call()) - - # Wait with timeout - done, pending = await asyncio.wait( - [pipeline_task, end_call_task], - timeout=3.0, - return_when=asyncio.ALL_COMPLETED, + await engine.end_call_with_reason( + EndTaskReason.USER_HANGUP.value, + abort_immediately=False, ) - # If there are pending tasks, we timed out - if pending: - test_timed_out = True - # Cancel all pending tasks - for t in pending: - t.cancel() + # Create tasks explicitly for better control + pipeline_task = asyncio.create_task(run_pipeline()) + end_call_task = asyncio.create_task(initialize_and_end_call()) - # Give limited time for cleanup - try: - await asyncio.wait_for( - asyncio.gather(*pending, return_exceptions=True), - timeout=1.0, - ) - except asyncio.TimeoutError: - pass # Cleanup took too long, continue anyway + # Wait with timeout + done, pending = await asyncio.wait( + [pipeline_task, end_call_task], + timeout=3.0, + return_when=asyncio.ALL_COMPLETED, + ) + + # If there are pending tasks, we timed out + if pending: + test_timed_out = True + # Cancel all pending tasks + for t in pending: + t.cancel() + + # Give limited time for cleanup + try: + await asyncio.wait_for( + asyncio.gather(*pending, return_exceptions=True), + timeout=1.0, + ) + except asyncio.TimeoutError: + pass # Cleanup took too long, continue anyway # Verify audio write was attempted but failed output_transport = transport._output @@ -327,62 +322,57 @@ class TestTTSPauseWithAudioWriteFailure: new_callable=AsyncMock, return_value=1, ): - with patch( - "api.services.workflow.pipecat_engine.apply_disposition_mapping", + with patch.object( + VariableExtractionManager, + "_perform_extraction", new_callable=AsyncMock, - return_value="completed", + return_value={}, ): - with patch.object( - VariableExtractionManager, - "_perform_extraction", - new_callable=AsyncMock, - return_value={}, - ): - async def run_pipeline(): - await run_pipeline_worker(task) + async def run_pipeline(): + await run_pipeline_worker(task) - async def initialize_and_observe(): - await asyncio.sleep(0.01) - await engine.initialize() - await engine.set_node(engine.workflow.start_node_id) + async def initialize_and_observe(): + await asyncio.sleep(0.01) + await engine.initialize() + await engine.set_node(engine.workflow.start_node_id) - await engine.llm.queue_frame(LLMContextFrame(engine.context)) + await engine.llm.queue_frame(LLMContextFrame(engine.context)) - # Sleep so that processing is paused in TTS Service - await asyncio.sleep(0.1) + # Sleep so that processing is paused in TTS Service + await asyncio.sleep(0.1) - await engine.end_call_with_reason( - EndTaskReason.USER_HANGUP.value, - abort_immediately=False, - ) - - # Create tasks explicitly for better control - pipeline_task = asyncio.create_task(run_pipeline()) - end_call_task = asyncio.create_task(initialize_and_observe()) - - # Wait with timeout - done, pending = await asyncio.wait( - [pipeline_task, end_call_task], - timeout=3.0, - return_when=asyncio.ALL_COMPLETED, + await engine.end_call_with_reason( + EndTaskReason.USER_HANGUP.value, + abort_immediately=False, ) - # If there are pending tasks, we timed out - if pending: - test_timed_out = True - # Cancel all pending tasks - for t in pending: - t.cancel() + # Create tasks explicitly for better control + pipeline_task = asyncio.create_task(run_pipeline()) + end_call_task = asyncio.create_task(initialize_and_observe()) - # Give limited time for cleanup - try: - await asyncio.wait_for( - asyncio.gather(*pending, return_exceptions=True), - timeout=1.0, - ) - except asyncio.TimeoutError: - pass # Cleanup took too long, continue anyway + # Wait with timeout + done, pending = await asyncio.wait( + [pipeline_task, end_call_task], + timeout=3.0, + return_when=asyncio.ALL_COMPLETED, + ) + + # If there are pending tasks, we timed out + if pending: + test_timed_out = True + # Cancel all pending tasks + for t in pending: + t.cancel() + + # Give limited time for cleanup + try: + await asyncio.wait_for( + asyncio.gather(*pending, return_exceptions=True), + timeout=1.0, + ) + except asyncio.TimeoutError: + pass # Cleanup took too long, continue anyway # Verify some frames were written successfully before failure output_transport = transport._output diff --git a/api/tests/test_user_idle_handler.py b/api/tests/test_user_idle_handler.py index 5e3b622a..34c6448e 100644 --- a/api/tests/test_user_idle_handler.py +++ b/api/tests/test_user_idle_handler.py @@ -261,22 +261,17 @@ class TestUserIdleHandler: new_callable=AsyncMock, return_value=1, ): - with patch( - "api.services.workflow.pipecat_engine.apply_disposition_mapping", - new_callable=AsyncMock, - return_value="completed", - ): - async def run_pipeline(): - await run_pipeline_worker(task) + async def run_pipeline(): + await run_pipeline_worker(task) - async def initialize_engine(): - await asyncio.sleep(0.01) - await engine.initialize() - await engine.set_node(engine.workflow.start_node_id) - await engine.llm.queue_frame(LLMContextFrame(engine.context)) + async def initialize_engine(): + await asyncio.sleep(0.01) + await engine.initialize() + await engine.set_node(engine.workflow.start_node_id) + await engine.llm.queue_frame(LLMContextFrame(engine.context)) - await asyncio.gather(run_pipeline(), initialize_engine()) + await asyncio.gather(run_pipeline(), initialize_engine()) # All 5 LLM steps should have been consumed assert llm.get_current_step() == 5 diff --git a/api/tests/test_user_muting_during_bot_speech.py b/api/tests/test_user_muting_during_bot_speech.py index 6a6acf65..add3c024 100644 --- a/api/tests/test_user_muting_during_bot_speech.py +++ b/api/tests/test_user_muting_during_bot_speech.py @@ -247,50 +247,45 @@ class TestUserMutingDuringBotSpeech: new_callable=AsyncMock, return_value=1, ): - with patch( - "api.services.workflow.pipecat_engine.apply_disposition_mapping", + with patch.object( + VariableExtractionManager, + "_perform_extraction", new_callable=AsyncMock, - return_value="completed", + return_value={}, ): - with patch.object( - VariableExtractionManager, - "_perform_extraction", - new_callable=AsyncMock, - return_value={}, - ): - async def run_pipeline(): - await run_pipeline_worker(task) + async def run_pipeline(): + await run_pipeline_worker(task) - async def run_test(): - await asyncio.sleep(0.01) - await engine.initialize() - await engine.set_node(engine.workflow.start_node_id) + async def run_test(): + await asyncio.sleep(0.01) + await engine.initialize() + await engine.set_node(engine.workflow.start_node_id) - # Trigger first LLM completion - await engine.llm.queue_frame(LLMContextFrame(engine.context)) + # Trigger first LLM completion + await engine.llm.queue_frame(LLMContextFrame(engine.context)) - # Wait for first bot started - await asyncio.wait_for( - observer.first_bot_started.wait(), timeout=5.0 - ) - - # Queue user speaking frames so that second generation starts - await queue_user_speaking_and_transcript_frames(task) - - # Wait for first bot stopped - await asyncio.wait_for( - observer.first_bot_stopped.wait(), timeout=5.0 - ) - - await task.cancel() - - await asyncio.gather( - run_pipeline(), - run_test(), - return_exceptions=True, + # Wait for first bot started + await asyncio.wait_for( + observer.first_bot_started.wait(), timeout=5.0 ) + # Queue user speaking frames so that second generation starts + await queue_user_speaking_and_transcript_frames(task) + + # Wait for first bot stopped + await asyncio.wait_for( + observer.first_bot_stopped.wait(), timeout=5.0 + ) + + await task.cancel() + + await asyncio.gather( + run_pipeline(), + run_test(), + return_exceptions=True, + ) + # VERIFY: Muted at first BotStartedSpeaking assert len(observer.mute_status_on_bot_started) >= 1 assert observer.mute_status_on_bot_started[0] is True, ( @@ -337,55 +332,50 @@ class TestUserMutingDuringBotSpeech: new_callable=AsyncMock, return_value=1, ): - with patch( - "api.services.workflow.pipecat_engine.apply_disposition_mapping", + with patch.object( + VariableExtractionManager, + "_perform_extraction", new_callable=AsyncMock, - return_value="completed", + return_value={}, ): - with patch.object( - VariableExtractionManager, - "_perform_extraction", - new_callable=AsyncMock, - return_value={}, - ): - async def run_pipeline(): - await run_pipeline_worker(task) + async def run_pipeline(): + await run_pipeline_worker(task) - async def run_test(): - await asyncio.sleep(0.01) - await engine.initialize() - await engine.set_node(engine.workflow.start_node_id) + async def run_test(): + await asyncio.sleep(0.01) + await engine.initialize() + await engine.set_node(engine.workflow.start_node_id) - # Trigger first LLM completion - await engine.llm.queue_frame(LLMContextFrame(engine.context)) + # Trigger first LLM completion + await engine.llm.queue_frame(LLMContextFrame(engine.context)) - # Wait for first bot stopped (first response complete) - await asyncio.wait_for( - observer.first_bot_stopped.wait(), timeout=5.0 - ) - - # Queue user speaking frames for second generation - await queue_user_speaking_and_transcript_frames(task) - - # Wait for second bot started - await asyncio.wait_for( - observer.second_bot_started.wait(), timeout=5.0 - ) - - # Wait for second bot stopped - await asyncio.wait_for( - observer.second_bot_stopped.wait(), timeout=5.0 - ) - - await task.cancel() - - await asyncio.gather( - run_pipeline(), - run_test(), - return_exceptions=True, + # Wait for first bot stopped (first response complete) + await asyncio.wait_for( + observer.first_bot_stopped.wait(), timeout=5.0 ) + # Queue user speaking frames for second generation + await queue_user_speaking_and_transcript_frames(task) + + # Wait for second bot started + await asyncio.wait_for( + observer.second_bot_started.wait(), timeout=5.0 + ) + + # Wait for second bot stopped + await asyncio.wait_for( + observer.second_bot_stopped.wait(), timeout=5.0 + ) + + await task.cancel() + + await asyncio.gather( + run_pipeline(), + run_test(), + return_exceptions=True, + ) + # VERIFY: First bot started - should be muted (MuteUntilFirstBotComplete) assert len(observer.mute_status_on_bot_started) >= 2 assert observer.mute_status_on_bot_started[0] is True, ( @@ -432,55 +422,50 @@ class TestUserMutingDuringBotSpeech: new_callable=AsyncMock, return_value=1, ): - with patch( - "api.services.workflow.pipecat_engine.apply_disposition_mapping", + with patch.object( + VariableExtractionManager, + "_perform_extraction", new_callable=AsyncMock, - return_value="completed", + return_value={}, ): - with patch.object( - VariableExtractionManager, - "_perform_extraction", - new_callable=AsyncMock, - return_value={}, - ): - async def run_pipeline(): - await run_pipeline_worker(task) + async def run_pipeline(): + await run_pipeline_worker(task) - async def run_test(): - await asyncio.sleep(0.01) - await engine.initialize() - await engine.set_node(engine.workflow.start_node_id) + async def run_test(): + await asyncio.sleep(0.01) + await engine.initialize() + await engine.set_node(engine.workflow.start_node_id) - # Trigger first LLM completion - await engine.llm.queue_frame(LLMContextFrame(engine.context)) + # Trigger first LLM completion + await engine.llm.queue_frame(LLMContextFrame(engine.context)) - # Wait for first bot stopped (first response complete) - await asyncio.wait_for( - observer.first_bot_stopped.wait(), timeout=5.0 - ) - - # Queue user speaking frames for second llm generation - await queue_user_speaking_and_transcript_frames(task) - - # Wait for second bot started - await asyncio.wait_for( - observer.second_bot_started.wait(), timeout=5.0 - ) - - # Wait for second bot stopped - await asyncio.wait_for( - observer.second_bot_stopped.wait(), timeout=5.0 - ) - - await task.cancel() - - await asyncio.gather( - run_pipeline(), - run_test(), - return_exceptions=True, + # Wait for first bot stopped (first response complete) + await asyncio.wait_for( + observer.first_bot_stopped.wait(), timeout=5.0 ) + # Queue user speaking frames for second llm generation + await queue_user_speaking_and_transcript_frames(task) + + # Wait for second bot started + await asyncio.wait_for( + observer.second_bot_started.wait(), timeout=5.0 + ) + + # Wait for second bot stopped + await asyncio.wait_for( + observer.second_bot_stopped.wait(), timeout=5.0 + ) + + await task.cancel() + + await asyncio.gather( + run_pipeline(), + run_test(), + return_exceptions=True, + ) + # VERIFY: First bot started - should be muted (MuteUntilFirstBotComplete) assert len(observer.mute_status_on_bot_started) >= 2 assert observer.mute_status_on_bot_started[0] is True, ( diff --git a/sdk/python/src/dograh_sdk/_generated_models.py b/sdk/python/src/dograh_sdk/_generated_models.py index cd56b905..090eee70 100644 --- a/sdk/python/src/dograh_sdk/_generated_models.py +++ b/sdk/python/src/dograh_sdk/_generated_models.py @@ -1,6 +1,6 @@ # generated by datamodel-codegen: -# filename: dograh-openapi-XXXXXX.json.rRr9IUrKFk -# timestamp: 2026-06-23T13:02:10+00:00 +# filename: dograh-openapi-XXXXXX.json.YWer7ilGLp +# timestamp: 2026-06-24T16:36:26+00:00 from __future__ import annotations