import asyncio
import time
from datetime import UTC, datetime
from typing import Any, Optional

from loguru import logger

from api.constants import DEFAULT_ORG_CONCURRENCY_LIMIT
from api.db import db_client
from api.db.models import QueuedRunModel, WorkflowRunModel
from api.enums import OrganizationConfigurationKey, WorkflowRunState
from api.services.campaign.circuit_breaker import circuit_breaker
from api.services.campaign.errors import (
    ConcurrentSlotAcquisitionError,
    PhoneNumberPoolExhaustedError,
)
from api.services.campaign.rate_limiter import rate_limiter
from api.services.telephony.base import TelephonyProvider
from api.services.telephony.factory import get_telephony_provider
from api.utils.common import get_backend_endpoints


class CampaignCallDispatcher:
    """Manages rate-limited and concurrent-limited call dispatching"""

    def __init__(self):
        # Fallback used when an organization has no usable
        # CONCURRENT_CALL_LIMIT configuration.
        self.default_concurrent_limit = int(DEFAULT_ORG_CONCURRENCY_LIMIT)

    async def get_telephony_provider(self, organization_id: int) -> TelephonyProvider:
        """Get telephony provider instance for specific organization"""
        return await get_telephony_provider(organization_id)

    async def get_org_concurrent_limit(self, organization_id: int) -> int:
        """Get the concurrent call limit for an organization.

        Reads the CONCURRENT_CALL_LIMIT configuration entry and returns its
        ``"value"`` field as an int. Any error while reading or parsing the
        configuration is logged and the default limit is returned instead.
        """
        try:
            config = await db_client.get_configuration(
                organization_id,
                OrganizationConfigurationKey.CONCURRENT_CALL_LIMIT.value,
            )
            if config and config.value:
                return int(config.value["value"])
        except Exception as e:
            # Deliberate best-effort: a broken config must not stop dispatching.
            logger.warning(
                f"Error getting concurrent limit for org {organization_id}: {e}"
            )
        return self.default_concurrent_limit

    async def process_batch(self, campaign_id: int, batch_size: int = 10) -> int:
        """
        Processes a batch of queued runs with priority for scheduled retries.

        Thread-safe: uses SELECT FOR UPDATE SKIP LOCKED to prevent concurrent
        processing.

        Args:
            campaign_id: ID of the campaign whose queued runs are dispatched.
            batch_size: Maximum number of queued runs to claim in this batch.

        Returns:
            number of processed runs

        Raises:
            ValueError: If the campaign does not exist.
            ConcurrentSlotAcquisitionError: If no concurrent slot could be
                acquired; the current and remaining runs are reverted to
                "queued" before re-raising.
            PhoneNumberPoolExhaustedError: If no from_number could be
                acquired; same revert behavior as above.
        """
        # Get campaign details
        campaign = await db_client.get_campaign_by_id(campaign_id)
        if not campaign:
            raise ValueError(f"Campaign {campaign_id} not found")

        # Check if campaign is in running state
        if campaign.state != "running":
            logger.info(
                f"Campaign {campaign_id} is not in running state: {campaign.state}"
            )
            return 0

        # Snapshot the counter once. The campaign object is not refreshed inside
        # the loop, so the running total is computed from this base plus the
        # number of runs dispatched in this batch.
        # NOTE(review): this is still a read-modify-write; an atomic increment
        # in the DB layer would be needed if batches for one campaign can
        # overlap.
        processed_rows_base = campaign.processed_rows or 0

        # Atomically claim queued runs for processing (thread-safe)
        # This uses SELECT FOR UPDATE SKIP LOCKED to prevent race conditions
        queued_runs = await db_client.claim_queued_runs_for_processing(
            campaign_id=campaign_id,
            scheduled_before=datetime.now(UTC),
            limit=batch_size,
        )

        if not queued_runs:
            logger.info(f"No more queued runs for campaign {campaign_id}")
            return 0

        # Initialize from_number pool for this org's provider
        try:
            provider = await self.get_telephony_provider(campaign.organization_id)
            if provider.from_numbers:
                await rate_limiter.initialize_from_number_pool(
                    campaign.organization_id, provider.from_numbers
                )
        except Exception as e:
            logger.warning(f"Failed to initialize from_number pool: {e}")

        processed_count = 0
        for i, queued_run in enumerate(queued_runs):
            try:
                # Apply rate limiting, i.e. let's not initiate more than
                # rate_limit_per_second calls per second. It is different than
                # concurrency limit.
                # NOTE(review): a TimeoutError raised here is handled by the
                # generic handler below, which marks the run as "failed" even
                # though no call was attempted - confirm this is intended.
                await self.apply_rate_limit(
                    campaign.organization_id, campaign.rate_limit_per_second
                )

                # Acquire concurrent slot - waits until a slot is available
                slot_id = await self.acquire_concurrent_slot(
                    campaign.organization_id, campaign
                )

                # Dispatch the call
                workflow_run = await self.dispatch_call(queued_run, campaign, slot_id)

                # Update queued run as processed
                await db_client.update_queued_run(
                    queued_run_id=queued_run.id,
                    state="processed",
                    workflow_run_id=workflow_run.id,
                    processed_at=datetime.now(UTC),
                )
                processed_count += 1

                # Update campaign processed count
                await db_client.update_campaign(
                    campaign_id=campaign_id,
                    processed_rows=processed_rows_base + processed_count,
                )
            except (ConcurrentSlotAcquisitionError, PhoneNumberPoolExhaustedError):
                # Revert all unprocessed runs (current and remaining) back to queued
                # so they can be picked up again when campaign is resumed
                for unprocessed_run in queued_runs[i:]:
                    try:
                        await db_client.update_queued_run(
                            queued_run_id=unprocessed_run.id,
                            state="queued",
                        )
                        logger.info(
                            f"Reverted queued run {unprocessed_run.id} back to queued state"
                        )
                    except Exception as revert_error:
                        logger.error(
                            f"Failed to revert queued run {unprocessed_run.id}: {revert_error}"
                        )
                # Re-raise to propagate to process_campaign_batch
                raise
            except Exception as e:
                logger.warning(f"Error processing queued run {queued_run.id}: {e}")
                # Mark the queued run as failed to prevent infinite retry loops
                try:
                    await db_client.update_queued_run(
                        queued_run_id=queued_run.id,
                        state="failed",
                        processed_at=datetime.now(UTC),
                    )
                    logger.info(
                        f"Marked queued run {queued_run.id} as failed due to error: {e}"
                    )
                except Exception as update_error:
                    logger.error(
                        f"Failed to mark queued run {queued_run.id} as failed: {update_error}"
                    )

        return processed_count

    async def _release_workflow_resources(self, workflow_run_id: int) -> None:
        """Release the slot and from_number mapped to a workflow run, if any.

        Looks up the stored mappings for ``workflow_run_id``, releases the
        concurrent slot and the from_number, and deletes each mapping. The
        from_number is released even if releasing the slot raises.
        """
        try:
            mapping = await rate_limiter.get_workflow_slot_mapping(workflow_run_id)
            if mapping:
                org_id, mapped_slot_id = mapping
                await rate_limiter.release_concurrent_slot(org_id, mapped_slot_id)
                await rate_limiter.delete_workflow_slot_mapping(workflow_run_id)
        finally:
            from_number_mapping = await rate_limiter.get_workflow_from_number_mapping(
                workflow_run_id
            )
            if from_number_mapping:
                fn_org_id, fn_number = from_number_mapping
                await rate_limiter.release_from_number(fn_org_id, fn_number)
                await rate_limiter.delete_workflow_from_number_mapping(
                    workflow_run_id
                )

    async def dispatch_call(
        self, queued_run: QueuedRunModel, campaign: Any, slot_id: str
    ) -> Optional[WorkflowRunModel]:
        """Creates workflow run and initiates call.

        Requires a pre-acquired slot_id. On every failure path in this method
        the slot (and the from_number, once acquired) is released before the
        error propagates.

        Args:
            queued_run: The claimed queued run; its ``context_variables`` must
                contain ``"phone_number"``.
            campaign: The campaign the run belongs to.
            slot_id: Concurrent slot already acquired for this call.

        Returns:
            The created workflow run once the provider accepted the call.

        Raises:
            ValueError: If the workflow is missing or the queued run has no
                phone number.
            PhoneNumberPoolExhaustedError: If no from_number became available.
            Exception: Any error from workflow-run creation or call initiation
                is re-raised after cleanup.
        """
        from_number = None

        # Get workflow details
        workflow = await db_client.get_workflow_by_id(campaign.workflow_id)
        if not workflow:
            # Release slot before raising
            await rate_limiter.release_concurrent_slot(
                campaign.organization_id, slot_id
            )
            raise ValueError(f"Workflow {campaign.workflow_id} not found")

        # Extract phone number
        phone_number = queued_run.context_variables.get("phone_number")
        if not phone_number:
            # Release slot before raising
            await rate_limiter.release_concurrent_slot(
                campaign.organization_id, slot_id
            )
            raise ValueError(f"No phone number in queued run {queued_run.id}")

        # Get provider first to determine the mode
        provider = await self.get_telephony_provider(campaign.organization_id)
        workflow_run_mode = provider.PROVIDER_NAME

        # Acquire a unique from_number from the pool
        from_number = await self.acquire_from_number(campaign.organization_id)
        if from_number is None:
            # Release concurrent slot before raising
            await rate_limiter.release_concurrent_slot(
                campaign.organization_id, slot_id
            )
            raise PhoneNumberPoolExhaustedError(
                organization_id=campaign.organization_id
            )

        logger.info(f"Provider name: {provider.PROVIDER_NAME}")
        logger.info(f"Queued run context: {queued_run.context_variables}")

        # Merge context variables (queued_run context already includes retry
        # info if applicable). Later keys win over earlier ones.
        initial_context = {
            **workflow.template_context_variables,
            **queued_run.context_variables,
            "campaign_id": campaign.id,
            "provider": provider.PROVIDER_NAME,
            "source_uuid": queued_run.source_uuid,
        }
        logger.info(f"Final initial_context: {initial_context}")

        # Create workflow run with queued_run_id tracking
        workflow_run_name = f"WR-CAMPAIGN-{campaign.id}-{queued_run.id}"
        try:
            workflow_run = await db_client.create_workflow_run(
                name=workflow_run_name,
                workflow_id=campaign.workflow_id,
                mode=workflow_run_mode,
                user_id=campaign.created_by,
                initial_context=initial_context,
                campaign_id=campaign.id,
                queued_run_id=queued_run.id,  # Link to queued run for retry tracking
            )

            # Store slot_id mapping in Redis for cleanup later
            await rate_limiter.store_workflow_slot_mapping(
                workflow_run.id, campaign.organization_id, slot_id
            )

            # Store from_number mapping for cleanup on call completion
            await rate_limiter.store_workflow_from_number_mapping(
                workflow_run.id, campaign.organization_id, from_number
            )
        except Exception:
            # Release slot and from_number on error
            await rate_limiter.release_concurrent_slot(
                campaign.organization_id, slot_id
            )
            if from_number:
                await rate_limiter.release_from_number(
                    campaign.organization_id, from_number
                )
            raise

        # Add "retry" tag if this is a retry call
        if queued_run.context_variables.get("is_retry"):
            retry_reason = queued_run.context_variables.get("retry_reason", "unknown")
            try:
                await db_client.update_workflow_run(
                    run_id=workflow_run.id,
                    gathered_context={
                        "call_tags": ["retry", f"retry_reason_{retry_reason}"]
                    },
                )
            except Exception:
                # No call has been placed yet, so nothing else will free the
                # slot and from_number reserved for this run.
                await self._release_workflow_resources(workflow_run.id)
                raise

        # Initiate call via telephony provider
        try:
            # Construct webhook URL with parameters
            backend_endpoint, _ = await get_backend_endpoints()
            webhook_endpoint = provider.WEBHOOK_ENDPOINT
            webhook_url = (
                f"{backend_endpoint}/api/v1/telephony/{webhook_endpoint}"
                f"?workflow_id={campaign.workflow_id}"
                f"&user_id={campaign.created_by}"
                f"&workflow_run_id={workflow_run.id}"
                f"&campaign_id={campaign.id}"
                f"&organization_id={campaign.organization_id}"
            )

            call_result = await provider.initiate_call(
                to_number=phone_number,
                webhook_url=webhook_url,
                workflow_run_id=workflow_run.id,
                from_number=from_number,
                workflow_id=campaign.workflow_id,
                user_id=campaign.created_by,
            )

            # Store provider type and metadata in gathered_context
            # (required for WebSocket handler to route to correct provider)
            await db_client.update_workflow_run(
                run_id=workflow_run.id,
                gathered_context={
                    "provider": provider.PROVIDER_NAME,
                    **(call_result.provider_metadata or {}),
                },
            )

            logger.info(
                f"Call initiated for workflow run {workflow_run.id}, Call ID: {call_result.call_id}"
            )
        except Exception as e:
            logger.error(
                f"Failed to initiate call for workflow run {workflow_run.id}: {e}"
            )
            try:
                # Update workflow run as failed. `logs` may be unset on a fresh
                # run, so fall back to an empty mapping.
                existing_callbacks = (workflow_run.logs or {}).get(
                    "telephony_status_callbacks", []
                )
                telephony_callback_log = {
                    "status": "failed",
                    "timestamp": datetime.now(UTC).isoformat(),
                    "data": {"error": str(e)},
                }
                telephony_callback_logs = [
                    *existing_callbacks,
                    telephony_callback_log,
                ]
                await db_client.update_workflow_run(
                    run_id=workflow_run.id,
                    is_completed=True,
                    state=WorkflowRunState.COMPLETED.value,
                    gathered_context={
                        "error": str(e),
                    },
                    logs={
                        "telephony_status_callbacks": telephony_callback_logs,
                    },
                )

                # Record call initiation failure in circuit breaker
                await circuit_breaker.record_and_evaluate(campaign.id, is_failure=True)
            finally:
                # Release the concurrent slot and from_number even if the
                # bookkeeping above fails; otherwise both would stay reserved.
                await self._release_workflow_resources(workflow_run.id)
            raise

        return workflow_run

    async def apply_rate_limit(self, organization_id: int, rate_limit: int) -> None:
        """
        Enforces rate limiting - waits if necessary to comply with rate limit

        Example usage:
        ```
        # This will wait up to 1 second if needed to respect rate limit
        await self.apply_rate_limit(org_id, 1)  # 1 call per second
        await twilio.initiate_call(...)  # Now safe to call
        ```

        Raises:
            TimeoutError: If waiting for the next slot would exceed the
                one-second budget.
        """
        max_wait = 1.0  # Maximum time to wait for a slot
        # Monotonic clock: elapsed-time math must not be affected by
        # wall-clock adjustments.
        start_time = time.monotonic()

        while True:
            # Try to acquire token
            if await rate_limiter.acquire_token(organization_id, rate_limit):
                return  # Got permission to proceed

            # Check how long to wait
            wait_time = await rate_limiter.get_next_available_slot(
                organization_id, rate_limit
            )

            # Don't wait forever
            if time.monotonic() - start_time + wait_time > max_wait:
                raise TimeoutError("Rate limit timeout - try again later")

            # Wait for next available slot
            await asyncio.sleep(wait_time)

    async def acquire_concurrent_slot(
        self, organization_id: int, campaign: Any, timeout: float = 600
    ) -> str:
        """
        Acquires a concurrent call slot - waits if necessary until a slot is available.

        Args:
            organization_id: The organization ID
            campaign: The campaign object
            timeout: Maximum time to wait for a slot (default 10 minutes)

        Returns the slot_id which must be released when the call completes.

        Raises:
            ConcurrentSlotAcquisitionError: If slot cannot be acquired within timeout
        """
        # Get concurrent limit for organization
        org_concurrent_limit = await self.get_org_concurrent_limit(organization_id)

        # Check for campaign-level max_concurrency in orchestrator_metadata
        campaign_max_concurrency = None
        if campaign.orchestrator_metadata:
            campaign_max_concurrency = campaign.orchestrator_metadata.get(
                "max_concurrency"
            )

        # Use the lower of campaign limit and org limit
        if campaign_max_concurrency is not None:
            max_concurrent = min(campaign_max_concurrency, org_concurrent_limit)
        else:
            max_concurrent = org_concurrent_limit

        # Track wait time for alerting (monotonic: immune to clock changes)
        wait_start = time.monotonic()

        # Wait until we can acquire a concurrent slot
        while True:
            slot_id = await rate_limiter.try_acquire_concurrent_slot(
                organization_id, max_concurrent
            )
            if slot_id:
                return slot_id

            # Check if we've been waiting too long
            wait_time = time.monotonic() - wait_start
            if wait_time > timeout:
                raise ConcurrentSlotAcquisitionError(
                    organization_id=organization_id,
                    campaign_id=campaign.id,
                    wait_time=wait_time,
                )

            logger.debug(
                f"Attempting to get a slot for {organization_id} {campaign.id}, "
                f"waited {wait_time:.1f}s"
            )

            # Wait before retrying
            await asyncio.sleep(1)

    async def acquire_from_number(
        self, organization_id: int, timeout: float = 60
    ) -> Optional[str]:
        """
        Acquire a from_number from the pool with retry.

        Waits up to timeout seconds, polling every 1s.
        Returns the phone number or None if timeout is exceeded.
        """
        wait_start = time.monotonic()

        while True:
            from_number = await rate_limiter.acquire_from_number(organization_id)
            if from_number:
                return from_number

            wait_time = time.monotonic() - wait_start
            if wait_time > timeout:
                logger.warning(
                    f"From number pool exhausted for org {organization_id} "
                    f"after waiting {wait_time:.1f}s"
                )
                return None

            logger.debug(
                f"All from_numbers in use for org {organization_id}, "
                f"waited {wait_time:.1f}s, retrying..."
            )
            await asyncio.sleep(1)

    async def release_call_slot(self, workflow_run_id: int) -> bool:
        """
        Release concurrent slot and from_number when a call completes.

        Called by Twilio webhooks or workflow completion handlers.

        Returns:
            True if a concurrent slot was released for this workflow run,
            False otherwise. The from_number release does not affect the
            return value.
        """
        slot_released = False

        mapping = await rate_limiter.get_workflow_slot_mapping(workflow_run_id)
        if mapping:
            org_id, slot_id = mapping
            success = await rate_limiter.release_concurrent_slot(org_id, slot_id)
            if success:
                # Only drop the mapping once the slot is actually released.
                await rate_limiter.delete_workflow_slot_mapping(workflow_run_id)
                logger.info(
                    f"Released concurrent slot for workflow run {workflow_run_id}"
                )
                slot_released = True

        # Release from_number back to pool
        from_number_mapping = await rate_limiter.get_workflow_from_number_mapping(
            workflow_run_id
        )
        if from_number_mapping:
            fn_org_id, fn_number = from_number_mapping
            fn_success = await rate_limiter.release_from_number(fn_org_id, fn_number)
            if fn_success:
                await rate_limiter.delete_workflow_from_number_mapping(workflow_run_id)
                logger.info(
                    f"Released from_number {fn_number} for workflow run {workflow_run_id}"
                )

        return slot_released


# Global instance
campaign_call_dispatcher = CampaignCallDispatcher()