import csv import io import json from datetime import datetime from typing import List, Optional from zoneinfo import ZoneInfo from fastapi import APIRouter, Depends, HTTPException, Query from fastapi.responses import StreamingResponse from pydantic import BaseModel, Field, field_validator, model_validator from api.constants import ( BACKEND_API_ENDPOINT, DEFAULT_CAMPAIGN_RETRY_CONFIG, DEFAULT_ORG_CONCURRENCY_LIMIT, ) from api.db import db_client from api.db.models import UserModel from api.enums import OrganizationConfigurationKey from api.services.auth.depends import get_user from api.services.campaign.runner import campaign_runner_service from api.services.campaign.source_sync_factory import get_sync_service from api.services.quota_service import check_dograh_quota from api.services.storage import storage_fs from api.utils.transcript import generate_transcript_text router = APIRouter(prefix="/campaign") async def _get_org_concurrent_limit(organization_id: int) -> int: """Get the concurrent call limit for an organization.""" try: config = await db_client.get_configuration( organization_id, OrganizationConfigurationKey.CONCURRENT_CALL_LIMIT.value, ) if config and config.value: return int(config.value.get("value", DEFAULT_ORG_CONCURRENCY_LIMIT)) except Exception: pass return DEFAULT_ORG_CONCURRENCY_LIMIT async def _get_from_numbers_count(organization_id: int) -> int: """Get the number of configured from_numbers for an organization.""" try: config = await db_client.get_configuration( organization_id, OrganizationConfigurationKey.TELEPHONY_CONFIGURATION.value, ) if config and config.value: return len(config.value.get("from_numbers", [])) except Exception: pass return 0 async def _validate_max_concurrency(max_concurrency: int, organization_id: int) -> None: """Validate max_concurrency against org limit and configured phone numbers. Raises HTTPException(400) if the value exceeds the effective limit. """ org_limit = await _get_org_concurrent_limit(organization_id) from_numbers_count = await _get_from_numbers_count(organization_id) effective_limit = ( min(org_limit, from_numbers_count) if from_numbers_count > 0 else org_limit ) if max_concurrency > effective_limit: if from_numbers_count > 0 and from_numbers_count < org_limit: raise HTTPException( status_code=400, detail=f"max_concurrency ({max_concurrency}) cannot exceed {effective_limit}. You have {from_numbers_count} phone number(s) configured. Add more CLIs in telephony configuration to increase concurrency.", ) raise HTTPException( status_code=400, detail=f"max_concurrency ({max_concurrency}) cannot exceed organization limit ({effective_limit})", ) class RetryConfigRequest(BaseModel): enabled: bool = True max_retries: int = Field(default=2, ge=0, le=10) retry_delay_seconds: int = Field(default=120, ge=30, le=3600) retry_on_busy: bool = True retry_on_no_answer: bool = True retry_on_voicemail: bool = True class RetryConfigResponse(BaseModel): enabled: bool max_retries: int retry_delay_seconds: int retry_on_busy: bool retry_on_no_answer: bool retry_on_voicemail: bool class TimeSlotRequest(BaseModel): day_of_week: int = Field(..., ge=0, le=6) start_time: str = Field(..., pattern=r"^\d{2}:\d{2}$") end_time: str = Field(..., pattern=r"^\d{2}:\d{2}$") @model_validator(mode="after") def validate_times(self): if self.start_time >= self.end_time: raise ValueError("start_time must be before end_time") return self class ScheduleConfigRequest(BaseModel): enabled: bool = True timezone: str = "UTC" slots: List[TimeSlotRequest] = Field(..., min_length=1, max_length=50) @field_validator("timezone") @classmethod def validate_timezone(cls, v: str) -> str: try: ZoneInfo(v) except (KeyError, Exception): raise ValueError(f"Invalid timezone: {v}") return v class TimeSlotResponse(BaseModel): day_of_week: int start_time: str end_time: str class ScheduleConfigResponse(BaseModel): enabled: bool timezone: str slots: List[TimeSlotResponse] class CircuitBreakerConfigRequest(BaseModel): enabled: bool = True failure_threshold: float = Field(default=0.5, ge=0.0, le=1.0) window_seconds: int = Field(default=120, ge=30, le=600) min_calls_in_window: int = Field(default=5, ge=1, le=100) class CircuitBreakerConfigResponse(BaseModel): enabled: bool = False failure_threshold: float = 0.5 window_seconds: int = 120 min_calls_in_window: int = 5 class CreateCampaignRequest(BaseModel): name: str = Field(..., min_length=1, max_length=255) workflow_id: int source_type: str = Field(..., pattern="^(google-sheet|csv)$") source_id: str # Google Sheet URL or CSV file key retry_config: Optional[RetryConfigRequest] = None max_concurrency: Optional[int] = Field(default=None, ge=1, le=100) schedule_config: Optional[ScheduleConfigRequest] = None circuit_breaker: Optional[CircuitBreakerConfigRequest] = None class UpdateCampaignRequest(BaseModel): name: Optional[str] = Field(None, min_length=1, max_length=255) retry_config: Optional[RetryConfigRequest] = None max_concurrency: Optional[int] = Field(default=None, ge=1, le=100) schedule_config: Optional[ScheduleConfigRequest] = None circuit_breaker: Optional[CircuitBreakerConfigRequest] = None class CampaignResponse(BaseModel): id: int name: str workflow_id: int workflow_name: str state: str source_type: str source_id: str total_rows: Optional[int] processed_rows: int failed_rows: int created_at: datetime started_at: Optional[datetime] completed_at: Optional[datetime] retry_config: RetryConfigResponse max_concurrency: Optional[int] = None schedule_config: Optional[ScheduleConfigResponse] = None circuit_breaker: Optional[CircuitBreakerConfigResponse] = None class CampaignsResponse(BaseModel): campaigns: List[CampaignResponse] class WorkflowRunResponse(BaseModel): id: int workflow_id: int state: str created_at: datetime completed_at: Optional[datetime] class CampaignRunsResponse(BaseModel): """Paginated response for campaign workflow runs""" runs: List[dict] # WorkflowRunResponseSchema from schemas total_count: int page: int limit: int total_pages: int class CampaignProgressResponse(BaseModel): campaign_id: int state: str total_rows: int processed_rows: int failed_calls: int progress_percentage: float source_sync: dict rate_limit: int started_at: Optional[datetime] completed_at: Optional[datetime] # Default retry config for campaigns def _build_campaign_response(campaign, workflow_name: str) -> CampaignResponse: """Build a CampaignResponse from a campaign model.""" # Get retry_config from campaign or use defaults retry_config = ( campaign.retry_config if campaign.retry_config else DEFAULT_CAMPAIGN_RETRY_CONFIG ) # Get max_concurrency, schedule_config, circuit_breaker from orchestrator_metadata max_concurrency = None schedule_config = None circuit_breaker_config = CircuitBreakerConfigResponse() if campaign.orchestrator_metadata: max_concurrency = campaign.orchestrator_metadata.get("max_concurrency") sc = campaign.orchestrator_metadata.get("schedule_config") if sc: schedule_config = ScheduleConfigResponse( enabled=sc.get("enabled", False), timezone=sc.get("timezone", "UTC"), slots=[TimeSlotResponse(**slot) for slot in sc.get("slots", [])], ) cb = campaign.orchestrator_metadata.get("circuit_breaker") if cb: circuit_breaker_config = CircuitBreakerConfigResponse(**cb) return CampaignResponse( id=campaign.id, name=campaign.name, workflow_id=campaign.workflow_id, workflow_name=workflow_name, state=campaign.state, source_type=campaign.source_type, source_id=campaign.source_id, total_rows=campaign.total_rows, processed_rows=campaign.processed_rows, failed_rows=campaign.failed_rows, created_at=campaign.created_at, started_at=campaign.started_at, completed_at=campaign.completed_at, retry_config=RetryConfigResponse(**retry_config), max_concurrency=max_concurrency, schedule_config=schedule_config, circuit_breaker=circuit_breaker_config, ) @router.post("/create") async def create_campaign( request: CreateCampaignRequest, user: UserModel = Depends(get_user), ) -> CampaignResponse: """Create a new campaign""" # Verify workflow exists and belongs to organization workflow_name = await db_client.get_workflow_name(request.workflow_id, user.id) if not workflow_name: raise HTTPException(status_code=404, detail="Workflow not found") # Validate source data (phone_number column and format) sync_service = get_sync_service(request.source_type) validation_result = await sync_service.validate_source( request.source_id, user.selected_organization_id ) if not validation_result.is_valid: raise HTTPException(status_code=400, detail=validation_result.error.message) if request.max_concurrency is not None: await _validate_max_concurrency( request.max_concurrency, user.selected_organization_id ) # Build retry_config dict if provided retry_config = None if request.retry_config: retry_config = request.retry_config.model_dump() # Build schedule_config dict if provided schedule_config = None if request.schedule_config: schedule_config = request.schedule_config.model_dump() # Build circuit_breaker dict if provided circuit_breaker_config = None if request.circuit_breaker: circuit_breaker_config = request.circuit_breaker.model_dump() campaign = await db_client.create_campaign( name=request.name, workflow_id=request.workflow_id, source_type=request.source_type, source_id=request.source_id, user_id=user.id, organization_id=user.selected_organization_id, retry_config=retry_config, max_concurrency=request.max_concurrency, schedule_config=schedule_config, circuit_breaker=circuit_breaker_config, ) return _build_campaign_response(campaign, workflow_name) @router.get("/") async def get_campaigns( user: UserModel = Depends(get_user), ) -> CampaignsResponse: """Get campaigns for user's organization""" campaigns = await db_client.get_campaigns(user.selected_organization_id) # Get workflow names for all campaigns workflow_ids = list(set(c.workflow_id for c in campaigns)) workflows = await db_client.get_workflows_by_ids( workflow_ids, user.selected_organization_id ) workflow_map = {w.id: w.name for w in workflows} campaign_responses = [ _build_campaign_response(c, workflow_map.get(c.workflow_id, "Unknown")) for c in campaigns ] return CampaignsResponse(campaigns=campaign_responses) @router.get("/{campaign_id}") async def get_campaign( campaign_id: int, user: UserModel = Depends(get_user), ) -> CampaignResponse: """Get campaign details""" campaign = await db_client.get_campaign(campaign_id, user.selected_organization_id) if not campaign: raise HTTPException(status_code=404, detail="Campaign not found") workflow_name = await db_client.get_workflow_name(campaign.workflow_id, user.id) return _build_campaign_response(campaign, workflow_name or "Unknown") @router.post("/{campaign_id}/start") async def start_campaign( campaign_id: int, user: UserModel = Depends(get_user), ) -> CampaignResponse: """Start campaign execution""" # Check if organization has TELEPHONY_CONFIGURATION configured twilio_config = await db_client.get_configuration( user.selected_organization_id, OrganizationConfigurationKey.TELEPHONY_CONFIGURATION.value, ) if not twilio_config or not twilio_config.value: raise HTTPException( status_code=401, detail="You must configure telephony first by going to APP_URL/configure-telephony", ) # Check Dograh quota before starting campaign quota_result = await check_dograh_quota(user) if not quota_result.has_quota: raise HTTPException(status_code=402, detail=quota_result.error_message) # Verify campaign exists and belongs to organization campaign = await db_client.get_campaign(campaign_id, user.selected_organization_id) if not campaign: raise HTTPException(status_code=404, detail="Campaign not found") # Start the campaign using the runner service try: await campaign_runner_service.start_campaign(campaign_id) except ValueError as e: raise HTTPException(status_code=400, detail=str(e)) # Get updated campaign campaign = await db_client.get_campaign(campaign_id, user.selected_organization_id) workflow_name = await db_client.get_workflow_name(campaign.workflow_id, user.id) return _build_campaign_response(campaign, workflow_name or "Unknown") @router.post("/{campaign_id}/pause") async def pause_campaign( campaign_id: int, user: UserModel = Depends(get_user), ) -> CampaignResponse: """Pause campaign execution""" # Verify campaign exists and belongs to organization campaign = await db_client.get_campaign(campaign_id, user.selected_organization_id) if not campaign: raise HTTPException(status_code=404, detail="Campaign not found") # Pause the campaign using the runner service try: await campaign_runner_service.pause_campaign(campaign_id) except ValueError as e: raise HTTPException(status_code=400, detail=str(e)) # Get updated campaign campaign = await db_client.get_campaign(campaign_id, user.selected_organization_id) workflow_name = await db_client.get_workflow_name(campaign.workflow_id, user.id) return _build_campaign_response(campaign, workflow_name or "Unknown") @router.patch("/{campaign_id}") async def update_campaign( campaign_id: int, request: UpdateCampaignRequest, user: UserModel = Depends(get_user), ) -> CampaignResponse: """Update campaign settings (name, retry config, max concurrency, schedule)""" campaign = await db_client.get_campaign(campaign_id, user.selected_organization_id) if not campaign: raise HTTPException(status_code=404, detail="Campaign not found") if campaign.state in ["completed", "failed"]: raise HTTPException( status_code=400, detail=f"Cannot update a {campaign.state} campaign", ) if request.max_concurrency is not None: await _validate_max_concurrency( request.max_concurrency, user.selected_organization_id ) # Build update kwargs update_kwargs = {} if request.name is not None: update_kwargs["name"] = request.name if request.retry_config is not None: update_kwargs["retry_config"] = request.retry_config.model_dump() # Merge max_concurrency and schedule_config into orchestrator_metadata metadata = campaign.orchestrator_metadata or {} metadata_changed = False if request.max_concurrency is not None: metadata["max_concurrency"] = request.max_concurrency metadata_changed = True if request.schedule_config is not None: metadata["schedule_config"] = request.schedule_config.model_dump() metadata_changed = True if request.circuit_breaker is not None: metadata["circuit_breaker"] = request.circuit_breaker.model_dump() metadata_changed = True if metadata_changed: update_kwargs["orchestrator_metadata"] = metadata if update_kwargs: await db_client.update_campaign(campaign_id=campaign_id, **update_kwargs) # Re-fetch to return updated data campaign = await db_client.get_campaign(campaign_id, user.selected_organization_id) workflow_name = await db_client.get_workflow_name(campaign.workflow_id, user.id) return _build_campaign_response(campaign, workflow_name or "Unknown") @router.get("/{campaign_id}/runs") async def get_campaign_runs( campaign_id: int, page: int = 1, limit: int = 50, filters: Optional[str] = Query(None, description="JSON-encoded filter criteria"), sort_by: Optional[str] = Query( None, description="Field to sort by (e.g., 'duration', 'created_at')" ), sort_order: Optional[str] = Query( "desc", description="Sort order ('asc' or 'desc')" ), user: UserModel = Depends(get_user), ) -> CampaignRunsResponse: """Get campaign workflow runs with pagination, filters and sorting""" offset = (page - 1) * limit # Parse filters if provided filter_criteria = [] if filters: try: filter_criteria = json.loads(filters) except json.JSONDecodeError: raise HTTPException(status_code=400, detail="Invalid filter format") # Restrict allowed filter attributes for regular users allowed_attributes = { "dateRange", "dispositionCode", "duration", "status", "tokenUsage", } for filter_item in filter_criteria: attribute = filter_item.get("attribute") if attribute and attribute not in allowed_attributes: raise HTTPException( status_code=403, detail=f"Invalid attribute '{attribute}'" ) try: runs, total_count = await db_client.get_campaign_runs_paginated( campaign_id, user.selected_organization_id, limit=limit, offset=offset, filters=filter_criteria if filter_criteria else None, sort_by=sort_by, sort_order=sort_order, ) except ValueError as e: raise HTTPException(status_code=404, detail=str(e)) total_pages = (total_count + limit - 1) // limit return CampaignRunsResponse( runs=[run.model_dump() for run in runs], total_count=total_count, page=page, limit=limit, total_pages=total_pages, ) @router.post("/{campaign_id}/resume") async def resume_campaign( campaign_id: int, user: UserModel = Depends(get_user), ) -> CampaignResponse: """Resume a paused campaign""" # Check if organization has TELEPHONY_CONFIGURATION configured twilio_config = await db_client.get_configuration( user.selected_organization_id, OrganizationConfigurationKey.TELEPHONY_CONFIGURATION.value, ) if not twilio_config or not twilio_config.value: raise HTTPException( status_code=401, detail="You must configure telephony first by going to APP_URL/configure-telephony", ) # Check Dograh quota before resuming campaign quota_result = await check_dograh_quota(user) if not quota_result.has_quota: raise HTTPException(status_code=402, detail=quota_result.error_message) # Verify campaign exists and belongs to organization campaign = await db_client.get_campaign(campaign_id, user.selected_organization_id) if not campaign: raise HTTPException(status_code=404, detail="Campaign not found") # Resume the campaign using the runner service try: await campaign_runner_service.resume_campaign(campaign_id) except ValueError as e: raise HTTPException(status_code=400, detail=str(e)) # Get updated campaign campaign = await db_client.get_campaign(campaign_id, user.selected_organization_id) workflow_name = await db_client.get_workflow_name(campaign.workflow_id, user.id) return _build_campaign_response(campaign, workflow_name or "Unknown") @router.get("/{campaign_id}/progress") async def get_campaign_progress( campaign_id: int, user: UserModel = Depends(get_user), ) -> CampaignProgressResponse: """Get current campaign progress and statistics""" # Verify campaign exists and belongs to organization campaign = await db_client.get_campaign(campaign_id, user.selected_organization_id) if not campaign: raise HTTPException(status_code=404, detail="Campaign not found") # Get progress from runner service try: progress = await campaign_runner_service.get_campaign_status(campaign_id) return CampaignProgressResponse(**progress) except ValueError as e: raise HTTPException(status_code=400, detail=str(e)) class CampaignSourceDownloadResponse(BaseModel): download_url: str expires_in: int @router.get("/{campaign_id}/source-download-url") async def get_campaign_source_download_url( campaign_id: int, user: UserModel = Depends(get_user), ) -> CampaignSourceDownloadResponse: """Get presigned download URL for campaign CSV source file Only works for CSV source type. For Google Sheets, use the source_id directly. Validates that the campaign belongs to the user's organization for security. """ # Verify campaign exists and belongs to organization campaign = await db_client.get_campaign(campaign_id, user.selected_organization_id) if not campaign: raise HTTPException(status_code=404, detail="Campaign not found") # Only generate download URL for CSV files if campaign.source_type != "csv": raise HTTPException( status_code=400, detail=f"Download URL only available for CSV sources. This campaign uses {campaign.source_type}", ) # Verify the file key belongs to the user's organization # File key format: campaigns/{org_id}/{uuid}_{filename}.csv if not campaign.source_id.startswith(f"campaigns/{user.selected_organization_id}/"): raise HTTPException( status_code=403, detail="Access denied: Source file does not belong to your organization", ) # Generate presigned download URL try: download_url = await storage_fs.aget_signed_url( campaign.source_id, expiration=3600, # 1 hour ) if not download_url: raise HTTPException( status_code=500, detail="Failed to generate download URL" ) return CampaignSourceDownloadResponse( download_url=download_url, expires_in=3600 ) except Exception as e: raise HTTPException( status_code=500, detail=f"Failed to generate download URL: {str(e)}" ) def _transcript_from_logs(logs: dict | None) -> str: """Extract transcript text from workflow run logs JSON.""" if not logs: return "" events = logs.get("realtime_feedback_events", []) return generate_transcript_text(events).strip() @router.get("/{campaign_id}/report") async def download_campaign_report( campaign_id: int, user: UserModel = Depends(get_user), ) -> StreamingResponse: """Download a CSV report of completed campaign runs.""" campaign = await db_client.get_campaign(campaign_id, user.selected_organization_id) if not campaign: raise HTTPException(status_code=404, detail="Campaign not found") runs = await db_client.get_completed_runs_for_report(campaign_id) output = io.StringIO() writer = csv.writer(output) writer.writerow( [ "Run ID", "Created At", "Customer Name", "Phone Number", "Call Disposition", "Call Tags", "Call Duration (s)", "Transcript", "Recording URL", ] ) for run in runs: initial = run.initial_context or {} gathered = run.gathered_context or {} cost = run.cost_info or {} recording_url = "" if run.public_access_token: recording_url = ( f"{BACKEND_API_ENDPOINT}/api/v1/public/download/workflow" f"/{run.public_access_token}/recording" ) call_tags = gathered.get("call_tags", []) if isinstance(call_tags, list): call_tags = ", ".join(str(t) for t in call_tags) writer.writerow( [ run.id, run.created_at.isoformat() if run.created_at else "", initial.get("first_name", ""), initial.get("phone_number", ""), gathered.get("mapped_call_disposition", ""), call_tags, cost.get("call_duration_seconds", ""), _transcript_from_logs(run.logs), recording_url, ] ) output.seek(0) filename = f"campaign_{campaign_id}_report.csv" return StreamingResponse( output, media_type="text/csv", headers={"Content-Disposition": f'attachment; filename="{filename}"'}, )