2026-01-30 17:08:15 +05:30
import json
2025-09-09 14:37:32 +05:30
from datetime import datetime
from typing import List , Optional
2026-02-17 21:04:15 +05:30
from zoneinfo import ZoneInfo
2025-09-09 14:37:32 +05:30
2026-01-30 17:08:15 +05:30
from fastapi import APIRouter , Depends , HTTPException , Query
2026-02-17 21:04:15 +05:30
from pydantic import BaseModel , Field , field_validator , model_validator
2025-09-09 14:37:32 +05:30
2026-03-05 13:43:13 +05:30
from api . constants import (
DEFAULT_CAMPAIGN_RETRY_CONFIG ,
DEFAULT_ORG_CONCURRENCY_LIMIT ,
)
2025-09-09 14:37:32 +05:30
from api . db import db_client
from api . db . models import UserModel
from api . enums import OrganizationConfigurationKey
from api . services . auth . depends import get_user
from api . services . campaign . runner import campaign_runner_service
2026-02-07 13:45:21 +05:30
from api . services . campaign . source_sync_factory import get_sync_service
2025-12-22 14:08:30 +05:30
from api . services . quota_service import check_dograh_quota
2025-10-09 17:54:31 +05:30
from api . services . storage import storage_fs
2025-09-09 14:37:32 +05:30
# Every route declared in this module is registered on this router and is
# therefore served under the "/campaign" prefix.
router = APIRouter(prefix="/campaign")
2026-01-29 11:57:57 +05:30
async def _get_org_concurrent_limit(organization_id: int) -> int:
    """Return the concurrent call limit configured for an organization.

    Reads the CONCURRENT_CALL_LIMIT configuration entry and returns its
    ``"value"`` field as an int. Falls back to
    DEFAULT_ORG_CONCURRENCY_LIMIT when the entry is missing or empty, or
    when anything goes wrong while reading or converting it.
    """
    try:
        entry = await db_client.get_configuration(
            organization_id,
            OrganizationConfigurationKey.CONCURRENT_CALL_LIMIT.value,
        )
        stored = entry.value if entry else None
        if stored:
            return int(stored.get("value", DEFAULT_ORG_CONCURRENCY_LIMIT))
    except Exception:
        # Best-effort lookup: any failure deliberately falls through to the
        # default below instead of failing the request.
        pass
    return DEFAULT_ORG_CONCURRENCY_LIMIT
2026-02-07 13:45:21 +05:30
async def _get_from_numbers_count(organization_id: int) -> int:
    """Return how many ``from_numbers`` the organization has configured.

    Reads the TELEPHONY_CONFIGURATION entry and counts its
    ``"from_numbers"`` list. Returns 0 when the entry is missing or empty,
    or when the lookup or counting fails for any reason.
    """
    try:
        entry = await db_client.get_configuration(
            organization_id,
            OrganizationConfigurationKey.TELEPHONY_CONFIGURATION.value,
        )
        telephony = entry.value if entry else None
        if telephony:
            return len(telephony.get("from_numbers", []))
    except Exception:
        # Best-effort lookup: failures are treated as "no numbers configured".
        pass
    return 0
2026-02-17 21:04:15 +05:30
async def _validate_max_concurrency(max_concurrency: int, organization_id: int) -> None:
    """Check a requested max_concurrency against what the organization allows.

    The effective limit is the organization's concurrent call limit, further
    capped by the number of configured from_numbers when at least one is
    configured.

    Raises:
        HTTPException: 400 when ``max_concurrency`` exceeds the effective limit.
    """
    org_limit = await _get_org_concurrent_limit(organization_id)
    from_numbers_count = await _get_from_numbers_count(organization_id)

    effective_limit = org_limit
    if from_numbers_count > 0:
        effective_limit = min(org_limit, from_numbers_count)

    if max_concurrency <= effective_limit:
        return

    # Pick the message that names the actual bottleneck: configured phone
    # numbers when they are the tighter bound, otherwise the org limit.
    if 0 < from_numbers_count < org_limit:
        detail = f"max_concurrency ({max_concurrency}) cannot exceed {effective_limit}. You have {from_numbers_count} phone number(s) configured. Add more CLIs in telephony configuration to increase concurrency."
    else:
        detail = f"max_concurrency ({max_concurrency}) cannot exceed organization limit ({effective_limit})"
    raise HTTPException(status_code=400, detail=detail)
2026-01-29 11:57:57 +05:30
class RetryConfigRequest(BaseModel):
    """Client-supplied retry settings for a campaign.

    ``max_retries`` is bounded to 0-10 and ``retry_delay_seconds`` to
    30-3600; every field has a default, so an empty object is valid.
    """

    enabled: bool = True
    max_retries: int = Field(2, ge=0, le=10)
    retry_delay_seconds: int = Field(120, ge=30, le=3600)
    # Per-outcome switches controlling which call results are retried.
    retry_on_busy: bool = True
    retry_on_no_answer: bool = True
    retry_on_voicemail: bool = True
class RetryConfigResponse(BaseModel):
    """Retry settings as returned inside a CampaignResponse.

    Carries the same fields as RetryConfigRequest, but all are required and
    no range constraints are applied.
    """

    enabled: bool
    max_retries: int
    retry_delay_seconds: int
    retry_on_busy: bool
    retry_on_no_answer: bool
    retry_on_voicemail: bool
2026-02-17 21:04:15 +05:30
class TimeSlotRequest(BaseModel):
    """One weekly time window supplied by the client.

    ``day_of_week`` must be 0-6 and both times must look like ``NN:NN``
    (two digits, a colon, two digits). ``start_time`` must sort strictly
    before ``end_time``.
    """

    day_of_week: int = Field(ge=0, le=6)
    # NOTE(review): the pattern only checks the shape, so values such as
    # "99:99" are accepted - confirm whether hour/minute ranges should be
    # enforced here.
    start_time: str = Field(pattern=r"^\d{2}:\d{2}$")
    end_time: str = Field(pattern=r"^\d{2}:\d{2}$")

    @model_validator(mode="after")
    def validate_times(self):
        """Reject slots whose start is not strictly earlier than their end."""
        # Plain string comparison: both values are fixed-width and
        # zero-padded, so lexicographic order matches chronological order.
        if not self.start_time < self.end_time:
            raise ValueError("start_time must be before end_time")
        return self
class ScheduleConfigRequest(BaseModel):
    """Client-supplied schedule for a campaign.

    ``timezone`` must be a key that ``zoneinfo.ZoneInfo`` can load, and
    ``slots`` must contain between 1 and 50 time windows.
    """

    enabled: bool = True
    timezone: str = "UTC"
    slots: List[TimeSlotRequest] = Field(..., min_length=1, max_length=50)

    @field_validator("timezone")
    @classmethod
    def validate_timezone(cls, v: str) -> str:
        """Return ``v`` unchanged if it names a loadable time zone.

        Raises:
            ValueError: if ``ZoneInfo(v)`` fails for any reason.
        """
        try:
            ZoneInfo(v)
        except Exception as exc:
            # ZoneInfo can fail with more than ZoneInfoNotFoundError (a
            # KeyError subclass) - e.g. ValueError for malformed keys - so
            # the broad catch is kept; the cause is chained for debugging.
            raise ValueError(f"Invalid timezone: {v}") from exc
        return v
class TimeSlotResponse(BaseModel):
    """One schedule time window as returned to the client.

    Same fields as TimeSlotRequest, without any validation constraints.
    """

    day_of_week: int
    start_time: str
    end_time: str
class ScheduleConfigResponse(BaseModel):
    """Campaign schedule as returned inside a CampaignResponse."""

    enabled: bool
    timezone: str
    slots: List[TimeSlotResponse]
2026-03-05 13:43:13 +05:30
class CircuitBreakerConfigRequest(BaseModel):
    """Client-supplied circuit breaker settings for a campaign.

    Bounds: ``failure_threshold`` 0.0-1.0, ``window_seconds`` 30-600,
    ``min_calls_in_window`` 1-100. All fields have defaults.
    """

    enabled: bool = True
    failure_threshold: float = Field(0.5, ge=0.0, le=1.0)
    window_seconds: int = Field(120, ge=30, le=600)
    min_calls_in_window: int = Field(5, ge=1, le=100)
class CircuitBreakerConfigResponse(BaseModel):
    """Circuit breaker settings as returned inside a CampaignResponse.

    Every field has a default, so the model can be built with no arguments;
    in that case ``enabled`` is False.
    """

    enabled: bool = False
    failure_threshold: float = 0.5
    window_seconds: int = 120
    min_calls_in_window: int = 5
2026-03-05 13:43:13 +05:30
2025-09-09 14:37:32 +05:30
class CreateCampaignRequest(BaseModel):
    """Request body for creating a campaign.

    ``name``, ``workflow_id``, ``source_type`` and ``source_id`` are
    required; the remaining settings are optional.
    """

    name: str = Field(min_length=1, max_length=255)
    workflow_id: int
    # Only the two literal source types below are accepted.
    source_type: str = Field(pattern="^(google-sheet|csv)$")
    source_id: str  # Google Sheet URL or CSV file key
    retry_config: Optional[RetryConfigRequest] = None
    max_concurrency: Optional[int] = Field(None, ge=1, le=100)
    schedule_config: Optional[ScheduleConfigRequest] = None
    circuit_breaker: Optional[CircuitBreakerConfigRequest] = None
2026-02-17 21:04:15 +05:30
class UpdateCampaignRequest(BaseModel):
    """Request body for partially updating a campaign.

    Every field is optional and defaults to None.
    """

    name: Optional[str] = Field(default=None, min_length=1, max_length=255)
    retry_config: Optional[RetryConfigRequest] = None
    max_concurrency: Optional[int] = Field(None, ge=1, le=100)
    schedule_config: Optional[ScheduleConfigRequest] = None
    circuit_breaker: Optional[CircuitBreakerConfigRequest] = None
2025-09-09 14:37:32 +05:30
class CampaignResponse(BaseModel):
    """Full representation of a campaign returned by the campaign endpoints."""

    # Identity and workflow linkage.
    id: int
    name: str
    workflow_id: int
    workflow_name: str
    state: str

    # Where the campaign's rows come from.
    source_type: str
    source_id: str

    # Row counters.
    total_rows: Optional[int]
    processed_rows: int
    failed_rows: int

    # Lifecycle timestamps.
    created_at: datetime
    started_at: Optional[datetime]
    completed_at: Optional[datetime]

    # Execution settings.
    retry_config: RetryConfigResponse
    max_concurrency: Optional[int] = None
    schedule_config: Optional[ScheduleConfigResponse] = None
    circuit_breaker: Optional[CircuitBreakerConfigResponse] = None
2025-09-09 14:37:32 +05:30
class CampaignsResponse(BaseModel):
    """Envelope for a list of campaigns."""

    campaigns: List[CampaignResponse]
class WorkflowRunResponse(BaseModel):
    """Summary of a single workflow run.

    NOTE(review): not referenced by any other code visible in this module -
    confirm whether it is still used elsewhere.
    """

    id: int
    workflow_id: int
    state: str
    created_at: datetime
    completed_at: Optional[datetime]
2026-01-30 17:08:15 +05:30
class CampaignRunsResponse(BaseModel):
    """Paginated response for campaign workflow runs."""

    # Each entry is the model_dump() of a run object returned by the DB layer.
    runs: List[dict]  # WorkflowRunResponseSchema from schemas
    total_count: int
    # Echo of the requested page number and page size.
    page: int
    limit: int
    # Ceiling of total_count / limit.
    total_pages: int
2025-09-09 14:37:32 +05:30
class CampaignProgressResponse(BaseModel):
    """Progress snapshot for a campaign.

    Built by unpacking the mapping returned from
    ``campaign_runner_service.get_campaign_status``, so that mapping's keys
    must match these field names.
    """

    campaign_id: int
    state: str
    total_rows: int
    processed_rows: int
    failed_calls: int
    progress_percentage: float
    # NOTE(review): the schema of this dict is defined by the runner service
    # and is not visible here.
    source_sync: dict
    rate_limit: int
    started_at: Optional[datetime]
    completed_at: Optional[datetime]
2026-01-29 11:57:57 +05:30
# The default retry config for campaigns is DEFAULT_CAMPAIGN_RETRY_CONFIG, imported from api.constants.
def _build_campaign_response(campaign, workflow_name: str) -> CampaignResponse:
    """Convert a campaign model into the API's CampaignResponse.

    Args:
        campaign: Campaign object exposing the attributes read below
            (including ``retry_config`` and ``orchestrator_metadata``).
        workflow_name: Display name of the campaign's workflow.

    Returns:
        A CampaignResponse. ``retry_config`` falls back to
        DEFAULT_CAMPAIGN_RETRY_CONFIG when the campaign has none, and the
        circuit breaker falls back to CircuitBreakerConfigResponse defaults.
    """
    retry_settings = campaign.retry_config or DEFAULT_CAMPAIGN_RETRY_CONFIG

    # max_concurrency, schedule_config and circuit_breaker all live inside
    # orchestrator_metadata; start from the "nothing stored" values.
    max_concurrency = None
    schedule = None
    breaker = CircuitBreakerConfigResponse()

    metadata = campaign.orchestrator_metadata
    if metadata:
        max_concurrency = metadata.get("max_concurrency")

        stored_schedule = metadata.get("schedule_config")
        if stored_schedule:
            slot_models = [
                TimeSlotResponse(**slot) for slot in stored_schedule.get("slots", [])
            ]
            schedule = ScheduleConfigResponse(
                enabled=stored_schedule.get("enabled", False),
                timezone=stored_schedule.get("timezone", "UTC"),
                slots=slot_models,
            )

        stored_breaker = metadata.get("circuit_breaker")
        if stored_breaker:
            breaker = CircuitBreakerConfigResponse(**stored_breaker)

    return CampaignResponse(
        id=campaign.id,
        name=campaign.name,
        workflow_id=campaign.workflow_id,
        workflow_name=workflow_name,
        state=campaign.state,
        source_type=campaign.source_type,
        source_id=campaign.source_id,
        total_rows=campaign.total_rows,
        processed_rows=campaign.processed_rows,
        failed_rows=campaign.failed_rows,
        created_at=campaign.created_at,
        started_at=campaign.started_at,
        completed_at=campaign.completed_at,
        retry_config=RetryConfigResponse(**retry_settings),
        max_concurrency=max_concurrency,
        schedule_config=schedule,
        circuit_breaker=breaker,
    )
2025-09-09 14:37:32 +05:30
@router.post("/create")
async def create_campaign(
    request: CreateCampaignRequest,
    user: UserModel = Depends(get_user),
) -> CampaignResponse:
    """Create a new campaign.

    Raises:
        HTTPException: 404 if the workflow name lookup returns nothing,
            400 if source validation fails, and 400 (from
            ``_validate_max_concurrency``) if ``max_concurrency`` is too high.
    """
    # The workflow must be resolvable for this user before anything else.
    workflow_name = await db_client.get_workflow_name(request.workflow_id, user.id)
    if not workflow_name:
        raise HTTPException(status_code=404, detail="Workflow not found")

    # Validate source data (phone_number column and format).
    source_service = get_sync_service(request.source_type)
    source_check = await source_service.validate_source(
        request.source_id, user.selected_organization_id
    )
    if not source_check.is_valid:
        raise HTTPException(status_code=400, detail=source_check.error.message)

    if request.max_concurrency is not None:
        await _validate_max_concurrency(
            request.max_concurrency, user.selected_organization_id
        )

    # Optional sub-configs are stored as plain dicts, or None when omitted.
    retry_config = request.retry_config.model_dump() if request.retry_config else None
    schedule_config = (
        request.schedule_config.model_dump() if request.schedule_config else None
    )
    circuit_breaker_config = (
        request.circuit_breaker.model_dump() if request.circuit_breaker else None
    )

    created = await db_client.create_campaign(
        name=request.name,
        workflow_id=request.workflow_id,
        source_type=request.source_type,
        source_id=request.source_id,
        user_id=user.id,
        organization_id=user.selected_organization_id,
        retry_config=retry_config,
        max_concurrency=request.max_concurrency,
        schedule_config=schedule_config,
        circuit_breaker=circuit_breaker_config,
    )
    return _build_campaign_response(created, workflow_name)
2025-09-09 14:37:32 +05:30
@router.get("/")
async def get_campaigns(
    user: UserModel = Depends(get_user),
) -> CampaignsResponse:
    """List the campaigns of the user's selected organization.

    Workflow names are fetched in one batch; a campaign whose workflow is
    not in that batch is reported with the name "Unknown".
    """
    org_id = user.selected_organization_id
    campaigns = await db_client.get_campaigns(org_id)

    # Resolve each distinct workflow id to its name with a single lookup.
    distinct_workflow_ids = list({c.workflow_id for c in campaigns})
    workflows = await db_client.get_workflows_by_ids(distinct_workflow_ids, org_id)
    name_by_workflow_id = {w.id: w.name for w in workflows}

    responses = []
    for c in campaigns:
        workflow_name = name_by_workflow_id.get(c.workflow_id, "Unknown")
        responses.append(_build_campaign_response(c, workflow_name))
    return CampaignsResponse(campaigns=responses)
@router.get("/{campaign_id}")
async def get_campaign(
    campaign_id: int,
    user: UserModel = Depends(get_user),
) -> CampaignResponse:
    """Return the details of one campaign in the user's selected organization.

    Raises:
        HTTPException: 404 if no such campaign is found.
    """
    found = await db_client.get_campaign(campaign_id, user.selected_organization_id)
    if not found:
        raise HTTPException(status_code=404, detail="Campaign not found")

    workflow_name = await db_client.get_workflow_name(found.workflow_id, user.id)
    display_name = workflow_name if workflow_name else "Unknown"
    return _build_campaign_response(found, display_name)
2025-09-09 14:37:32 +05:30
@router.post("/{campaign_id}/start")
async def start_campaign(
    campaign_id: int,
    user: UserModel = Depends(get_user),
) -> CampaignResponse:
    """Start executing a campaign.

    Checks run in this order, each aborting with the given status:
    telephony configuration present (401), quota available (402),
    campaign found in the organization (404). A ``ValueError`` from the
    runner service is returned as 400.
    """
    org_id = user.selected_organization_id

    # The organization must have TELEPHONY_CONFIGURATION set up.
    telephony_config = await db_client.get_configuration(
        org_id,
        OrganizationConfigurationKey.TELEPHONY_CONFIGURATION.value,
    )
    if not (telephony_config and telephony_config.value):
        raise HTTPException(
            status_code=401,
            detail="You must configure telephony first by going to APP_URL/configure-telephony",
        )

    # Check Dograh quota before starting the campaign.
    quota = await check_dograh_quota(user)
    if not quota.has_quota:
        raise HTTPException(status_code=402, detail=quota.error_message)

    # The campaign must exist and belong to the organization.
    existing = await db_client.get_campaign(campaign_id, org_id)
    if not existing:
        raise HTTPException(status_code=404, detail="Campaign not found")

    try:
        await campaign_runner_service.start_campaign(campaign_id)
    except ValueError as e:
        raise HTTPException(status_code=400, detail=str(e))

    # Re-read the campaign so the response reflects the post-start state.
    refreshed = await db_client.get_campaign(campaign_id, org_id)
    workflow_name = await db_client.get_workflow_name(refreshed.workflow_id, user.id)
    return _build_campaign_response(refreshed, workflow_name or "Unknown")
2025-09-09 14:37:32 +05:30
@router.post("/{campaign_id}/pause")
async def pause_campaign(
    campaign_id: int,
    user: UserModel = Depends(get_user),
) -> CampaignResponse:
    """Pause a campaign.

    Raises:
        HTTPException: 404 if the campaign is not found in the user's
            organization; 400 if the runner service raises ``ValueError``.
    """
    org_id = user.selected_organization_id

    existing = await db_client.get_campaign(campaign_id, org_id)
    if not existing:
        raise HTTPException(status_code=404, detail="Campaign not found")

    try:
        await campaign_runner_service.pause_campaign(campaign_id)
    except ValueError as e:
        raise HTTPException(status_code=400, detail=str(e))

    # Re-read the campaign so the response reflects the post-pause state.
    refreshed = await db_client.get_campaign(campaign_id, org_id)
    workflow_name = await db_client.get_workflow_name(refreshed.workflow_id, user.id)
    return _build_campaign_response(refreshed, workflow_name or "Unknown")
2025-09-09 14:37:32 +05:30
2026-02-17 21:04:15 +05:30
@router.patch("/{campaign_id}")
async def update_campaign(
    campaign_id: int,
    request: UpdateCampaignRequest,
    user: UserModel = Depends(get_user),
) -> CampaignResponse:
    """Update campaign settings.

    Updatable settings are name, retry config, max concurrency, schedule
    and circuit breaker. Only fields present (non-None) in the request are
    changed. ``max_concurrency``, ``schedule_config`` and
    ``circuit_breaker`` are merged into the campaign's existing
    ``orchestrator_metadata``; other metadata keys are preserved.

    Raises:
        HTTPException: 404 if the campaign is not found in the user's
            organization; 400 if the campaign is completed or failed, or if
            ``max_concurrency`` exceeds the allowed limit.
    """
    campaign = await db_client.get_campaign(campaign_id, user.selected_organization_id)
    if not campaign:
        raise HTTPException(status_code=404, detail="Campaign not found")

    if campaign.state in ["completed", "failed"]:
        raise HTTPException(
            status_code=400,
            detail=f"Cannot update a {campaign.state} campaign",
        )

    if request.max_concurrency is not None:
        await _validate_max_concurrency(
            request.max_concurrency, user.selected_organization_id
        )

    # Build update kwargs
    update_kwargs = {}
    if request.name is not None:
        update_kwargs["name"] = request.name
    if request.retry_config is not None:
        update_kwargs["retry_config"] = request.retry_config.model_dump()

    # Merge max_concurrency, schedule_config and circuit_breaker into a COPY
    # of orchestrator_metadata, so the fetched campaign object is never
    # mutated in place and the update always receives a fresh dict.
    metadata = dict(campaign.orchestrator_metadata or {})
    metadata_changed = False
    if request.max_concurrency is not None:
        metadata["max_concurrency"] = request.max_concurrency
        metadata_changed = True
    if request.schedule_config is not None:
        metadata["schedule_config"] = request.schedule_config.model_dump()
        metadata_changed = True
    if request.circuit_breaker is not None:
        metadata["circuit_breaker"] = request.circuit_breaker.model_dump()
        metadata_changed = True
    if metadata_changed:
        update_kwargs["orchestrator_metadata"] = metadata

    if update_kwargs:
        await db_client.update_campaign(campaign_id=campaign_id, **update_kwargs)

    # Re-fetch to return updated data
    campaign = await db_client.get_campaign(campaign_id, user.selected_organization_id)
    workflow_name = await db_client.get_workflow_name(campaign.workflow_id, user.id)
    return _build_campaign_response(campaign, workflow_name or "Unknown")
2025-09-09 14:37:32 +05:30
@router.get("/{campaign_id}/runs")
async def get_campaign_runs(
    campaign_id: int,
    page: int = Query(1, ge=1, description="1-based page number"),
    limit: int = Query(50, ge=1, description="Maximum number of runs per page"),
    filters: Optional[str] = Query(None, description="JSON-encoded filter criteria"),
    sort_by: Optional[str] = Query(
        None, description="Field to sort by (e.g., 'duration', 'created_at')"
    ),
    sort_order: Optional[str] = Query(
        "desc", description="Sort order ('asc' or 'desc')"
    ),
    user: UserModel = Depends(get_user),
) -> CampaignRunsResponse:
    """Get campaign workflow runs with pagination, filters and sorting.

    ``page`` and ``limit`` must be at least 1; FastAPI rejects smaller
    values before this function runs, which prevents a negative offset and
    a division by zero when computing ``total_pages``.

    ``filters``, when given, must be a JSON array of objects. Each object's
    ``"attribute"`` (if present) must be one of the allowed attributes.

    Raises:
        HTTPException: 400 if ``filters`` is not valid JSON or is not a list
            of objects; 403 if a filter names a disallowed attribute; 404 if
            the DB layer raises ``ValueError``.
    """
    offset = (page - 1) * limit

    # Parse filters if provided
    filter_criteria = []
    if filters:
        try:
            filter_criteria = json.loads(filters)
        except json.JSONDecodeError:
            raise HTTPException(status_code=400, detail="Invalid filter format")

        # Valid JSON is not enough: the loop below needs a list of dicts.
        # Anything else (null, number, string, object, list of scalars) would
        # otherwise surface as an unhandled TypeError/AttributeError.
        if not isinstance(filter_criteria, list) or not all(
            isinstance(item, dict) for item in filter_criteria
        ):
            raise HTTPException(status_code=400, detail="Invalid filter format")

        # Restrict allowed filter attributes for regular users
        allowed_attributes = {
            "dateRange",
            "dispositionCode",
            "duration",
            "status",
            "tokenUsage",
        }
        for filter_item in filter_criteria:
            attribute = filter_item.get("attribute")
            if attribute and attribute not in allowed_attributes:
                raise HTTPException(
                    status_code=403, detail=f"Invalid attribute '{attribute}'"
                )

    try:
        runs, total_count = await db_client.get_campaign_runs_paginated(
            campaign_id,
            user.selected_organization_id,
            limit=limit,
            offset=offset,
            filters=filter_criteria if filter_criteria else None,
            sort_by=sort_by,
            sort_order=sort_order,
        )
    except ValueError as e:
        raise HTTPException(status_code=404, detail=str(e))

    # Ceiling division; safe because limit >= 1 is enforced above.
    total_pages = (total_count + limit - 1) // limit

    return CampaignRunsResponse(
        runs=[run.model_dump() for run in runs],
        total_count=total_count,
        page=page,
        limit=limit,
        total_pages=total_pages,
    )
2025-09-09 14:37:32 +05:30
@router.post("/{campaign_id}/resume")
async def resume_campaign(
    campaign_id: int,
    user: UserModel = Depends(get_user),
) -> CampaignResponse:
    """Resume a paused campaign.

    Checks run in this order, each aborting with the given status:
    telephony configuration present (401), quota available (402),
    campaign found in the organization (404). A ``ValueError`` from the
    runner service is returned as 400.
    """
    org_id = user.selected_organization_id

    # The organization must have TELEPHONY_CONFIGURATION set up.
    telephony_config = await db_client.get_configuration(
        org_id,
        OrganizationConfigurationKey.TELEPHONY_CONFIGURATION.value,
    )
    if not (telephony_config and telephony_config.value):
        raise HTTPException(
            status_code=401,
            detail="You must configure telephony first by going to APP_URL/configure-telephony",
        )

    # Check Dograh quota before resuming the campaign.
    quota = await check_dograh_quota(user)
    if not quota.has_quota:
        raise HTTPException(status_code=402, detail=quota.error_message)

    # The campaign must exist and belong to the organization.
    existing = await db_client.get_campaign(campaign_id, org_id)
    if not existing:
        raise HTTPException(status_code=404, detail="Campaign not found")

    try:
        await campaign_runner_service.resume_campaign(campaign_id)
    except ValueError as e:
        raise HTTPException(status_code=400, detail=str(e))

    # Re-read the campaign so the response reflects the post-resume state.
    refreshed = await db_client.get_campaign(campaign_id, org_id)
    workflow_name = await db_client.get_workflow_name(refreshed.workflow_id, user.id)
    return _build_campaign_response(refreshed, workflow_name or "Unknown")
2025-09-09 14:37:32 +05:30
@router.get("/{campaign_id}/progress")
async def get_campaign_progress(
    campaign_id: int,
    user: UserModel = Depends(get_user),
) -> CampaignProgressResponse:
    """Get current campaign progress and statistics.

    Raises:
        HTTPException: 404 if the campaign is not found in the user's
            organization; 400 if the runner service raises ``ValueError``.
    """
    # Verify campaign exists and belongs to organization
    campaign = await db_client.get_campaign(campaign_id, user.selected_organization_id)
    if not campaign:
        raise HTTPException(status_code=404, detail="Campaign not found")

    # Only the runner call is guarded. Pydantic's ValidationError is a
    # ValueError subclass, so building the response inside this try would
    # misreport a server-side schema mismatch as a client error (400).
    try:
        progress = await campaign_runner_service.get_campaign_status(campaign_id)
    except ValueError as e:
        raise HTTPException(status_code=400, detail=str(e)) from e

    return CampaignProgressResponse(**progress)
2025-10-09 17:54:31 +05:30
class CampaignSourceDownloadResponse(BaseModel):
    """Presigned download link for a campaign's CSV source file."""

    download_url: str
    # Validity of the URL; the endpoint in this module sets it to the same
    # value it passes as the signed URL's expiration (3600).
    expires_in: int
@router.get("/{campaign_id}/source-download-url")
async def get_campaign_source_download_url(
    campaign_id: int,
    user: UserModel = Depends(get_user),
) -> CampaignSourceDownloadResponse:
    """Get presigned download URL for campaign CSV source file.

    Only works for CSV source type. For Google Sheets, use the source_id directly.
    Validates that the campaign belongs to the user's organization for security.

    Raises:
        HTTPException: 404 if the campaign is not found; 400 if its source
            is not a CSV; 403 if the file key is outside the organization's
            prefix; 500 if the signed URL cannot be generated.
    """
    # Shared by the signing call and the response so the two cannot drift.
    expires_in_seconds = 3600  # 1 hour

    # Verify campaign exists and belongs to organization
    campaign = await db_client.get_campaign(campaign_id, user.selected_organization_id)
    if not campaign:
        raise HTTPException(status_code=404, detail="Campaign not found")

    # Only generate download URL for CSV files
    if campaign.source_type != "csv":
        raise HTTPException(
            status_code=400,
            detail=f"Download URL only available for CSV sources. This campaign uses {campaign.source_type}",
        )

    # Verify the file key belongs to the user's organization
    # File key format: campaigns/{org_id}/{uuid}_{filename}.csv
    if not campaign.source_id.startswith(f"campaigns/{user.selected_organization_id}/"):
        raise HTTPException(
            status_code=403,
            detail="Access denied: Source file does not belong to your organization",
        )

    # Generate presigned download URL. Only the storage call is guarded, so
    # the explicit "empty URL" HTTPException below is no longer caught and
    # re-wrapped into a garbled detail by the generic handler.
    try:
        download_url = await storage_fs.aget_signed_url(
            campaign.source_id,
            expiration=expires_in_seconds,
        )
    except Exception as e:
        raise HTTPException(
            status_code=500, detail=f"Failed to generate download URL: {str(e)}"
        ) from e

    if not download_url:
        raise HTTPException(
            status_code=500, detail="Failed to generate download URL"
        )

    return CampaignSourceDownloadResponse(
        download_url=download_url, expires_in=expires_in_seconds
    )