fix: add lock in workflow run update, refactor _handle_stasis_start

This commit is contained in:
Sabiha Khan 2026-02-17 18:58:16 +05:30
parent f3b4de9eb6
commit fe80c11ae5
2 changed files with 14 additions and 15 deletions

View file

@ -321,8 +321,11 @@ class WorkflowRunClient(BaseDBClient):
state: str | None = None,
) -> WorkflowRunModel:
async with self.async_session() as session:
# Use SELECT FOR UPDATE to lock the row during the update
result = await session.execute(
select(WorkflowRunModel).where(WorkflowRunModel.id == run_id)
select(WorkflowRunModel)
.where(WorkflowRunModel.id == run_id)
.with_for_update()
)
run = result.scalars().first()
if not run:

View file

@ -479,7 +479,10 @@ class ARIConnection:
f"(caller={caller_number}, called={called_number})"
)
# 5. Delegate to the standard pipeline
# 5. Answer the inbound channel
await self._answer_channel(channel_id)
# 6. Delegate to the standard pipeline
await self._handle_stasis_start(
channel_id,
channel_state,
@ -505,24 +508,17 @@ class ARIConnection:
workflow_id: str,
user_id: str,
):
"""Handle StasisStart by answering (if needed), creating external media, and bridging."""
"""Handle StasisStart by creating external media and bridging."""
try:
# 1. Only answer the channel if it's not already up
# For outbound calls, the channel enters Stasis in "Up" state
# after the remote party answers — no need to answer again.
# For inbound calls, the channel may be in "Ring" state.
if channel_state != "Up":
await self._answer_channel(channel_id)
logger.info(
f"[ARI org={self.organization_id}] Setting up external media for "
f"channel {channel_id} via ws_client={self.ws_client_name}"
)
# 2. Track channel for StasisEnd cleanup (Redis)
# 1. Track channel for StasisEnd cleanup (Redis)
await self._set_channel_run(channel_id, workflow_run_id)
# 3. Create external media channel via chan_websocket
# 2. Create external media channel via chan_websocket
# Asterisk connects to our backend using websocket_client.conf config,
# with routing params appended as URI query params via v()
ext_channel_id = await self._create_external_media(
@ -534,10 +530,10 @@ class ARIConnection:
)
return
# 4. Track ext channel for StasisEnd cleanup (Redis)
# 3. Track ext channel for StasisEnd cleanup (Redis)
await self._set_channel_run(ext_channel_id, workflow_run_id)
# 5. Bridge the call channel with the external media channel
# 4. Bridge the call channel with the external media channel
bridge_id = await self._create_bridge_and_add_channels(
[channel_id, ext_channel_id]
)
@ -547,7 +543,7 @@ class ARIConnection:
)
return
# 6. Store ARI resource IDs in gathered_context for cleanup/debugging
# 5. Store ARI resource IDs in gathered_context for cleanup/debugging
await db_client.update_workflow_run(
run_id=int(workflow_run_id),
gathered_context={