import json from datetime import datetime from typing import List, Optional from zoneinfo import ZoneInfo from fastapi import APIRouter, Depends, HTTPException, Query from pydantic import BaseModel, Field, field_validator, model_validator from api.constants import ( DEFAULT_CAMPAIGN_RETRY_CONFIG, DEFAULT_ORG_CONCURRENCY_LIMIT, ) from api.db import db_client from api.db.models import UserModel from api.enums import OrganizationConfigurationKey from api.services.auth.depends import get_user from api.services.campaign.runner import campaign_runner_service from api.services.campaign.source_sync_factory import get_sync_service from api.services.quota_service import check_dograh_quota from api.services.storage import storage_fs router = APIRouter(prefix="/campaign") async def _get_org_concurrent_limit(organization_id: int) -> int: """Get the concurrent call limit for an organization.""" try: config = await db_client.get_configuration( organization_id, OrganizationConfigurationKey.CONCURRENT_CALL_LIMIT.value, ) if config and config.value: return int(config.value.get("value", DEFAULT_ORG_CONCURRENCY_LIMIT)) except Exception: pass return DEFAULT_ORG_CONCURRENCY_LIMIT async def _get_from_numbers_count(organization_id: int) -> int: """Get the number of configured from_numbers for an organization.""" try: config = await db_client.get_configuration( organization_id, OrganizationConfigurationKey.TELEPHONY_CONFIGURATION.value, ) if config and config.value: return len(config.value.get("from_numbers", [])) except Exception: pass return 0 async def _validate_max_concurrency(max_concurrency: int, organization_id: int) -> None: """Validate max_concurrency against org limit and configured phone numbers. Raises HTTPException(400) if the value exceeds the effective limit. """ org_limit = await _get_org_concurrent_limit(organization_id) from_numbers_count = await _get_from_numbers_count(organization_id) effective_limit = ( min(org_limit, from_numbers_count) if from_numbers_count > 0 else org_limit ) if max_concurrency > effective_limit: if from_numbers_count > 0 and from_numbers_count < org_limit: raise HTTPException( status_code=400, detail=f"max_concurrency ({max_concurrency}) cannot exceed {effective_limit}. You have {from_numbers_count} phone number(s) configured. Add more CLIs in telephony configuration to increase concurrency.", ) raise HTTPException( status_code=400, detail=f"max_concurrency ({max_concurrency}) cannot exceed organization limit ({effective_limit})", ) class RetryConfigRequest(BaseModel): enabled: bool = True max_retries: int = Field(default=2, ge=0, le=10) retry_delay_seconds: int = Field(default=120, ge=30, le=3600) retry_on_busy: bool = True retry_on_no_answer: bool = True retry_on_voicemail: bool = True class RetryConfigResponse(BaseModel): enabled: bool max_retries: int retry_delay_seconds: int retry_on_busy: bool retry_on_no_answer: bool retry_on_voicemail: bool class TimeSlotRequest(BaseModel): day_of_week: int = Field(..., ge=0, le=6) start_time: str = Field(..., pattern=r"^\d{2}:\d{2}$") end_time: str = Field(..., pattern=r"^\d{2}:\d{2}$") @model_validator(mode="after") def validate_times(self): if self.start_time >= self.end_time: raise ValueError("start_time must be before end_time") return self class ScheduleConfigRequest(BaseModel): enabled: bool = True timezone: str = "UTC" slots: List[TimeSlotRequest] = Field(..., min_length=1, max_length=50) @field_validator("timezone") @classmethod def validate_timezone(cls, v: str) -> str: try: ZoneInfo(v) except (KeyError, Exception): raise ValueError(f"Invalid timezone: {v}") return v class TimeSlotResponse(BaseModel): day_of_week: int start_time: str end_time: str class ScheduleConfigResponse(BaseModel): enabled: bool timezone: str slots: List[TimeSlotResponse] class CircuitBreakerConfigRequest(BaseModel): enabled: bool = True failure_threshold: float = Field(default=0.5, ge=0.0, le=1.0) window_seconds: int = Field(default=120, ge=30, le=600) min_calls_in_window: int = Field(default=5, ge=1, le=100) class CircuitBreakerConfigResponse(BaseModel): enabled: bool = False failure_threshold: float = 0.5 window_seconds: int = 120 min_calls_in_window: int = 5 class CreateCampaignRequest(BaseModel): name: str = Field(..., min_length=1, max_length=255) workflow_id: int source_type: str = Field(..., pattern="^(google-sheet|csv)$") source_id: str # Google Sheet URL or CSV file key retry_config: Optional[RetryConfigRequest] = None max_concurrency: Optional[int] = Field(default=None, ge=1, le=100) schedule_config: Optional[ScheduleConfigRequest] = None circuit_breaker: Optional[CircuitBreakerConfigRequest] = None class UpdateCampaignRequest(BaseModel): name: Optional[str] = Field(None, min_length=1, max_length=255) retry_config: Optional[RetryConfigRequest] = None max_concurrency: Optional[int] = Field(default=None, ge=1, le=100) schedule_config: Optional[ScheduleConfigRequest] = None circuit_breaker: Optional[CircuitBreakerConfigRequest] = None class CampaignResponse(BaseModel): id: int name: str workflow_id: int workflow_name: str state: str source_type: str source_id: str total_rows: Optional[int] processed_rows: int failed_rows: int created_at: datetime started_at: Optional[datetime] completed_at: Optional[datetime] retry_config: RetryConfigResponse max_concurrency: Optional[int] = None schedule_config: Optional[ScheduleConfigResponse] = None circuit_breaker: Optional[CircuitBreakerConfigResponse] = None class CampaignsResponse(BaseModel): campaigns: List[CampaignResponse] class WorkflowRunResponse(BaseModel): id: int workflow_id: int state: str created_at: datetime completed_at: Optional[datetime] class CampaignRunsResponse(BaseModel): """Paginated response for campaign workflow runs""" runs: List[dict] # WorkflowRunResponseSchema from schemas total_count: int page: int limit: int total_pages: int class CampaignProgressResponse(BaseModel): campaign_id: int state: str total_rows: int processed_rows: int failed_calls: int progress_percentage: float source_sync: dict rate_limit: int started_at: Optional[datetime] completed_at: Optional[datetime] # Default retry config for campaigns def _build_campaign_response(campaign, workflow_name: str) -> CampaignResponse: """Build a CampaignResponse from a campaign model.""" # Get retry_config from campaign or use defaults retry_config = ( campaign.retry_config if campaign.retry_config else DEFAULT_CAMPAIGN_RETRY_CONFIG ) # Get max_concurrency, schedule_config, circuit_breaker from orchestrator_metadata max_concurrency = None schedule_config = None circuit_breaker_config = CircuitBreakerConfigResponse() if campaign.orchestrator_metadata: max_concurrency = campaign.orchestrator_metadata.get("max_concurrency") sc = campaign.orchestrator_metadata.get("schedule_config") if sc: schedule_config = ScheduleConfigResponse( enabled=sc.get("enabled", False), timezone=sc.get("timezone", "UTC"), slots=[TimeSlotResponse(**slot) for slot in sc.get("slots", [])], ) cb = campaign.orchestrator_metadata.get("circuit_breaker") if cb: circuit_breaker_config = CircuitBreakerConfigResponse(**cb) return CampaignResponse( id=campaign.id, name=campaign.name, workflow_id=campaign.workflow_id, workflow_name=workflow_name, state=campaign.state, source_type=campaign.source_type, source_id=campaign.source_id, total_rows=campaign.total_rows, processed_rows=campaign.processed_rows, failed_rows=campaign.failed_rows, created_at=campaign.created_at, started_at=campaign.started_at, completed_at=campaign.completed_at, retry_config=RetryConfigResponse(**retry_config), max_concurrency=max_concurrency, schedule_config=schedule_config, circuit_breaker=circuit_breaker_config, ) @router.post("/create") async def create_campaign( request: CreateCampaignRequest, user: UserModel = Depends(get_user), ) -> CampaignResponse: """Create a new campaign""" # Verify workflow exists and belongs to organization workflow_name = await db_client.get_workflow_name(request.workflow_id, user.id) if not workflow_name: raise HTTPException(status_code=404, detail="Workflow not found") # Validate source data (phone_number column and format) sync_service = get_sync_service(request.source_type) validation_result = await sync_service.validate_source( request.source_id, user.selected_organization_id ) if not validation_result.is_valid: raise HTTPException(status_code=400, detail=validation_result.error.message) if request.max_concurrency is not None: await _validate_max_concurrency( request.max_concurrency, user.selected_organization_id ) # Build retry_config dict if provided retry_config = None if request.retry_config: retry_config = request.retry_config.model_dump() # Build schedule_config dict if provided schedule_config = None if request.schedule_config: schedule_config = request.schedule_config.model_dump() # Build circuit_breaker dict if provided circuit_breaker_config = None if request.circuit_breaker: circuit_breaker_config = request.circuit_breaker.model_dump() campaign = await db_client.create_campaign( name=request.name, workflow_id=request.workflow_id, source_type=request.source_type, source_id=request.source_id, user_id=user.id, organization_id=user.selected_organization_id, retry_config=retry_config, max_concurrency=request.max_concurrency, schedule_config=schedule_config, circuit_breaker=circuit_breaker_config, ) return _build_campaign_response(campaign, workflow_name) @router.get("/") async def get_campaigns( user: UserModel = Depends(get_user), ) -> CampaignsResponse: """Get campaigns for user's organization""" campaigns = await db_client.get_campaigns(user.selected_organization_id) # Get workflow names for all campaigns workflow_ids = list(set(c.workflow_id for c in campaigns)) workflows = await db_client.get_workflows_by_ids( workflow_ids, user.selected_organization_id ) workflow_map = {w.id: w.name for w in workflows} campaign_responses = [ _build_campaign_response(c, workflow_map.get(c.workflow_id, "Unknown")) for c in campaigns ] return CampaignsResponse(campaigns=campaign_responses) @router.get("/{campaign_id}") async def get_campaign( campaign_id: int, user: UserModel = Depends(get_user), ) -> CampaignResponse: """Get campaign details""" campaign = await db_client.get_campaign(campaign_id, user.selected_organization_id) if not campaign: raise HTTPException(status_code=404, detail="Campaign not found") workflow_name = await db_client.get_workflow_name(campaign.workflow_id, user.id) return _build_campaign_response(campaign, workflow_name or "Unknown") @router.post("/{campaign_id}/start") async def start_campaign( campaign_id: int, user: UserModel = Depends(get_user), ) -> CampaignResponse: """Start campaign execution""" # Check if organization has TELEPHONY_CONFIGURATION configured twilio_config = await db_client.get_configuration( user.selected_organization_id, OrganizationConfigurationKey.TELEPHONY_CONFIGURATION.value, ) if not twilio_config or not twilio_config.value: raise HTTPException( status_code=401, detail="You must configure telephony first by going to APP_URL/configure-telephony", ) # Check Dograh quota before starting campaign quota_result = await check_dograh_quota(user) if not quota_result.has_quota: raise HTTPException(status_code=402, detail=quota_result.error_message) # Verify campaign exists and belongs to organization campaign = await db_client.get_campaign(campaign_id, user.selected_organization_id) if not campaign: raise HTTPException(status_code=404, detail="Campaign not found") # Start the campaign using the runner service try: await campaign_runner_service.start_campaign(campaign_id) except ValueError as e: raise HTTPException(status_code=400, detail=str(e)) # Get updated campaign campaign = await db_client.get_campaign(campaign_id, user.selected_organization_id) workflow_name = await db_client.get_workflow_name(campaign.workflow_id, user.id) return _build_campaign_response(campaign, workflow_name or "Unknown") @router.post("/{campaign_id}/pause") async def pause_campaign( campaign_id: int, user: UserModel = Depends(get_user), ) -> CampaignResponse: """Pause campaign execution""" # Verify campaign exists and belongs to organization campaign = await db_client.get_campaign(campaign_id, user.selected_organization_id) if not campaign: raise HTTPException(status_code=404, detail="Campaign not found") # Pause the campaign using the runner service try: await campaign_runner_service.pause_campaign(campaign_id) except ValueError as e: raise HTTPException(status_code=400, detail=str(e)) # Get updated campaign campaign = await db_client.get_campaign(campaign_id, user.selected_organization_id) workflow_name = await db_client.get_workflow_name(campaign.workflow_id, user.id) return _build_campaign_response(campaign, workflow_name or "Unknown") @router.patch("/{campaign_id}") async def update_campaign( campaign_id: int, request: UpdateCampaignRequest, user: UserModel = Depends(get_user), ) -> CampaignResponse: """Update campaign settings (name, retry config, max concurrency, schedule)""" campaign = await db_client.get_campaign(campaign_id, user.selected_organization_id) if not campaign: raise HTTPException(status_code=404, detail="Campaign not found") if campaign.state in ["completed", "failed"]: raise HTTPException( status_code=400, detail=f"Cannot update a {campaign.state} campaign", ) if request.max_concurrency is not None: await _validate_max_concurrency( request.max_concurrency, user.selected_organization_id ) # Build update kwargs update_kwargs = {} if request.name is not None: update_kwargs["name"] = request.name if request.retry_config is not None: update_kwargs["retry_config"] = request.retry_config.model_dump() # Merge max_concurrency and schedule_config into orchestrator_metadata metadata = campaign.orchestrator_metadata or {} metadata_changed = False if request.max_concurrency is not None: metadata["max_concurrency"] = request.max_concurrency metadata_changed = True if request.schedule_config is not None: metadata["schedule_config"] = request.schedule_config.model_dump() metadata_changed = True if request.circuit_breaker is not None: metadata["circuit_breaker"] = request.circuit_breaker.model_dump() metadata_changed = True if metadata_changed: update_kwargs["orchestrator_metadata"] = metadata if update_kwargs: await db_client.update_campaign(campaign_id=campaign_id, **update_kwargs) # Re-fetch to return updated data campaign = await db_client.get_campaign(campaign_id, user.selected_organization_id) workflow_name = await db_client.get_workflow_name(campaign.workflow_id, user.id) return _build_campaign_response(campaign, workflow_name or "Unknown") @router.get("/{campaign_id}/runs") async def get_campaign_runs( campaign_id: int, page: int = 1, limit: int = 50, filters: Optional[str] = Query(None, description="JSON-encoded filter criteria"), sort_by: Optional[str] = Query( None, description="Field to sort by (e.g., 'duration', 'created_at')" ), sort_order: Optional[str] = Query( "desc", description="Sort order ('asc' or 'desc')" ), user: UserModel = Depends(get_user), ) -> CampaignRunsResponse: """Get campaign workflow runs with pagination, filters and sorting""" offset = (page - 1) * limit # Parse filters if provided filter_criteria = [] if filters: try: filter_criteria = json.loads(filters) except json.JSONDecodeError: raise HTTPException(status_code=400, detail="Invalid filter format") # Restrict allowed filter attributes for regular users allowed_attributes = { "dateRange", "dispositionCode", "duration", "status", "tokenUsage", } for filter_item in filter_criteria: attribute = filter_item.get("attribute") if attribute and attribute not in allowed_attributes: raise HTTPException( status_code=403, detail=f"Invalid attribute '{attribute}'" ) try: runs, total_count = await db_client.get_campaign_runs_paginated( campaign_id, user.selected_organization_id, limit=limit, offset=offset, filters=filter_criteria if filter_criteria else None, sort_by=sort_by, sort_order=sort_order, ) except ValueError as e: raise HTTPException(status_code=404, detail=str(e)) total_pages = (total_count + limit - 1) // limit return CampaignRunsResponse( runs=[run.model_dump() for run in runs], total_count=total_count, page=page, limit=limit, total_pages=total_pages, ) @router.post("/{campaign_id}/resume") async def resume_campaign( campaign_id: int, user: UserModel = Depends(get_user), ) -> CampaignResponse: """Resume a paused campaign""" # Check if organization has TELEPHONY_CONFIGURATION configured twilio_config = await db_client.get_configuration( user.selected_organization_id, OrganizationConfigurationKey.TELEPHONY_CONFIGURATION.value, ) if not twilio_config or not twilio_config.value: raise HTTPException( status_code=401, detail="You must configure telephony first by going to APP_URL/configure-telephony", ) # Check Dograh quota before resuming campaign quota_result = await check_dograh_quota(user) if not quota_result.has_quota: raise HTTPException(status_code=402, detail=quota_result.error_message) # Verify campaign exists and belongs to organization campaign = await db_client.get_campaign(campaign_id, user.selected_organization_id) if not campaign: raise HTTPException(status_code=404, detail="Campaign not found") # Resume the campaign using the runner service try: await campaign_runner_service.resume_campaign(campaign_id) except ValueError as e: raise HTTPException(status_code=400, detail=str(e)) # Get updated campaign campaign = await db_client.get_campaign(campaign_id, user.selected_organization_id) workflow_name = await db_client.get_workflow_name(campaign.workflow_id, user.id) return _build_campaign_response(campaign, workflow_name or "Unknown") @router.get("/{campaign_id}/progress") async def get_campaign_progress( campaign_id: int, user: UserModel = Depends(get_user), ) -> CampaignProgressResponse: """Get current campaign progress and statistics""" # Verify campaign exists and belongs to organization campaign = await db_client.get_campaign(campaign_id, user.selected_organization_id) if not campaign: raise HTTPException(status_code=404, detail="Campaign not found") # Get progress from runner service try: progress = await campaign_runner_service.get_campaign_status(campaign_id) return CampaignProgressResponse(**progress) except ValueError as e: raise HTTPException(status_code=400, detail=str(e)) class CampaignSourceDownloadResponse(BaseModel): download_url: str expires_in: int @router.get("/{campaign_id}/source-download-url") async def get_campaign_source_download_url( campaign_id: int, user: UserModel = Depends(get_user), ) -> CampaignSourceDownloadResponse: """Get presigned download URL for campaign CSV source file Only works for CSV source type. For Google Sheets, use the source_id directly. Validates that the campaign belongs to the user's organization for security. """ # Verify campaign exists and belongs to organization campaign = await db_client.get_campaign(campaign_id, user.selected_organization_id) if not campaign: raise HTTPException(status_code=404, detail="Campaign not found") # Only generate download URL for CSV files if campaign.source_type != "csv": raise HTTPException( status_code=400, detail=f"Download URL only available for CSV sources. This campaign uses {campaign.source_type}", ) # Verify the file key belongs to the user's organization # File key format: campaigns/{org_id}/{uuid}_{filename}.csv if not campaign.source_id.startswith(f"campaigns/{user.selected_organization_id}/"): raise HTTPException( status_code=403, detail="Access denied: Source file does not belong to your organization", ) # Generate presigned download URL try: download_url = await storage_fs.aget_signed_url( campaign.source_id, expiration=3600, # 1 hour ) if not download_url: raise HTTPException( status_code=500, detail="Failed to generate download URL" ) return CampaignSourceDownloadResponse( download_url=download_url, expires_in=3600 ) except Exception as e: raise HTTPException( status_code=500, detail=f"Failed to generate download URL: {str(e)}" )