import asyncio import time from datetime import UTC, datetime from typing import TYPE_CHECKING, Optional from loguru import logger from api.constants import DEFAULT_ORG_CONCURRENCY_LIMIT from api.db import db_client from api.db.models import QueuedRunModel, WorkflowRunModel from api.enums import OrganizationConfigurationKey, WorkflowRunState from api.services.campaign.circuit_breaker import circuit_breaker from api.services.campaign.errors import ( ConcurrentSlotAcquisitionError, PhoneNumberPoolExhaustedError, ) from api.services.campaign.rate_limiter import rate_limiter from api.utils.common import get_backend_endpoints if TYPE_CHECKING: # Type-only — importing api.services.telephony eagerly triggers the # provider package init, which can pull in this module via the routes # chain and create a circular import. Runtime calls below lazy-import the # factory helpers inside methods instead. from api.services.telephony.base import TelephonyProvider class CampaignCallDispatcher: """Manages rate-limited and concurrent-limited call dispatching""" def __init__(self): self.default_concurrent_limit = int(DEFAULT_ORG_CONCURRENCY_LIMIT) async def get_provider_for_campaign(self, campaign) -> "TelephonyProvider": """Get the telephony provider pinned to this campaign's config. Falls back to the org's default config for legacy campaigns whose ``telephony_configuration_id`` was never backfilled.""" from api.services.telephony.factory import ( get_default_telephony_provider, get_telephony_provider_by_id, ) if campaign.telephony_configuration_id: return await get_telephony_provider_by_id( campaign.telephony_configuration_id, campaign.organization_id ) logger.warning( f"Campaign {campaign.id} has no telephony_configuration_id; " f"falling back to org default for {campaign.organization_id}" ) return await get_default_telephony_provider(campaign.organization_id) async def get_org_concurrent_limit(self, organization_id: int) -> int: """Get the concurrent call limit for an organization.""" try: config = await db_client.get_configuration( organization_id, OrganizationConfigurationKey.CONCURRENT_CALL_LIMIT.value, ) if config and config.value: return int(config.value["value"]) except Exception as e: logger.warning( f"Error getting concurrent limit for org {organization_id}: {e}" ) return self.default_concurrent_limit async def process_batch(self, campaign_id: int, batch_size: int = 10) -> int: """ Processes a batch of queued runs with priority for scheduled retries. Thread-safe: uses SELECT FOR UPDATE SKIP LOCKED to prevent concurrent processing. Returns: number of processed runs """ # Get campaign details campaign = await db_client.get_campaign_by_id(campaign_id) if not campaign: raise ValueError(f"Campaign {campaign_id} not found") # Check if campaign is in running state if campaign.state != "running": logger.info( f"Campaign {campaign_id} is not in running state: {campaign.state}" ) return 0 # Atomically claim queued runs for processing (thread-safe) # This uses SELECT FOR UPDATE SKIP LOCKED to prevent race conditions queued_runs = await db_client.claim_queued_runs_for_processing( campaign_id=campaign_id, scheduled_before=datetime.now(UTC), limit=batch_size, ) if not queued_runs: logger.info(f"No more queued runs for campaign {campaign_id}") return 0 # Initialize from_number pool for this campaign's telephony config. try: provider = await self.get_provider_for_campaign(campaign) if provider.from_numbers: await rate_limiter.initialize_from_number_pool( campaign.organization_id, provider.from_numbers, telephony_configuration_id=campaign.telephony_configuration_id, ) except Exception as e: logger.warning(f"Failed to initialize from_number pool: {e}") processed_count = 0 processed_run_ids: set[int] = set() for i, queued_run in enumerate(queued_runs): try: # Apply rate limiting, i.e lets not initiate more than rate_limit_per_second # calls per second. It is different than concurrency limit. await self.apply_rate_limit( campaign.organization_id, campaign.rate_limit_per_second ) # Acquire concurrent slot - waits until a slot is available slot_id = await self.acquire_concurrent_slot( campaign.organization_id, campaign ) # Dispatch the call workflow_run = await self.dispatch_call(queued_run, campaign, slot_id) # Update queued run as processed await db_client.update_queued_run( queued_run_id=queued_run.id, state="processed", workflow_run_id=workflow_run.id, processed_at=datetime.now(UTC), ) processed_count += 1 processed_run_ids.add(queued_run.id) # Update campaign processed count await db_client.update_campaign( campaign_id=campaign_id, processed_rows=campaign.processed_rows + 1 ) except asyncio.CancelledError: logger.warning( f"Campaign {campaign_id} batch cancelled; returning claimed " "queued runs that were not dispatched" ) await self._return_unprocessed_claims( queued_runs, processed_run_ids, reason="task_cancelled" ) raise except PhoneNumberPoolExhaustedError as e: logger.warning( f"Phone number pool exhausted for campaign {campaign_id}; " "returning claimed queued runs that were not dispatched: " f"{e}" ) await self._return_unprocessed_claims( queued_runs, processed_run_ids, reason="phone_number_pool_exhausted", ) # Re-raise to propagate to process_campaign_batch raise except ConcurrentSlotAcquisitionError as e: logger.warning( f"Concurrent slot acquisition failed for campaign {campaign_id}; " "returning claimed queued runs that were not dispatched: " f"{e}" ) await self._return_unprocessed_claims( queued_runs, processed_run_ids, reason="concurrent_slot_acquisition_failed", ) # Re-raise to propagate to process_campaign_batch raise except Exception as e: logger.warning(f"Error processing queued run {queued_run.id}: {e}") # Mark the queued run as failed to prevent infinite retry loops try: await db_client.update_queued_run( queued_run_id=queued_run.id, state="failed", processed_at=datetime.now(UTC), ) logger.info( f"Marked queued run {queued_run.id} as failed due to error: {e}" ) except Exception as update_error: logger.error( f"Failed to mark queued run {queued_run.id} as failed: {update_error}" ) return processed_count async def _return_unprocessed_claims( self, queued_runs: list[QueuedRunModel], processed_run_ids: set[int], *, reason: str, ) -> None: queued_run_ids = [ queued_run.id for queued_run in queued_runs if queued_run.id not in processed_run_ids ] if not queued_run_ids: return try: returned_count = ( await db_client.return_processing_queued_runs_without_workflow( queued_run_ids ) ) logger.info( f"Returned {returned_count}/{len(queued_run_ids)} claimed queued runs " f"back to queued state; reason={reason}; " f"queued_run_ids={queued_run_ids}" ) except Exception as revert_error: logger.error( f"Failed to return claimed queued runs; reason={reason}; " f"queued_run_ids={queued_run_ids}; error={revert_error}" ) async def dispatch_call( self, queued_run: QueuedRunModel, campaign: any, slot_id: str ) -> Optional[WorkflowRunModel]: """Creates workflow run and initiates call. Requires a pre-acquired slot_id.""" from_number = None # Get workflow details workflow = await db_client.get_workflow_by_id(campaign.workflow_id) if not workflow: # Release slot before raising await rate_limiter.release_concurrent_slot( campaign.organization_id, slot_id ) raise ValueError(f"Workflow {campaign.workflow_id} not found") # Extract phone number phone_number = queued_run.context_variables.get("phone_number") if not phone_number: # Release slot before raising await rate_limiter.release_concurrent_slot( campaign.organization_id, slot_id ) raise ValueError(f"No phone number in queued run {queued_run.id}") # Get provider for this campaign's pinned telephony config. provider = await self.get_provider_for_campaign(campaign) workflow_run_mode = provider.PROVIDER_NAME # Acquire a unique from_number from the pool scoped to this campaign's # telephony configuration so orgs with multiple configs don't leak # caller IDs across configs. from_number = await self.acquire_from_number( campaign.organization_id, telephony_configuration_id=campaign.telephony_configuration_id, ) if from_number is None: # Release concurrent slot before raising await rate_limiter.release_concurrent_slot( campaign.organization_id, slot_id ) raise PhoneNumberPoolExhaustedError( organization_id=campaign.organization_id ) logger.info(f"Provider name: {provider.PROVIDER_NAME}") logger.info(f"Queued run context: {queued_run.context_variables}") # Merge context variables (queued_run context already includes retry info if applicable) initial_context = { **queued_run.context_variables, "campaign_id": campaign.id, "provider": provider.PROVIDER_NAME, "source_uuid": queued_run.source_uuid, "caller_number": from_number, "called_number": phone_number, "telephony_configuration_id": campaign.telephony_configuration_id, } logger.info(f"Final initial_context: {initial_context}") # Create workflow run with queued_run_id tracking workflow_run_name = f"WR-CAMPAIGN-{campaign.id}-{queued_run.id}" try: workflow_run = await db_client.create_workflow_run( name=workflow_run_name, workflow_id=campaign.workflow_id, mode=workflow_run_mode, user_id=campaign.created_by, initial_context=initial_context, campaign_id=campaign.id, queued_run_id=queued_run.id, # Link to queued run for retry tracking ) # Store slot_id mapping in Redis for cleanup later await rate_limiter.store_workflow_slot_mapping( workflow_run.id, campaign.organization_id, slot_id ) # Store from_number mapping for cleanup on call completion await rate_limiter.store_workflow_from_number_mapping( workflow_run.id, campaign.organization_id, from_number, telephony_configuration_id=campaign.telephony_configuration_id, ) except Exception as e: # Release slot and from_number on error await rate_limiter.release_concurrent_slot( campaign.organization_id, slot_id ) if from_number: await rate_limiter.release_from_number( campaign.organization_id, from_number, telephony_configuration_id=campaign.telephony_configuration_id, ) raise # Add "retry" tag if this is a retry call if queued_run.context_variables.get("is_retry"): retry_reason = queued_run.context_variables.get("retry_reason", "unknown") await db_client.update_workflow_run( run_id=workflow_run.id, gathered_context={ "call_tags": ["retry", f"retry_reason_{retry_reason}"] }, ) # Initiate call via telephony provider try: # Construct webhook URL with parameters backend_endpoint, _ = await get_backend_endpoints() webhook_endpoint = provider.WEBHOOK_ENDPOINT webhook_url = ( f"{backend_endpoint}/api/v1/telephony/{webhook_endpoint}" f"?workflow_id={campaign.workflow_id}" f"&user_id={campaign.created_by}" f"&workflow_run_id={workflow_run.id}" f"&organization_id={campaign.organization_id}" ) call_result = await provider.initiate_call( to_number=phone_number, webhook_url=webhook_url, workflow_run_id=workflow_run.id, from_number=from_number, workflow_id=campaign.workflow_id, user_id=campaign.created_by, ) # Store provider type and metadata in gathered_context # (required for WebSocket handler to route to correct provider) await db_client.update_workflow_run( run_id=workflow_run.id, gathered_context={ "provider": provider.PROVIDER_NAME, **(call_result.provider_metadata or {}), }, ) logger.info( f"Call initiated for workflow run {workflow_run.id}, Call ID: {call_result.call_id}" ) except Exception as e: logger.error( f"Failed to initiate call for workflow run {workflow_run.id}: {e}" ) # Update workflow run as failed telephony_callback_logs = workflow_run.logs.get( "telephony_status_callbacks", [] ) telephony_callback_log = { "status": "failed", "timestamp": datetime.now(UTC).isoformat(), "data": {"error": str(e)}, } telephony_callback_logs.append(telephony_callback_log) await db_client.update_workflow_run( run_id=workflow_run.id, is_completed=True, state=WorkflowRunState.COMPLETED.value, gathered_context={ "error": str(e), }, logs={ "telephony_status_callbacks": telephony_callback_logs, }, ) # Record call initiation failure in circuit breaker await circuit_breaker.record_and_evaluate( campaign.id, is_failure=True, workflow_run_id=workflow_run.id, reason="call_initiation_failed", ) # Release concurrent slot on failure mapping = await rate_limiter.get_workflow_slot_mapping(workflow_run.id) if mapping: org_id, slot_id = mapping await rate_limiter.release_concurrent_slot(org_id, slot_id) await rate_limiter.delete_workflow_slot_mapping(workflow_run.id) # Release from_number on failure from_number_mapping = await rate_limiter.get_workflow_from_number_mapping( workflow_run.id ) if from_number_mapping: fn_org_id, fn_number, fn_tcid = from_number_mapping await rate_limiter.release_from_number( fn_org_id, fn_number, telephony_configuration_id=fn_tcid ) await rate_limiter.delete_workflow_from_number_mapping(workflow_run.id) raise return workflow_run async def apply_rate_limit(self, organization_id: int, rate_limit: int) -> None: """ Enforces rate limiting - waits if necessary to comply with rate limit Example usage: ``` # This will wait up to 1 second if needed to respect rate limit await self.apply_rate_limit(org_id, 1) # 1 call per second await twilio.initiate_call(...) # Now safe to call ``` """ max_wait = 1.0 # Maximum time to wait for a slot start_time = time.time() while True: # Try to acquire token if await rate_limiter.acquire_token(organization_id, rate_limit): return # Got permission to proceed # Check how long to wait wait_time = await rate_limiter.get_next_available_slot( organization_id, rate_limit ) # Don't wait forever if time.time() - start_time + wait_time > max_wait: raise TimeoutError("Rate limit timeout - try again later") # Wait for next available slot await asyncio.sleep(wait_time) async def acquire_concurrent_slot( self, organization_id: int, campaign: any, timeout: float = 600 ) -> str: """ Acquires a concurrent call slot - waits if necessary until a slot is available. Args: organization_id: The organization ID campaign: The campaign object timeout: Maximum time to wait for a slot (default 10 minutes) Returns the slot_id which must be released when the call completes. Raises: ConcurrentSlotAcquisitionError: If slot cannot be acquired within timeout """ # Get concurrent limit for organization org_concurrent_limit = await self.get_org_concurrent_limit(organization_id) # Check for campaign-level max_concurrency in orchestrator_metadata campaign_max_concurrency = None if campaign.orchestrator_metadata: campaign_max_concurrency = campaign.orchestrator_metadata.get( "max_concurrency" ) # Use the lower of campaign limit and org limit if campaign_max_concurrency is not None: max_concurrent = min(campaign_max_concurrency, org_concurrent_limit) else: max_concurrent = org_concurrent_limit # Track wait time for alerting wait_start = time.time() # Wait until we can acquire a concurrent slot while True: slot_id = await rate_limiter.try_acquire_concurrent_slot( organization_id, max_concurrent ) if slot_id: return slot_id # Check if we've been waiting too long wait_time = time.time() - wait_start if wait_time > timeout: raise ConcurrentSlotAcquisitionError( organization_id=organization_id, campaign_id=campaign.id, wait_time=wait_time, ) logger.debug( f"Attempting to get a slot for {organization_id} {campaign.id}, " f"waited {wait_time:.1f}s" ) # Wait before retrying await asyncio.sleep(1) async def acquire_from_number( self, organization_id: int, telephony_configuration_id: int | None, timeout: float = 600, ) -> Optional[str]: """ Acquire a from_number from the (org, telephony config) pool with retry. Waits up to timeout seconds, polling every 1s. Returns: The acquired phone number as a string, or None if timeout is exceeded. """ wait_start = time.time() while True: from_number = await rate_limiter.acquire_from_number( organization_id, telephony_configuration_id ) if from_number: return from_number wait_time = time.time() - wait_start if wait_time > timeout: logger.warning( f"From number pool exhausted for org {organization_id} " f"config {telephony_configuration_id} after waiting " f"{wait_time:.1f}s" ) return None logger.debug( f"All from_numbers in use for org {organization_id} " f"config {telephony_configuration_id}, waited {wait_time:.1f}s, " "retrying..." ) await asyncio.sleep(1) async def release_call_slot(self, workflow_run_id: int) -> bool: """ Release concurrent slot and from_number when a call completes. Called by Twilio webhooks or workflow completion handlers. """ slot_released = False mapping = await rate_limiter.get_workflow_slot_mapping(workflow_run_id) if mapping: org_id, slot_id = mapping success = await rate_limiter.release_concurrent_slot(org_id, slot_id) if success: await rate_limiter.delete_workflow_slot_mapping(workflow_run_id) logger.info( f"Released concurrent slot for workflow run {workflow_run_id}" ) slot_released = True # Release from_number back to its (org, telephony config) pool from_number_mapping = await rate_limiter.get_workflow_from_number_mapping( workflow_run_id ) if from_number_mapping: fn_org_id, fn_number, fn_tcid = from_number_mapping fn_success = await rate_limiter.release_from_number( fn_org_id, fn_number, telephony_configuration_id=fn_tcid ) if fn_success: await rate_limiter.delete_workflow_from_number_mapping(workflow_run_id) logger.info( f"Released from_number {fn_number} for workflow run {workflow_run_id}" ) return slot_released # Global instance campaign_call_dispatcher = CampaignCallDispatcher()