diff --git a/api/routes/telephony.py b/api/routes/telephony.py index 5537c55..71571bd 100644 --- a/api/routes/telephony.py +++ b/api/routes/telephony.py @@ -747,11 +747,6 @@ async def _process_status_update(workflow_run_id: int, status: StatusCallbackReq logs={"telephony_status_callbacks": telephony_callback_logs}, ) - # The workflow run state is already marked as completed from either status-update - # callbacks or CDR update callbacks. Lets skip processing. - if workflow_run.state == WorkflowRunState.COMPLETED.value: - return - # Handle call completion - make these updates idempotent - i.e # they should handle multiple API calls (one due to status update, # and other due to CDR updates.) @@ -768,11 +763,12 @@ async def _process_status_update(workflow_run_id: int, status: StatusCallbackReq ) # Mark workflow run as completed - await db_client.update_workflow_run( - run_id=workflow_run_id, - is_completed=True, - state=WorkflowRunState.COMPLETED.value, - ) + if workflow_run.state != WorkflowRunState.COMPLETED.value: + await db_client.update_workflow_run( + run_id=workflow_run_id, + is_completed=True, + state=WorkflowRunState.COMPLETED.value, + ) elif status.status in ["failed", "busy", "no-answer", "canceled", "error"]: logger.warning( @@ -813,6 +809,10 @@ async def _process_status_update(workflow_run_id: int, status: StatusCallbackReq state=WorkflowRunState.COMPLETED.value, gathered_context={"call_tags": call_tags}, ) + else: + logger.warning( + f"[run {workflow_run_id}] Unexpected status update: {status.status}" + ) @router.post("/vonage/events/{workflow_run_id}") diff --git a/api/services/pipecat/event_handlers.py b/api/services/pipecat/event_handlers.py index 0304f9c..76842a4 100644 --- a/api/services/pipecat/event_handlers.py +++ b/api/services/pipecat/event_handlers.py @@ -2,7 +2,6 @@ from loguru import logger from api.db import db_client from api.enums import WorkflowRunState -from api.services.campaign.campaign_call_dispatcher import campaign_call_dispatcher from api.services.campaign.circuit_breaker import circuit_breaker from api.services.pipecat.audio_config import AudioConfig from api.services.pipecat.in_memory_buffers import ( @@ -186,7 +185,9 @@ def register_event_handlers( usage_info = pipeline_metrics_aggregator.get_all_usage_metrics_serialized() - logger.debug(f"Usage metrics: {usage_info}") + logger.debug( + f"Usage metrics: {usage_info}, Gathered context: {gathered_context}" + ) await db_client.update_workflow_run( run_id=workflow_run_id, @@ -212,10 +213,6 @@ def register_event_handlers( else: logger.debug("Logs buffer is empty, skipping save") - # Release concurrent slot for campaign calls - if workflow_run and workflow_run.campaign_id: - await campaign_call_dispatcher.release_call_slot(workflow_run_id) - # Write buffers to temp files and enqueue combined processing task audio_temp_path = None transcript_temp_path = None diff --git a/pipecat b/pipecat index 927ed9b..26e335f 160000 --- a/pipecat +++ b/pipecat @@ -1 +1 @@ -Subproject commit 927ed9bae29e52c14230feb9f61e7ef2a551b0b8 +Subproject commit 26e335fbfdf792d54ad37da414998762f111e231