diff --git a/api/db/workflow_run_client.py b/api/db/workflow_run_client.py index 657804e..3cbf1bc 100644 --- a/api/db/workflow_run_client.py +++ b/api/db/workflow_run_client.py @@ -321,8 +321,11 @@ class WorkflowRunClient(BaseDBClient): state: str | None = None, ) -> WorkflowRunModel: async with self.async_session() as session: + # Use SELECT FOR UPDATE to lock the row during the update result = await session.execute( - select(WorkflowRunModel).where(WorkflowRunModel.id == run_id) + select(WorkflowRunModel) + .where(WorkflowRunModel.id == run_id) + .with_for_update() ) run = result.scalars().first() if not run: diff --git a/api/services/telephony/ari_manager.py b/api/services/telephony/ari_manager.py index 47e9f30..a033c5a 100644 --- a/api/services/telephony/ari_manager.py +++ b/api/services/telephony/ari_manager.py @@ -479,7 +479,10 @@ class ARIConnection: f"(caller={caller_number}, called={called_number})" ) - # 5. Delegate to the standard pipeline + # 5. Answer the inbound channel + await self._answer_channel(channel_id) + + # 6. Delegate to the standard pipeline await self._handle_stasis_start( channel_id, channel_state, @@ -505,24 +508,17 @@ class ARIConnection: workflow_id: str, user_id: str, ): - """Handle StasisStart by answering (if needed), creating external media, and bridging.""" + """Handle StasisStart by creating external media and bridging.""" try: - # 1. Only answer the channel if it's not already up - # For outbound calls, the channel enters Stasis in "Up" state - # after the remote party answers — no need to answer again. - # For inbound calls, the channel may be in "Ring" state. - if channel_state != "Up": - await self._answer_channel(channel_id) - logger.info( f"[ARI org={self.organization_id}] Setting up external media for " f"channel {channel_id} via ws_client={self.ws_client_name}" ) - # 2. Track channel for StasisEnd cleanup (Redis) + # 1. Track channel for StasisEnd cleanup (Redis) await self._set_channel_run(channel_id, workflow_run_id) - # 3. Create external media channel via chan_websocket + # 2. Create external media channel via chan_websocket # Asterisk connects to our backend using websocket_client.conf config, # with routing params appended as URI query params via v() ext_channel_id = await self._create_external_media( @@ -534,10 +530,10 @@ class ARIConnection: ) return - # 4. Track ext channel for StasisEnd cleanup (Redis) + # 3. Track ext channel for StasisEnd cleanup (Redis) await self._set_channel_run(ext_channel_id, workflow_run_id) - # 5. Bridge the call channel with the external media channel + # 4. Bridge the call channel with the external media channel bridge_id = await self._create_bridge_and_add_channels( [channel_id, ext_channel_id] ) @@ -547,7 +543,7 @@ class ARIConnection: ) return - # 6. Store ARI resource IDs in gathered_context for cleanup/debugging + # 5. Store ARI resource IDs in gathered_context for cleanup/debugging await db_client.update_workflow_run( run_id=int(workflow_run_id), gathered_context={