chore: refactor workfow run view

This commit is contained in:
Abhishek Kumar 2026-01-30 17:01:01 +05:30
parent ae0dc812cd
commit 1065ae001f
15 changed files with 794 additions and 387 deletions

View file

@ -1,11 +1,13 @@
from datetime import UTC, datetime
from typing import Optional
from typing import Any, Dict, List, Optional
from sqlalchemy import func
from sqlalchemy.future import select
from api.db.base_client import BaseDBClient
from api.db.filters import apply_workflow_run_filters, get_workflow_run_order_clause
from api.db.models import CampaignModel, QueuedRunModel, WorkflowRunModel
from api.schemas.workflow import WorkflowRunResponseSchema
class CampaignClient(BaseDBClient):
@ -165,6 +167,89 @@ class CampaignClient(BaseDBClient):
result = await session.execute(query)
return list(result.scalars().all())
async def get_campaign_runs_paginated(
self,
campaign_id: int,
organization_id: int,
limit: int = 50,
offset: int = 0,
filters: Optional[List[Dict[str, Any]]] = None,
sort_by: Optional[str] = None,
sort_order: Optional[str] = "desc",
) -> tuple[list[WorkflowRunResponseSchema], int]:
"""Get workflow runs for a campaign with pagination, filters and sorting"""
async with self.async_session() as session:
# First verify campaign belongs to organization
campaign_query = select(CampaignModel).where(
CampaignModel.id == campaign_id,
CampaignModel.organization_id == organization_id,
)
campaign_result = await session.execute(campaign_query)
campaign = campaign_result.scalar_one_or_none()
if not campaign:
raise ValueError(f"Campaign {campaign_id} not found")
# Build base query
base_query = select(WorkflowRunModel).where(
WorkflowRunModel.campaign_id == campaign_id
)
# Apply filters
base_query = apply_workflow_run_filters(base_query, filters)
# Count total with filters
count_query = base_query.with_only_columns(func.count(WorkflowRunModel.id))
count_result = await session.execute(count_query)
total_count = count_result.scalar()
# Get paginated results with filters and sorting
order_clause = get_workflow_run_order_clause(sort_by, sort_order)
result = await session.execute(
base_query.order_by(order_clause).limit(limit).offset(offset)
)
runs = [
WorkflowRunResponseSchema.model_validate(
{
"id": run.id,
"workflow_id": run.workflow_id,
"name": run.name,
"mode": run.mode,
"created_at": run.created_at,
"is_completed": run.is_completed,
"recording_url": run.recording_url,
"transcript_url": run.transcript_url,
"cost_info": {
"dograh_token_usage": (
run.cost_info.get("dograh_token_usage")
if run.cost_info
and "dograh_token_usage" in run.cost_info
else round(
float(run.cost_info.get("total_cost_usd", 0)) * 100,
2,
)
if run.cost_info and "total_cost_usd" in run.cost_info
else 0
),
"call_duration_seconds": int(
round(run.cost_info.get("call_duration_seconds") or 0)
)
if run.cost_info
else None,
}
if run.cost_info
else None,
"definition_id": run.definition_id,
"initial_context": run.initial_context,
"gathered_context": run.gathered_context,
"call_type": run.call_type,
}
)
for run in result.scalars().all()
]
return runs, total_count
async def get_campaign_by_id(self, campaign_id: int) -> Optional[CampaignModel]:
"""Get campaign by ID without organization check (for internal use)"""
async with self.async_session() as session:

View file

@ -3,16 +3,47 @@
from datetime import datetime
from typing import Any, Dict, List, Optional
from sqlalchemy import Integer, and_, cast, func
from sqlalchemy import Float, Integer, and_, cast, func
from sqlalchemy.dialects.postgresql import JSONB
from api.db.models import WorkflowRunModel
def get_workflow_run_order_clause(
sort_by: Optional[str] = None,
sort_order: str = "desc",
):
"""
Get the order clause for workflow run queries.
Args:
sort_by: Field to sort by ('duration', 'created_at', etc.)
sort_order: 'asc' or 'desc'
Returns:
SQLAlchemy order clause
"""
# Determine sort column
if sort_by == "duration":
sort_column = WorkflowRunModel.cost_info.op("->>")(
"call_duration_seconds"
).cast(Float)
else:
# Default to created_at
sort_column = WorkflowRunModel.created_at
# Apply sort order
if sort_order == "asc":
return sort_column.asc().nullslast()
else:
return sort_column.desc().nullslast()
# Mapping of attribute names to database fields
ATTRIBUTE_FIELD_MAPPING = {
"dateRange": "created_at",
"dispositionCode": "gathered_context.mapped_call_disposition",
"duration": "usage_info.call_duration_seconds",
"duration": "cost_info.call_duration_seconds",
"status": "is_completed",
"tokenUsage": "cost_info.total_cost_usd",
"runId": "id",
@ -153,7 +184,7 @@ def apply_workflow_run_filters(
min_val = value.get("min")
max_val = value.get("max")
if field == "usage_info.call_duration_seconds":
if field == "cost_info.call_duration_seconds":
# Use ->> operator for compatibility with all PostgreSQL versions
# (subscript [] only works in PostgreSQL 14+)
duration_text = cast(WorkflowRunModel.usage_info, JSONB).op("->>")(

View file

@ -2,12 +2,12 @@ import uuid
from datetime import datetime, timezone
from typing import Any, Dict, List, Optional, Tuple
from sqlalchemy import Float, func
from sqlalchemy import func
from sqlalchemy.future import select
from sqlalchemy.orm import joinedload, selectinload
from api.db.base_client import BaseDBClient
from api.db.filters import apply_workflow_run_filters
from api.db.filters import apply_workflow_run_filters, get_workflow_run_order_clause
from api.db.models import (
OrganizationModel,
UserModel,
@ -134,21 +134,8 @@ class WorkflowRunClient(BaseDBClient):
count_result = await session.execute(count_query)
total_count = count_result.scalar()
# Determine sort column
if sort_by == "duration":
# Sort by call_duration_seconds from usage_info JSON field
sort_column = WorkflowRunModel.usage_info.op("->>")("call_duration_seconds").cast(Float)
else:
# Default to created_at
sort_column = WorkflowRunModel.created_at
# Apply sort order
if sort_order == "asc":
order_clause = sort_column.asc().nullslast()
else:
order_clause = sort_column.desc().nullslast()
# Get paginated results with filters
# Get paginated results with filters and sorting
order_clause = get_workflow_run_order_clause(sort_by, sort_order)
result = await session.execute(
base_query.options(
joinedload(WorkflowRunModel.workflow).joinedload(
@ -245,6 +232,8 @@ class WorkflowRunClient(BaseDBClient):
limit: int = 50,
offset: int = 0,
filters: Optional[List[Dict[str, Any]]] = None,
sort_by: Optional[str] = None,
sort_order: Optional[str] = "desc",
) -> tuple[list[WorkflowRunResponseSchema], int]:
async with self.async_session() as session:
# Build base query
@ -271,11 +260,10 @@ class WorkflowRunClient(BaseDBClient):
count_result = await session.execute(count_query)
total_count = count_result.scalar()
# Get paginated results with filters
# Get paginated results with filters and sorting
order_clause = get_workflow_run_order_clause(sort_by, sort_order)
result = await session.execute(
base_query.order_by(WorkflowRunModel.created_at.desc())
.limit(limit)
.offset(offset)
base_query.order_by(order_clause).limit(limit).offset(offset)
)
runs = [
WorkflowRunResponseSchema.model_validate(

View file

@ -1,7 +1,8 @@
import json
from datetime import datetime
from typing import List, Optional
from fastapi import APIRouter, Depends, HTTPException
from fastapi import APIRouter, Depends, HTTPException, Query
from pydantic import BaseModel, Field
from api.constants import DEFAULT_CAMPAIGN_RETRY_CONFIG, DEFAULT_ORG_CONCURRENCY_LIMIT
@ -91,6 +92,16 @@ class WorkflowRunResponse(BaseModel):
completed_at: Optional[datetime]
class CampaignRunsResponse(BaseModel):
"""Paginated response for campaign workflow runs"""
runs: List[dict] # WorkflowRunResponseSchema from schemas
total_count: int
page: int
limit: int
total_pages: int
class CampaignProgressResponse(BaseModel):
campaign_id: int
state: str
@ -296,21 +307,65 @@ async def pause_campaign(
@router.get("/{campaign_id}/runs")
async def get_campaign_runs(
campaign_id: int,
page: int = 1,
limit: int = 50,
filters: Optional[str] = Query(None, description="JSON-encoded filter criteria"),
sort_by: Optional[str] = Query(
None, description="Field to sort by (e.g., 'duration', 'created_at')"
),
sort_order: Optional[str] = Query(
"desc", description="Sort order ('asc' or 'desc')"
),
user: UserModel = Depends(get_user),
) -> List[WorkflowRunResponse]:
"""Get campaign workflow runs"""
runs = await db_client.get_campaign_runs(campaign_id, user.selected_organization_id)
) -> CampaignRunsResponse:
"""Get campaign workflow runs with pagination, filters and sorting"""
offset = (page - 1) * limit
return [
WorkflowRunResponse(
id=run.id,
workflow_id=run.workflow_id,
state="completed" if run.is_completed else "running",
created_at=run.created_at,
completed_at=run.created_at if run.is_completed else None,
# Parse filters if provided
filter_criteria = []
if filters:
try:
filter_criteria = json.loads(filters)
except json.JSONDecodeError:
raise HTTPException(status_code=400, detail="Invalid filter format")
# Restrict allowed filter attributes for regular users
allowed_attributes = {
"dateRange",
"dispositionCode",
"duration",
"status",
"tokenUsage",
}
for filter_item in filter_criteria:
attribute = filter_item.get("attribute")
if attribute and attribute not in allowed_attributes:
raise HTTPException(
status_code=403, detail=f"Invalid attribute '{attribute}'"
)
try:
runs, total_count = await db_client.get_campaign_runs_paginated(
campaign_id,
user.selected_organization_id,
limit=limit,
offset=offset,
filters=filter_criteria if filter_criteria else None,
sort_by=sort_by,
sort_order=sort_order,
)
for run in runs
]
except ValueError as e:
raise HTTPException(status_code=404, detail=str(e))
total_pages = (total_count + limit - 1) // limit
return CampaignRunsResponse(
runs=[run.model_dump() for run in runs],
total_count=total_count,
page=page,
limit=limit,
total_pages=total_pages,
)
@router.post("/{campaign_id}/resume")

View file

@ -105,8 +105,12 @@ async def get_workflow_runs(
page: int = Query(1, ge=1, description="Page number (starts from 1)"),
limit: int = Query(50, ge=1, le=100, description="Number of items per page"),
filters: Optional[str] = Query(None, description="JSON-encoded filter criteria"),
sort_by: Optional[str] = Query(None, description="Field to sort by (e.g., 'duration', 'created_at')"),
sort_order: Optional[str] = Query("desc", description="Sort order ('asc' or 'desc')"),
sort_by: Optional[str] = Query(
None, description="Field to sort by (e.g., 'duration', 'created_at')"
),
sort_order: Optional[str] = Query(
"desc", description="Sort order ('asc' or 'desc')"
),
user: UserModel = Depends(get_superuser),
) -> SuperuserWorkflowRunsListResponse:
"""
@ -131,7 +135,11 @@ async def get_workflow_runs(
sort_order = "desc"
workflow_runs, total_count = await db_client.get_workflow_runs_for_superadmin(
limit=limit, offset=offset, filters=filter_criteria, sort_by=sort_by, sort_order=sort_order
limit=limit,
offset=offset,
filters=filter_criteria,
sort_by=sort_by,
sort_order=sort_order,
)
total_pages = (total_count + limit - 1) // limit # Ceiling division

View file

@ -666,10 +666,16 @@ async def get_workflow_runs(
page: int = 1,
limit: int = 50,
filters: Optional[str] = Query(None, description="JSON-encoded filter criteria"),
sort_by: Optional[str] = Query(
None, description="Field to sort by (e.g., 'duration', 'created_at')"
),
sort_order: Optional[str] = Query(
"desc", description="Sort order ('asc' or 'desc')"
),
user: UserModel = Depends(get_user),
) -> WorkflowRunsResponse:
"""
Get workflow runs with optional filtering.
Get workflow runs with optional filtering and sorting.
Filters should be provided as a JSON-encoded array of filter criteria.
Example: [{"attribute": "dateRange", "value": {"from": "2024-01-01", "to": "2024-01-31"}}]
@ -699,23 +705,15 @@ async def get_workflow_runs(
status_code=403, detail=f"Invalid attribute '{attribute}'"
)
# Apply filters if any
if filter_criteria:
runs, total_count = await db_client.get_workflow_runs_by_workflow_id(
workflow_id,
organization_id=user.selected_organization_id,
limit=limit,
offset=offset,
filters=filter_criteria,
)
else:
# Use existing logic for unfiltered results
runs, total_count = await db_client.get_workflow_runs_by_workflow_id(
workflow_id,
organization_id=user.selected_organization_id,
limit=limit,
offset=offset,
)
runs, total_count = await db_client.get_workflow_runs_by_workflow_id(
workflow_id,
organization_id=user.selected_organization_id,
limit=limit,
offset=offset,
filters=filter_criteria if filter_criteria else None,
sort_by=sort_by,
sort_order=sort_order,
)
total_pages = (total_count + limit - 1) // limit