fix: add error in cloudonix cdr report

This commit is contained in:
Abhishek Kumar 2026-01-29 20:43:53 +05:30
parent b1c982a52e
commit e9c5da16c5
7 changed files with 94 additions and 17 deletions

View file

@ -0,0 +1,32 @@
"""add index in workflow run
Revision ID: 02ffd7f23d1d
Revises: d1dac4c93e61
Create Date: 2026-01-29 20:36:57.924887
"""
from typing import Sequence, Union
from alembic import op
import sqlalchemy as sa
# revision identifiers, used by Alembic.
revision: str = '02ffd7f23d1d'
down_revision: Union[str, None] = 'd1dac4c93e61'
branch_labels: Union[str, Sequence[str], None] = None
depends_on: Union[str, Sequence[str], None] = None
def upgrade() -> None:
# ### commands auto generated by Alembic - please adjust! ###
op.create_index('idx_workflow_runs_campaign_id', 'workflow_runs', ['campaign_id'], unique=False)
op.create_index('idx_workflow_runs_workflow_id', 'workflow_runs', ['workflow_id'], unique=False)
# ### end Alembic commands ###
def downgrade() -> None:
# ### commands auto generated by Alembic - please adjust! ###
op.drop_index('idx_workflow_runs_workflow_id', table_name='workflow_runs')
op.drop_index('idx_workflow_runs_campaign_id', table_name='workflow_runs')
# ### end Alembic commands ###

View file

@ -287,6 +287,8 @@ class EmbedTokenClient(BaseDBClient):
Returns:
Dictionary with usage statistics
"""
from sqlalchemy import func
async with self.async_session() as session:
# Get the token
result = await session.execute(
@ -302,16 +304,16 @@ class EmbedTokenClient(BaseDBClient):
if not token:
return {}
# Count active sessions
# Count active sessions using SQL COUNT
active_sessions_result = await session.execute(
select(EmbedSessionModel).where(
select(func.count(EmbedSessionModel.id)).where(
and_(
EmbedSessionModel.embed_token_id == token_id,
EmbedSessionModel.expires_at > datetime.now(UTC),
)
)
)
active_sessions = len(active_sessions_result.scalars().all())
active_sessions = active_sessions_result.scalar() or 0
return {
"token_id": token_id,

View file

@ -224,23 +224,56 @@ class LoopTalkClient(BaseDBClient):
self, load_test_group_id: str, organization_id: int
) -> Dict[str, Any]:
"""Get statistics for a load test group."""
from sqlalchemy import case, func
async with self.async_session() as session:
# Get all sessions in the group
result = await session.execute(
select(LoopTalkTestSession).where(
# Get status counts using SQL aggregation
counts_result = await session.execute(
select(
func.count().label("total"),
func.sum(
case((LoopTalkTestSession.status == "pending", 1), else_=0)
).label("pending"),
func.sum(
case((LoopTalkTestSession.status == "running", 1), else_=0)
).label("running"),
func.sum(
case((LoopTalkTestSession.status == "completed", 1), else_=0)
).label("completed"),
func.sum(
case((LoopTalkTestSession.status == "failed", 1), else_=0)
).label("failed"),
).where(
LoopTalkTestSession.load_test_group_id == load_test_group_id,
LoopTalkTestSession.organization_id == organization_id,
)
)
sessions = result.scalars().all()
counts = counts_result.one()
# Get session details (still needed for the sessions list)
sessions_result = await session.execute(
select(
LoopTalkTestSession.id,
LoopTalkTestSession.name,
LoopTalkTestSession.status,
LoopTalkTestSession.test_index,
LoopTalkTestSession.created_at,
LoopTalkTestSession.started_at,
LoopTalkTestSession.completed_at,
LoopTalkTestSession.error,
).where(
LoopTalkTestSession.load_test_group_id == load_test_group_id,
LoopTalkTestSession.organization_id == organization_id,
)
)
sessions = sessions_result.all()
# Calculate stats
stats = {
"total": len(sessions),
"pending": sum(1 for s in sessions if s.status == "pending"),
"running": sum(1 for s in sessions if s.status == "running"),
"completed": sum(1 for s in sessions if s.status == "completed"),
"failed": sum(1 for s in sessions if s.status == "failed"),
"total": counts.total or 0,
"pending": counts.pending or 0,
"running": counts.running or 0,
"completed": counts.completed or 0,
"failed": counts.failed or 0,
"sessions": [
{
"id": s.id,

View file

@ -377,6 +377,8 @@ class WorkflowRunModel(Base):
text("(gathered_context->>'call_id')"),
postgresql_where=text("gathered_context->>'call_id' IS NOT NULL"),
),
Index("idx_workflow_runs_workflow_id", "workflow_id"),
Index("idx_workflow_runs_campaign_id", "campaign_id"),
)

View file

@ -721,7 +721,7 @@ async def _process_status_update(workflow_run_id: int, status: StatusCallbackReq
state=WorkflowRunState.COMPLETED.value,
)
elif status.status in ["failed", "busy", "no-answer", "canceled"]:
elif status.status in ["failed", "busy", "no-answer", "canceled", "error"]:
logger.warning(
f"[run {workflow_run_id}] Call failed with status: {status.status}"
)

View file

@ -325,6 +325,7 @@ class CloudonixProvider(TelephonyProvider):
"busy": "busy",
"no-answer": "no-answer",
"canceled": "canceled",
"error": "error",
}
call_status = data.get("status", "")

View file

@ -49,15 +49,22 @@ async def run_integrations_post_workflow_run(_ctx, workflow_run_id: int):
nodes = workflow_definition.get("nodes", [])
webhook_nodes = [n for n in nodes if n.get("type") == "webhook"]
# Step 4: Generate public access token if webhooks exist or campaign_id is set
has_campaign = workflow_run.campaign_id is not None
if not webhook_nodes and not has_campaign:
logger.debug("No webhook nodes and no campaign, skipping")
return
public_token = None
if webhook_nodes or has_campaign:
public_token = await db_client.ensure_public_access_token(workflow_run_id)
if not webhook_nodes:
logger.debug("No webhook nodes in workflow")
return
logger.info(f"Found {len(webhook_nodes)} webhook nodes to execute")
# Step 4: Generate public access token (on-demand, only when webhooks exist)
public_token = await db_client.ensure_public_access_token(workflow_run_id)
# Step 5: Build render context
render_context = _build_render_context(workflow_run, public_token)