diff --git a/api/alembic/versions/02ffd7f23d1d_add_index_in_workflow_run.py b/api/alembic/versions/02ffd7f23d1d_add_index_in_workflow_run.py new file mode 100644 index 0000000..eca83e7 --- /dev/null +++ b/api/alembic/versions/02ffd7f23d1d_add_index_in_workflow_run.py @@ -0,0 +1,32 @@ +"""add index in workflow run + +Revision ID: 02ffd7f23d1d +Revises: d1dac4c93e61 +Create Date: 2026-01-29 20:36:57.924887 + +""" +from typing import Sequence, Union + +from alembic import op +import sqlalchemy as sa + + +# revision identifiers, used by Alembic. +revision: str = '02ffd7f23d1d' +down_revision: Union[str, None] = 'd1dac4c93e61' +branch_labels: Union[str, Sequence[str], None] = None +depends_on: Union[str, Sequence[str], None] = None + + +def upgrade() -> None: + # ### commands auto generated by Alembic - please adjust! ### + op.create_index('idx_workflow_runs_campaign_id', 'workflow_runs', ['campaign_id'], unique=False) + op.create_index('idx_workflow_runs_workflow_id', 'workflow_runs', ['workflow_id'], unique=False) + # ### end Alembic commands ### + + +def downgrade() -> None: + # ### commands auto generated by Alembic - please adjust! ### + op.drop_index('idx_workflow_runs_workflow_id', table_name='workflow_runs') + op.drop_index('idx_workflow_runs_campaign_id', table_name='workflow_runs') + # ### end Alembic commands ### diff --git a/api/db/embed_token_client.py b/api/db/embed_token_client.py index 550676f..2ef30b9 100644 --- a/api/db/embed_token_client.py +++ b/api/db/embed_token_client.py @@ -287,6 +287,8 @@ class EmbedTokenClient(BaseDBClient): Returns: Dictionary with usage statistics """ + from sqlalchemy import func + async with self.async_session() as session: # Get the token result = await session.execute( @@ -302,16 +304,16 @@ class EmbedTokenClient(BaseDBClient): if not token: return {} - # Count active sessions + # Count active sessions using SQL COUNT active_sessions_result = await session.execute( - select(EmbedSessionModel).where( + select(func.count(EmbedSessionModel.id)).where( and_( EmbedSessionModel.embed_token_id == token_id, EmbedSessionModel.expires_at > datetime.now(UTC), ) ) ) - active_sessions = len(active_sessions_result.scalars().all()) + active_sessions = active_sessions_result.scalar() or 0 return { "token_id": token_id, diff --git a/api/db/looptalk_client.py b/api/db/looptalk_client.py index 3a9a9e1..3d09a6c 100644 --- a/api/db/looptalk_client.py +++ b/api/db/looptalk_client.py @@ -224,23 +224,56 @@ class LoopTalkClient(BaseDBClient): self, load_test_group_id: str, organization_id: int ) -> Dict[str, Any]: """Get statistics for a load test group.""" + from sqlalchemy import case, func + async with self.async_session() as session: - # Get all sessions in the group - result = await session.execute( - select(LoopTalkTestSession).where( + # Get status counts using SQL aggregation + counts_result = await session.execute( + select( + func.count().label("total"), + func.sum( + case((LoopTalkTestSession.status == "pending", 1), else_=0) + ).label("pending"), + func.sum( + case((LoopTalkTestSession.status == "running", 1), else_=0) + ).label("running"), + func.sum( + case((LoopTalkTestSession.status == "completed", 1), else_=0) + ).label("completed"), + func.sum( + case((LoopTalkTestSession.status == "failed", 1), else_=0) + ).label("failed"), + ).where( LoopTalkTestSession.load_test_group_id == load_test_group_id, LoopTalkTestSession.organization_id == organization_id, ) ) - sessions = result.scalars().all() + counts = counts_result.one() + + # Get session details (still needed for the sessions list) + sessions_result = await session.execute( + select( + LoopTalkTestSession.id, + LoopTalkTestSession.name, + LoopTalkTestSession.status, + LoopTalkTestSession.test_index, + LoopTalkTestSession.created_at, + LoopTalkTestSession.started_at, + LoopTalkTestSession.completed_at, + LoopTalkTestSession.error, + ).where( + LoopTalkTestSession.load_test_group_id == load_test_group_id, + LoopTalkTestSession.organization_id == organization_id, + ) + ) + sessions = sessions_result.all() - # Calculate stats stats = { - "total": len(sessions), - "pending": sum(1 for s in sessions if s.status == "pending"), - "running": sum(1 for s in sessions if s.status == "running"), - "completed": sum(1 for s in sessions if s.status == "completed"), - "failed": sum(1 for s in sessions if s.status == "failed"), + "total": counts.total or 0, + "pending": counts.pending or 0, + "running": counts.running or 0, + "completed": counts.completed or 0, + "failed": counts.failed or 0, "sessions": [ { "id": s.id, diff --git a/api/db/models.py b/api/db/models.py index 3cafdcd..ff544cb 100644 --- a/api/db/models.py +++ b/api/db/models.py @@ -377,6 +377,8 @@ class WorkflowRunModel(Base): text("(gathered_context->>'call_id')"), postgresql_where=text("gathered_context->>'call_id' IS NOT NULL"), ), + Index("idx_workflow_runs_workflow_id", "workflow_id"), + Index("idx_workflow_runs_campaign_id", "campaign_id"), ) diff --git a/api/routes/telephony.py b/api/routes/telephony.py index d7c03b4..b031ec8 100644 --- a/api/routes/telephony.py +++ b/api/routes/telephony.py @@ -721,7 +721,7 @@ async def _process_status_update(workflow_run_id: int, status: StatusCallbackReq state=WorkflowRunState.COMPLETED.value, ) - elif status.status in ["failed", "busy", "no-answer", "canceled"]: + elif status.status in ["failed", "busy", "no-answer", "canceled", "error"]: logger.warning( f"[run {workflow_run_id}] Call failed with status: {status.status}" ) diff --git a/api/services/telephony/providers/cloudonix_provider.py b/api/services/telephony/providers/cloudonix_provider.py index 3678617..ef2f329 100644 --- a/api/services/telephony/providers/cloudonix_provider.py +++ b/api/services/telephony/providers/cloudonix_provider.py @@ -325,6 +325,7 @@ class CloudonixProvider(TelephonyProvider): "busy": "busy", "no-answer": "no-answer", "canceled": "canceled", + "error": "error", } call_status = data.get("status", "") diff --git a/api/tasks/run_integrations.py b/api/tasks/run_integrations.py index 8a3d77c..fb38e5c 100644 --- a/api/tasks/run_integrations.py +++ b/api/tasks/run_integrations.py @@ -49,15 +49,22 @@ async def run_integrations_post_workflow_run(_ctx, workflow_run_id: int): nodes = workflow_definition.get("nodes", []) webhook_nodes = [n for n in nodes if n.get("type") == "webhook"] + # Step 4: Generate public access token if webhooks exist or campaign_id is set + has_campaign = workflow_run.campaign_id is not None + if not webhook_nodes and not has_campaign: + logger.debug("No webhook nodes and no campaign, skipping") + return + + public_token = None + if webhook_nodes or has_campaign: + public_token = await db_client.ensure_public_access_token(workflow_run_id) + if not webhook_nodes: logger.debug("No webhook nodes in workflow") return logger.info(f"Found {len(webhook_nodes)} webhook nodes to execute") - # Step 4: Generate public access token (on-demand, only when webhooks exist) - public_token = await db_client.ensure_public_access_token(workflow_run_id) - # Step 5: Build render context render_context = _build_render_context(workflow_run, public_token)