feat: add redial option in campaigns

This commit is contained in:
Abhishek Kumar 2026-04-13 23:25:43 +05:30
parent 79116e6af2
commit 7fab959e26
14 changed files with 998 additions and 58 deletions

View file

@ -1,7 +1,7 @@
from datetime import UTC, datetime
from typing import Any, Dict, List, Optional
from sqlalchemy import func
from sqlalchemy import func, text
from sqlalchemy.future import select
from api.db.base_client import BaseDBClient
@ -271,6 +271,153 @@ class CampaignClient(BaseDBClient):
]
return runs, total_count
async def create_redial_campaign(
self,
parent_campaign: CampaignModel,
new_name: str,
retry_config: Optional[dict],
queued_runs_data: list[dict],
) -> CampaignModel:
"""Atomically create a redial child campaign, seed its queued_runs, and
link the parent.
- The child inherits `workflow_id`, `source_type`, `source_id`,
`created_by`, `organization_id`, and orchestrator settings
(`max_concurrency`, `schedule_config`, `circuit_breaker`) from the
parent. `parent_campaign_id` is stored in the child's
orchestrator_metadata.
- `queued_runs_data` should be pre-built dicts with campaign_id set to 0
(will be replaced once the child id is known).
- Parent's orchestrator_metadata gets `redialed_campaign_id` set.
- All inserts/updates happen in a single transaction.
"""
async with self.async_session() as session:
parent_meta = dict(parent_campaign.orchestrator_metadata or {})
if parent_meta.get("redialed_campaign_id"):
raise ValueError(
f"Campaign {parent_campaign.id} has already been redialed"
)
child_meta = {
k: v
for k, v in parent_meta.items()
if k in ("max_concurrency", "schedule_config", "circuit_breaker")
}
child_meta["parent_campaign_id"] = parent_campaign.id
child = CampaignModel(
name=new_name,
workflow_id=parent_campaign.workflow_id,
source_type=parent_campaign.source_type,
source_id=parent_campaign.source_id,
created_by=parent_campaign.created_by,
organization_id=parent_campaign.organization_id,
retry_config=retry_config
if retry_config
else CampaignModel.retry_config.default.arg,
orchestrator_metadata=child_meta,
rate_limit_per_second=parent_campaign.rate_limit_per_second,
total_rows=len(queued_runs_data),
source_sync_status="completed",
)
session.add(child)
await session.flush() # assign child.id
for data in queued_runs_data:
data["campaign_id"] = child.id
session.add_all([QueuedRunModel(**data) for data in queued_runs_data])
parent_meta["redialed_campaign_id"] = child.id
parent_stmt = select(CampaignModel).where(
CampaignModel.id == parent_campaign.id
)
parent_result = await session.execute(parent_stmt)
parent_row = parent_result.scalar_one()
parent_row.orchestrator_metadata = parent_meta
parent_row.updated_at = datetime.now(UTC)
try:
await session.commit()
except Exception as e:
await session.rollback()
raise e
await session.refresh(child)
return child
async def get_redial_candidates(
self,
campaign_id: int,
include_voicemail: bool,
include_no_answer: bool,
include_busy: bool,
) -> list[dict]:
"""Return root context_variables for subscribers whose LATEST
workflow_run indicates the call should be redialed.
A subscriber (identified by `source_uuid`) is a redial candidate iff
the latest workflow_run (by created_at) for that source_uuid has a
`call_tags` entry matching any of the selected failure reasons. Uses
the root queued_run (retry_count=0) for the original context.
"""
tag_clauses = []
if include_voicemail:
tag_clauses.append(
"(lr.gathered_context::jsonb -> 'call_tags') @> '[\"voicemail_detected\"]'::jsonb"
)
if include_no_answer:
tag_clauses.append(
"(lr.gathered_context::jsonb -> 'call_tags') @> '[\"telephony_no-answer\"]'::jsonb"
)
if include_busy:
tag_clauses.append(
"(lr.gathered_context::jsonb -> 'call_tags') @> '[\"telephony_busy\"]'::jsonb"
)
if not tag_clauses:
return []
tag_filter = " OR ".join(tag_clauses)
# Retries create new queued_runs with suffixed source_uuids linked via
# parent_queued_run_id, so group by the ROOT queued_run using a
# recursive walk and pick the latest workflow_run across the tree.
sql = text(
f"""
WITH RECURSIVE run_tree AS (
SELECT id AS root_id, id AS run_id
FROM queued_runs
WHERE campaign_id = :cid
AND parent_queued_run_id IS NULL
UNION ALL
SELECT rt.root_id, q.id
FROM run_tree rt
JOIN queued_runs q ON q.parent_queued_run_id = rt.run_id
WHERE q.campaign_id = :cid
),
latest_run_per_root AS (
SELECT DISTINCT ON (rt.root_id)
rt.root_id,
wr.gathered_context
FROM run_tree rt
JOIN workflow_runs wr
ON wr.queued_run_id = rt.run_id
AND wr.campaign_id = :cid
ORDER BY rt.root_id, wr.created_at DESC
)
SELECT q0.source_uuid, q0.context_variables
FROM queued_runs q0
JOIN latest_run_per_root lr ON lr.root_id = q0.id
WHERE q0.campaign_id = :cid
AND ({tag_filter})
"""
)
async with self.async_session() as session:
result = await session.execute(sql, {"cid": campaign_id})
return [
{"source_uuid": row[0], "context_variables": row[1]}
for row in result.all()
]
async def get_campaign_by_id(self, campaign_id: int) -> Optional[CampaignModel]:
"""Get campaign by ID without organization check (for internal use)"""
async with self.async_session() as session:
@ -352,6 +499,35 @@ class CampaignClient(BaseDBClient):
result = await session.execute(query)
return result.scalar() or 0
async def get_queued_runs_stats_for_campaigns(
self, campaign_ids: List[int]
) -> Dict[int, Dict[str, int]]:
"""Return {campaign_id: {"total": N, "executed": M}} for given campaigns.
"executed" means queued runs in the "processed" state.
"""
if not campaign_ids:
return {}
async with self.async_session() as session:
query = (
select(
QueuedRunModel.campaign_id,
QueuedRunModel.state,
func.count(QueuedRunModel.id),
)
.where(QueuedRunModel.campaign_id.in_(campaign_ids))
.group_by(QueuedRunModel.campaign_id, QueuedRunModel.state)
)
result = await session.execute(query)
stats: Dict[int, Dict[str, int]] = {
cid: {"total": 0, "executed": 0} for cid in campaign_ids
}
for campaign_id, state, count in result.all():
stats[campaign_id]["total"] += count
if state == "processed":
stats[campaign_id]["executed"] += count
return stats
async def get_workflow_runs_by_campaign(
self, campaign_id: int
) -> list[WorkflowRunModel]:
@ -367,22 +543,31 @@ class CampaignClient(BaseDBClient):
async def get_completed_runs_for_report(
self,
campaign_id: int,
*,
campaign_id: Optional[int] = None,
workflow_id: Optional[int] = None,
start_date: Optional[datetime] = None,
end_date: Optional[datetime] = None,
) -> list:
"""Get completed workflow runs for campaign report CSV.
"""Get completed workflow runs for a run report CSV.
Scope the query by exactly one of campaign_id or workflow_id.
Returns rows with only the columns needed for report generation.
"""
if (campaign_id is None) == (workflow_id is None):
raise ValueError("Provide exactly one of campaign_id or workflow_id")
async with self.async_session() as session:
conditions = [
WorkflowRunModel.campaign_id == campaign_id,
WorkflowRunModel.is_completed.is_(True),
WorkflowRunModel.cost_info["call_duration_seconds"]
.as_string()
.isnot(None),
]
if campaign_id is not None:
conditions.append(WorkflowRunModel.campaign_id == campaign_id)
if workflow_id is not None:
conditions.append(WorkflowRunModel.workflow_id == workflow_id)
if start_date is not None:
conditions.append(WorkflowRunModel.created_at >= start_date)
if end_date is not None:
@ -391,11 +576,13 @@ class CampaignClient(BaseDBClient):
query = (
select(
WorkflowRunModel.id,
WorkflowRunModel.workflow_id,
WorkflowRunModel.definition_id,
WorkflowRunModel.campaign_id,
WorkflowRunModel.created_at,
WorkflowRunModel.initial_context,
WorkflowRunModel.gathered_context,
WorkflowRunModel.cost_info,
WorkflowRunModel.logs,
WorkflowRunModel.public_access_token,
)
.where(*conditions)