dograh/api/routes/campaign.py

665 lines
22 KiB
Python
Raw Permalink Normal View History

import json
2025-09-09 14:37:32 +05:30
from datetime import datetime
from typing import List, Optional
from zoneinfo import ZoneInfo
2025-09-09 14:37:32 +05:30
from fastapi import APIRouter, Depends, HTTPException, Query
from pydantic import BaseModel, Field, field_validator, model_validator
2025-09-09 14:37:32 +05:30
from api.constants import (
DEFAULT_CAMPAIGN_RETRY_CONFIG,
DEFAULT_ORG_CONCURRENCY_LIMIT,
)
2025-09-09 14:37:32 +05:30
from api.db import db_client
from api.db.models import UserModel
from api.enums import OrganizationConfigurationKey
from api.services.auth.depends import get_user
from api.services.campaign.runner import campaign_runner_service
from api.services.campaign.source_sync_factory import get_sync_service
from api.services.quota_service import check_dograh_quota
from api.services.storage import storage_fs
2025-09-09 14:37:32 +05:30
# Router that groups every campaign endpoint in this module under the "/campaign" URL prefix.
router = APIRouter(prefix="/campaign")
async def _get_org_concurrent_limit(organization_id: int) -> int:
    """Return the concurrent-call limit configured for an organization.

    Reads the ``CONCURRENT_CALL_LIMIT`` configuration entry and converts its
    ``"value"`` key to ``int``. ``DEFAULT_ORG_CONCURRENCY_LIMIT`` is returned
    when the entry is missing or empty, or when the lookup or the conversion
    raises.
    """
    limit = DEFAULT_ORG_CONCURRENCY_LIMIT
    try:
        org_config = await db_client.get_configuration(
            organization_id,
            OrganizationConfigurationKey.CONCURRENT_CALL_LIMIT.value,
        )
        if org_config and org_config.value:
            limit = int(
                org_config.value.get("value", DEFAULT_ORG_CONCURRENCY_LIMIT)
            )
    except Exception:
        # Best effort: any failure falls back to the default limit.
        limit = DEFAULT_ORG_CONCURRENCY_LIMIT
    return limit
async def _get_from_numbers_count(organization_id: int) -> int:
    """Return how many ``from_numbers`` an organization has configured.

    Reads the ``TELEPHONY_CONFIGURATION`` entry and counts the items under its
    ``"from_numbers"`` key. Returns 0 when the entry is missing or empty, when
    the key is absent, or when the lookup raises.
    """
    count = 0
    try:
        telephony_config = await db_client.get_configuration(
            organization_id,
            OrganizationConfigurationKey.TELEPHONY_CONFIGURATION.value,
        )
        if telephony_config and telephony_config.value:
            count = len(telephony_config.value.get("from_numbers", []))
    except Exception:
        # Best effort: treat any failure as "no numbers configured".
        count = 0
    return count
async def _validate_max_concurrency(max_concurrency: int, organization_id: int) -> None:
    """Check ``max_concurrency`` against the organization's effective limit.

    The effective limit is the organization's concurrent-call limit, further
    capped by the number of configured from-numbers when at least one is
    configured.

    Raises:
        HTTPException: 400 when ``max_concurrency`` exceeds the effective limit.
    """
    org_cap = await _get_org_concurrent_limit(organization_id)
    from_numbers_count = await _get_from_numbers_count(organization_id)

    # With no phone numbers configured, only the organization limit applies.
    if from_numbers_count > 0:
        effective_limit = min(org_cap, from_numbers_count)
    else:
        effective_limit = org_cap

    if max_concurrency <= effective_limit:
        return

    # Pick the message that names the actual bottleneck.
    if 0 < from_numbers_count < org_cap:
        detail = f"max_concurrency ({max_concurrency}) cannot exceed {effective_limit}. You have {from_numbers_count} phone number(s) configured. Add more CLIs in telephony configuration to increase concurrency."
    else:
        detail = f"max_concurrency ({max_concurrency}) cannot exceed organization limit ({effective_limit})"
    raise HTTPException(status_code=400, detail=detail)
class RetryConfigRequest(BaseModel):
    """Retry settings accepted by the campaign create and update requests.

    Every field has a default, so an empty object is a valid payload.
    """

    enabled: bool = True
    # Validated to lie in the range 0..10.
    max_retries: int = Field(2, ge=0, le=10)
    # Validated to lie in the range 30..3600 seconds.
    retry_delay_seconds: int = Field(120, ge=30, le=3600)
    # Per-outcome switches, all on by default.
    retry_on_busy: bool = True
    retry_on_no_answer: bool = True
    retry_on_voicemail: bool = True
class RetryConfigResponse(BaseModel):
    """Retry settings as embedded in a campaign response.

    Declares the same fields as ``RetryConfigRequest`` but with no defaults or
    range constraints, so every field must be supplied.
    """

    enabled: bool
    max_retries: int
    retry_delay_seconds: int
    retry_on_busy: bool
    retry_on_no_answer: bool
    retry_on_voicemail: bool
class TimeSlotRequest(BaseModel):
    """One schedule slot: a day index plus a start and end time string."""

    # Validated to lie in 0..6.
    # NOTE(review): which weekday 0 denotes is not visible here - confirm.
    day_of_week: int = Field(..., ge=0, le=6)
    # Both times must be two digits, a colon, two digits.
    # NOTE(review): the pattern only checks the shape, so values such as
    # "99:99" pass validation - confirm whether the consumer range-checks them.
    start_time: str = Field(..., pattern=r"^\d{2}:\d{2}$")
    end_time: str = Field(..., pattern=r"^\d{2}:\d{2}$")

    @model_validator(mode="after")
    def validate_times(self):
        """Reject slots whose start is not strictly before their end."""
        # Both strings share the fixed-width digit pattern, so plain string
        # comparison orders them the same way as their numeric values.
        if not self.start_time < self.end_time:
            raise ValueError("start_time must be before end_time")
        return self
class ScheduleConfigRequest(BaseModel):
    """Schedule settings accepted by the campaign create and update requests."""

    enabled: bool = True
    # Must be a key that ``ZoneInfo`` can load; see ``validate_timezone``.
    timezone: str = "UTC"
    # Between 1 and 50 slots are required.
    slots: List[TimeSlotRequest] = Field(..., min_length=1, max_length=50)

    @field_validator("timezone")
    @classmethod
    def validate_timezone(cls, v: str) -> str:
        """Return ``v`` unchanged if ``ZoneInfo`` can be constructed from it."""
        try:
            ZoneInfo(v)
        except Exception:
            # Any failure to build the zone is reported as an invalid timezone.
            raise ValueError(f"Invalid timezone: {v}")
        else:
            return v
class TimeSlotResponse(BaseModel):
    """One schedule slot as returned in a campaign response.

    Declares the same fields as ``TimeSlotRequest`` without any validation
    constraints.
    """

    day_of_week: int
    start_time: str
    end_time: str
class ScheduleConfigResponse(BaseModel):
    """Schedule settings as returned in a campaign response."""

    enabled: bool
    timezone: str
    slots: List[TimeSlotResponse]
class CircuitBreakerConfigRequest(BaseModel):
    """Circuit-breaker settings accepted by the campaign create and update requests.

    Every field has a default, so an empty object is a valid payload.
    """

    enabled: bool = True
    # Validated to lie in 0.0..1.0.
    failure_threshold: float = Field(0.5, ge=0.0, le=1.0)
    # Validated to lie in 30..600 seconds.
    window_seconds: int = Field(120, ge=30, le=600)
    # Validated to lie in 1..100.
    min_calls_in_window: int = Field(5, ge=1, le=100)
class CircuitBreakerConfigResponse(BaseModel):
    """Circuit-breaker settings as returned in a campaign response.

    Unlike the request model, ``enabled`` defaults to ``False`` here, so an
    instance built with no arguments describes a disabled breaker.
    """

    enabled: bool = False
    failure_threshold: float = 0.5
    window_seconds: int = 120
    min_calls_in_window: int = 5
2025-09-09 14:37:32 +05:30
class CreateCampaignRequest(BaseModel):
    """Request body for creating a campaign."""

    # Required, 1..255 characters.
    name: str = Field(..., min_length=1, max_length=255)
    workflow_id: int
    # Must be exactly "google-sheet" or "csv".
    source_type: str = Field(..., pattern="^(google-sheet|csv)$")
    # Google Sheet URL or CSV file key.
    source_id: str
    # Optional settings; each is left as None when omitted.
    retry_config: Optional[RetryConfigRequest] = None
    # When given, validated to lie in 1..100.
    max_concurrency: Optional[int] = Field(None, ge=1, le=100)
    schedule_config: Optional[ScheduleConfigRequest] = None
    circuit_breaker: Optional[CircuitBreakerConfigRequest] = None
class UpdateCampaignRequest(BaseModel):
    """Request body for partially updating a campaign.

    Every field is optional and defaults to ``None``.
    """

    # When given, 1..255 characters.
    name: Optional[str] = Field(None, min_length=1, max_length=255)
    retry_config: Optional[RetryConfigRequest] = None
    # When given, validated to lie in 1..100.
    max_concurrency: Optional[int] = Field(None, ge=1, le=100)
    schedule_config: Optional[ScheduleConfigRequest] = None
    circuit_breaker: Optional[CircuitBreakerConfigRequest] = None
2025-09-09 14:37:32 +05:30
class CampaignResponse(BaseModel):
    """Campaign representation returned by the campaign endpoints."""

    # Identity and owning workflow.
    id: int
    name: str
    workflow_id: int
    workflow_name: str
    state: str

    # Data source.
    source_type: str
    source_id: str

    # Row counters; total_rows may be None.
    total_rows: Optional[int]
    processed_rows: int
    failed_rows: int

    # Lifecycle timestamps; started_at and completed_at may be None.
    created_at: datetime
    started_at: Optional[datetime]
    completed_at: Optional[datetime]

    # Execution settings; only retry_config is required.
    retry_config: RetryConfigResponse
    max_concurrency: Optional[int] = None
    schedule_config: Optional[ScheduleConfigResponse] = None
    circuit_breaker: Optional[CircuitBreakerConfigResponse] = None
2025-09-09 14:37:32 +05:30
class CampaignsResponse(BaseModel):
    """Envelope holding a list of campaigns."""

    campaigns: List[CampaignResponse]
class WorkflowRunResponse(BaseModel):
    """Minimal description of a workflow run."""

    id: int
    workflow_id: int
    state: str
    created_at: datetime
    # May be None.
    completed_at: Optional[datetime]
class CampaignRunsResponse(BaseModel):
    """Paginated response for campaign workflow runs"""

    # One dict per run (WorkflowRunResponseSchema from schemas).
    runs: List[dict]
    # Pagination data echoed back to the caller.
    total_count: int
    page: int
    limit: int
    total_pages: int
2025-09-09 14:37:32 +05:30
class CampaignProgressResponse(BaseModel):
    """Progress and statistics snapshot for a single campaign."""

    campaign_id: int
    state: str

    # Counters and derived percentage.
    total_rows: int
    processed_rows: int
    failed_calls: int
    progress_percentage: float

    # Free-form dict; its keys are not defined in this module.
    source_sync: dict
    rate_limit: int

    # Either timestamp may be None.
    started_at: Optional[datetime]
    completed_at: Optional[datetime]
# Response-building helper. The default retry config for campaigns is DEFAULT_CAMPAIGN_RETRY_CONFIG, imported from api.constants.
def _build_campaign_response(campaign, workflow_name: str) -> CampaignResponse:
    """Convert a campaign model into a ``CampaignResponse``.

    Args:
        campaign: Campaign model providing the scalar fields plus
            ``retry_config`` and ``orchestrator_metadata``.
        workflow_name: Display name of the campaign's workflow.

    Returns:
        The response object. ``retry_config`` falls back to
        ``DEFAULT_CAMPAIGN_RETRY_CONFIG`` when the campaign has none, and
        ``circuit_breaker`` falls back to a default-constructed
        ``CircuitBreakerConfigResponse`` when the metadata stores none.
    """
    retry_settings = campaign.retry_config or DEFAULT_CAMPAIGN_RETRY_CONFIG

    # max_concurrency, schedule_config and circuit_breaker all live in
    # orchestrator_metadata; missing metadata behaves like an empty dict.
    metadata = campaign.orchestrator_metadata or {}
    max_concurrency = metadata.get("max_concurrency")

    schedule_config = None
    raw_schedule = metadata.get("schedule_config")
    if raw_schedule:
        slot_models = [
            TimeSlotResponse(**slot) for slot in raw_schedule.get("slots", [])
        ]
        schedule_config = ScheduleConfigResponse(
            enabled=raw_schedule.get("enabled", False),
            timezone=raw_schedule.get("timezone", "UTC"),
            slots=slot_models,
        )

    raw_breaker = metadata.get("circuit_breaker")
    if raw_breaker:
        circuit_breaker_config = CircuitBreakerConfigResponse(**raw_breaker)
    else:
        circuit_breaker_config = CircuitBreakerConfigResponse()

    return CampaignResponse(
        id=campaign.id,
        name=campaign.name,
        workflow_id=campaign.workflow_id,
        workflow_name=workflow_name,
        state=campaign.state,
        source_type=campaign.source_type,
        source_id=campaign.source_id,
        total_rows=campaign.total_rows,
        processed_rows=campaign.processed_rows,
        failed_rows=campaign.failed_rows,
        created_at=campaign.created_at,
        started_at=campaign.started_at,
        completed_at=campaign.completed_at,
        retry_config=RetryConfigResponse(**retry_settings),
        max_concurrency=max_concurrency,
        schedule_config=schedule_config,
        circuit_breaker=circuit_breaker_config,
    )
2025-09-09 14:37:32 +05:30
@router.post("/create")
async def create_campaign(
    request: CreateCampaignRequest,
    user: UserModel = Depends(get_user),
) -> CampaignResponse:
    """Create a new campaign.

    Raises:
        HTTPException: 404 when the workflow name lookup returns nothing,
            400 when the source fails validation, and 400 (via
            ``_validate_max_concurrency``) when ``max_concurrency`` is too high.
    """
    organization_id = user.selected_organization_id

    # The workflow must resolve to a name for this user.
    workflow_name = await db_client.get_workflow_name(request.workflow_id, user.id)
    if not workflow_name:
        raise HTTPException(status_code=404, detail="Workflow not found")

    # Validate source data (phone_number column and format).
    source_check = await get_sync_service(request.source_type).validate_source(
        request.source_id, organization_id
    )
    if not source_check.is_valid:
        raise HTTPException(status_code=400, detail=source_check.error.message)

    if request.max_concurrency is not None:
        await _validate_max_concurrency(request.max_concurrency, organization_id)

    # Optional sub-configs are stored as plain dicts, or None when not provided.
    retry_config = request.retry_config.model_dump() if request.retry_config else None
    schedule_config = (
        request.schedule_config.model_dump() if request.schedule_config else None
    )
    circuit_breaker_config = (
        request.circuit_breaker.model_dump() if request.circuit_breaker else None
    )

    campaign = await db_client.create_campaign(
        name=request.name,
        workflow_id=request.workflow_id,
        source_type=request.source_type,
        source_id=request.source_id,
        user_id=user.id,
        organization_id=organization_id,
        retry_config=retry_config,
        max_concurrency=request.max_concurrency,
        schedule_config=schedule_config,
        circuit_breaker=circuit_breaker_config,
    )
    return _build_campaign_response(campaign, workflow_name)
2025-09-09 14:37:32 +05:30
@router.get("/")
async def get_campaigns(
    user: UserModel = Depends(get_user),
) -> CampaignsResponse:
    """List the campaigns of the user's selected organization.

    Workflow names are fetched in one call for all distinct workflow ids;
    a campaign whose workflow is not returned is labelled "Unknown".
    """
    organization_id = user.selected_organization_id
    campaigns = await db_client.get_campaigns(organization_id)

    # De-duplicate workflow ids before the bulk lookup.
    distinct_workflow_ids = list({campaign.workflow_id for campaign in campaigns})
    workflows = await db_client.get_workflows_by_ids(
        distinct_workflow_ids, organization_id
    )
    name_by_workflow_id = {workflow.id: workflow.name for workflow in workflows}

    campaign_responses = []
    for campaign in campaigns:
        workflow_name = name_by_workflow_id.get(campaign.workflow_id, "Unknown")
        campaign_responses.append(_build_campaign_response(campaign, workflow_name))

    return CampaignsResponse(campaigns=campaign_responses)
@router.get("/{campaign_id}")
async def get_campaign(
    campaign_id: int,
    user: UserModel = Depends(get_user),
) -> CampaignResponse:
    """Return the details of one campaign.

    Raises:
        HTTPException: 404 when no campaign with this id is found for the
            user's selected organization.
    """
    campaign = await db_client.get_campaign(campaign_id, user.selected_organization_id)
    if campaign is None or not campaign:
        raise HTTPException(status_code=404, detail="Campaign not found")

    workflow_name = await db_client.get_workflow_name(campaign.workflow_id, user.id)
    # A missing workflow name is shown as "Unknown" rather than failing.
    display_name = workflow_name if workflow_name else "Unknown"
    return _build_campaign_response(campaign, display_name)
2025-09-09 14:37:32 +05:30
@router.post("/{campaign_id}/start")
async def start_campaign(
    campaign_id: int,
    user: UserModel = Depends(get_user),
) -> CampaignResponse:
    """Start campaign execution.

    Checks run in this order: telephony configuration, Dograh quota, campaign
    ownership. The campaign is then handed to the runner service and
    re-fetched for the response.

    Raises:
        HTTPException: 401 when telephony is not configured, 402 when the
            quota check fails, 404 when the campaign is not found, 400 when
            the runner raises ``ValueError``.
    """
    organization_id = user.selected_organization_id

    # The organization must have a non-empty TELEPHONY_CONFIGURATION entry.
    telephony_config = await db_client.get_configuration(
        organization_id,
        OrganizationConfigurationKey.TELEPHONY_CONFIGURATION.value,
    )
    if not (telephony_config and telephony_config.value):
        raise HTTPException(
            status_code=401,
            detail="You must configure telephony first by going to APP_URL/configure-telephony",
        )

    # Check Dograh quota before starting the campaign.
    quota = await check_dograh_quota(user)
    if not quota.has_quota:
        raise HTTPException(status_code=402, detail=quota.error_message)

    # The campaign must exist within the user's organization.
    campaign = await db_client.get_campaign(campaign_id, organization_id)
    if not campaign:
        raise HTTPException(status_code=404, detail="Campaign not found")

    try:
        await campaign_runner_service.start_campaign(campaign_id)
    except ValueError as exc:
        raise HTTPException(status_code=400, detail=str(exc))

    # Re-fetch so the response reflects the state after the runner call.
    campaign = await db_client.get_campaign(campaign_id, organization_id)
    workflow_name = await db_client.get_workflow_name(campaign.workflow_id, user.id)
    return _build_campaign_response(campaign, workflow_name or "Unknown")
2025-09-09 14:37:32 +05:30
@router.post("/{campaign_id}/pause")
async def pause_campaign(
    campaign_id: int,
    user: UserModel = Depends(get_user),
) -> CampaignResponse:
    """Pause campaign execution.

    Raises:
        HTTPException: 404 when the campaign is not found for the user's
            selected organization, 400 when the runner raises ``ValueError``.
    """
    organization_id = user.selected_organization_id

    # The campaign must exist within the user's organization.
    campaign = await db_client.get_campaign(campaign_id, organization_id)
    if not campaign:
        raise HTTPException(status_code=404, detail="Campaign not found")

    try:
        await campaign_runner_service.pause_campaign(campaign_id)
    except ValueError as exc:
        raise HTTPException(status_code=400, detail=str(exc))

    # Re-fetch so the response reflects the state after the runner call.
    campaign = await db_client.get_campaign(campaign_id, organization_id)
    workflow_name = await db_client.get_workflow_name(campaign.workflow_id, user.id)
    return _build_campaign_response(campaign, workflow_name or "Unknown")
2025-09-09 14:37:32 +05:30
@router.patch("/{campaign_id}")
async def update_campaign(
    campaign_id: int,
    request: UpdateCampaignRequest,
    user: UserModel = Depends(get_user),
) -> CampaignResponse:
    """Update campaign settings.

    Updatable settings are the name, retry config, max concurrency, schedule
    config and circuit breaker. Only fields present in the request (not
    ``None``) are changed. The last three are merged into the campaign's
    ``orchestrator_metadata``, leaving its other keys untouched.

    Raises:
        HTTPException: 404 when the campaign is not found for the user's
            selected organization, 400 when the campaign is completed or
            failed, and 400 (via ``_validate_max_concurrency``) when
            ``max_concurrency`` exceeds the effective limit.
    """
    organization_id = user.selected_organization_id

    campaign = await db_client.get_campaign(campaign_id, organization_id)
    if not campaign:
        raise HTTPException(status_code=404, detail="Campaign not found")

    if campaign.state in ["completed", "failed"]:
        raise HTTPException(
            status_code=400,
            detail=f"Cannot update a {campaign.state} campaign",
        )

    if request.max_concurrency is not None:
        await _validate_max_concurrency(request.max_concurrency, organization_id)

    # Collect only the columns that actually change.
    update_kwargs = {}
    if request.name is not None:
        update_kwargs["name"] = request.name
    if request.retry_config is not None:
        update_kwargs["retry_config"] = request.retry_config.model_dump()

    # Settings stored inside orchestrator_metadata.
    metadata_updates = {}
    if request.max_concurrency is not None:
        metadata_updates["max_concurrency"] = request.max_concurrency
    if request.schedule_config is not None:
        metadata_updates["schedule_config"] = request.schedule_config.model_dump()
    if request.circuit_breaker is not None:
        metadata_updates["circuit_breaker"] = request.circuit_breaker.model_dump()

    if metadata_updates:
        # Merge into a fresh dict instead of mutating the loaded campaign's
        # dict in place: the persistence layer then receives a distinct
        # object, and the loaded model is not altered before the write.
        update_kwargs["orchestrator_metadata"] = {
            **(campaign.orchestrator_metadata or {}),
            **metadata_updates,
        }

    if update_kwargs:
        await db_client.update_campaign(campaign_id=campaign_id, **update_kwargs)

    # Re-fetch to return updated data.
    campaign = await db_client.get_campaign(campaign_id, organization_id)
    workflow_name = await db_client.get_workflow_name(campaign.workflow_id, user.id)
    return _build_campaign_response(campaign, workflow_name or "Unknown")
2025-09-09 14:37:32 +05:30
@router.get("/{campaign_id}/runs")
async def get_campaign_runs(
    campaign_id: int,
    page: int = 1,
    limit: int = 50,
    filters: Optional[str] = Query(None, description="JSON-encoded filter criteria"),
    sort_by: Optional[str] = Query(
        None, description="Field to sort by (e.g., 'duration', 'created_at')"
    ),
    sort_order: Optional[str] = Query(
        "desc", description="Sort order ('asc' or 'desc')"
    ),
    user: UserModel = Depends(get_user),
) -> CampaignRunsResponse:
    """Get campaign workflow runs with pagination, filters and sorting.

    Args:
        campaign_id: Campaign whose runs are listed.
        page: 1-based page number.
        limit: Page size; must be at least 1.
        filters: Optional JSON text holding a list of filter objects. Each
            object's ``"attribute"`` must be one of the allowed attributes.
        sort_by: Optional sort field, passed through to the database layer.
        sort_order: Sort direction, passed through to the database layer.
        user: Authenticated user; their selected organization scopes the query.

    Raises:
        HTTPException: 400 for a non-positive ``page`` or ``limit`` or for
            filters that are not valid JSON or not a list of objects, 403 for
            a disallowed filter attribute, 404 when the database layer raises
            ``ValueError``.
    """
    # Reject values that would produce a negative offset or a division by
    # zero when computing total_pages below.
    if page < 1:
        raise HTTPException(status_code=400, detail="page must be at least 1")
    if limit < 1:
        raise HTTPException(status_code=400, detail="limit must be at least 1")

    offset = (page - 1) * limit

    # Parse filters if provided.
    filter_criteria = []
    if filters:
        try:
            parsed_filters = json.loads(filters)
        except json.JSONDecodeError as exc:
            raise HTTPException(
                status_code=400, detail="Invalid filter format"
            ) from exc

        # Empty payloads (null, {}, [], "") mean "no filters". Anything else
        # must be a list of objects, otherwise the attribute check below
        # would crash with an unhandled error.
        if parsed_filters:
            if not isinstance(parsed_filters, list) or not all(
                isinstance(item, dict) for item in parsed_filters
            ):
                raise HTTPException(status_code=400, detail="Invalid filter format")
            filter_criteria = parsed_filters

    # Restrict allowed filter attributes for regular users.
    allowed_attributes = {
        "dateRange",
        "dispositionCode",
        "duration",
        "status",
        "tokenUsage",
    }
    for filter_item in filter_criteria:
        attribute = filter_item.get("attribute")
        if attribute and attribute not in allowed_attributes:
            raise HTTPException(
                status_code=403, detail=f"Invalid attribute '{attribute}'"
            )

    try:
        runs, total_count = await db_client.get_campaign_runs_paginated(
            campaign_id,
            user.selected_organization_id,
            limit=limit,
            offset=offset,
            filters=filter_criteria if filter_criteria else None,
            sort_by=sort_by,
            sort_order=sort_order,
        )
    except ValueError as e:
        raise HTTPException(status_code=404, detail=str(e))

    # Ceiling division: a partially filled last page still counts as a page.
    total_pages = (total_count + limit - 1) // limit

    return CampaignRunsResponse(
        runs=[run.model_dump() for run in runs],
        total_count=total_count,
        page=page,
        limit=limit,
        total_pages=total_pages,
    )
2025-09-09 14:37:32 +05:30
@router.post("/{campaign_id}/resume")
async def resume_campaign(
    campaign_id: int,
    user: UserModel = Depends(get_user),
) -> CampaignResponse:
    """Resume a paused campaign.

    Checks run in this order: telephony configuration, Dograh quota, campaign
    ownership. The campaign is then handed to the runner service and
    re-fetched for the response.

    Raises:
        HTTPException: 401 when telephony is not configured, 402 when the
            quota check fails, 404 when the campaign is not found, 400 when
            the runner raises ``ValueError``.
    """
    organization_id = user.selected_organization_id

    # The organization must have a non-empty TELEPHONY_CONFIGURATION entry.
    telephony_config = await db_client.get_configuration(
        organization_id,
        OrganizationConfigurationKey.TELEPHONY_CONFIGURATION.value,
    )
    if not (telephony_config and telephony_config.value):
        raise HTTPException(
            status_code=401,
            detail="You must configure telephony first by going to APP_URL/configure-telephony",
        )

    # Check Dograh quota before resuming the campaign.
    quota = await check_dograh_quota(user)
    if not quota.has_quota:
        raise HTTPException(status_code=402, detail=quota.error_message)

    # The campaign must exist within the user's organization.
    campaign = await db_client.get_campaign(campaign_id, organization_id)
    if not campaign:
        raise HTTPException(status_code=404, detail="Campaign not found")

    try:
        await campaign_runner_service.resume_campaign(campaign_id)
    except ValueError as exc:
        raise HTTPException(status_code=400, detail=str(exc))

    # Re-fetch so the response reflects the state after the runner call.
    campaign = await db_client.get_campaign(campaign_id, organization_id)
    workflow_name = await db_client.get_workflow_name(campaign.workflow_id, user.id)
    return _build_campaign_response(campaign, workflow_name or "Unknown")
2025-09-09 14:37:32 +05:30
@router.get("/{campaign_id}/progress")
async def get_campaign_progress(
    campaign_id: int,
    user: UserModel = Depends(get_user),
) -> CampaignProgressResponse:
    """Return the current progress and statistics of a campaign.

    Raises:
        HTTPException: 404 when the campaign is not found for the user's
            selected organization, 400 when fetching the status or building
            the response raises ``ValueError``.
    """
    # The campaign must exist within the user's organization.
    campaign = await db_client.get_campaign(campaign_id, user.selected_organization_id)
    if not campaign:
        raise HTTPException(status_code=404, detail="Campaign not found")

    # The runner service supplies the fields of the response as a mapping.
    try:
        status_payload = await campaign_runner_service.get_campaign_status(campaign_id)
        response = CampaignProgressResponse(**status_payload)
    except ValueError as exc:
        raise HTTPException(status_code=400, detail=str(exc))
    return response
class CampaignSourceDownloadResponse(BaseModel):
    """Signed download link for a campaign's source file."""

    download_url: str
    # NOTE(review): presumably the link lifetime in seconds - confirm against the storage backend.
    expires_in: int
@router.get("/{campaign_id}/source-download-url")
async def get_campaign_source_download_url(
    campaign_id: int,
    user: UserModel = Depends(get_user),
) -> CampaignSourceDownloadResponse:
    """Get presigned download URL for campaign CSV source file

    Only works for CSV source type. For Google Sheets, use the source_id directly.
    Validates that the campaign belongs to the user's organization for security.

    Raises:
        HTTPException: 404 when the campaign is not found, 400 when the
            source is not a CSV, 403 when the file key is outside the
            organization's prefix, 500 when the storage backend raises or
            returns no URL.
    """
    # The same expiry is requested from storage and reported to the caller.
    expires_in = 3600  # 1 hour

    # Verify campaign exists and belongs to organization.
    campaign = await db_client.get_campaign(campaign_id, user.selected_organization_id)
    if not campaign:
        raise HTTPException(status_code=404, detail="Campaign not found")

    # Only generate download URL for CSV files.
    if campaign.source_type != "csv":
        raise HTTPException(
            status_code=400,
            detail=f"Download URL only available for CSV sources. This campaign uses {campaign.source_type}",
        )

    # Verify the file key belongs to the user's organization.
    # File key format: campaigns/{org_id}/{uuid}_{filename}.csv
    org_prefix = f"campaigns/{user.selected_organization_id}/"
    if not campaign.source_id.startswith(org_prefix):
        raise HTTPException(
            status_code=403,
            detail="Access denied: Source file does not belong to your organization",
        )

    # Only the storage call sits inside the try, so the HTTPException raised
    # below for an empty URL is not caught and re-wrapped by this handler.
    try:
        download_url = await storage_fs.aget_signed_url(
            campaign.source_id,
            expiration=expires_in,
        )
    except Exception as e:
        raise HTTPException(
            status_code=500, detail=f"Failed to generate download URL: {str(e)}"
        ) from e

    if not download_url:
        raise HTTPException(status_code=500, detail="Failed to generate download URL")

    return CampaignSourceDownloadResponse(
        download_url=download_url, expires_in=expires_in
    )