dograh/api/services/campaign/campaign_call_dispatcher.py
Abhishek Kumar 6d93be3ef6 fix: number pool initialization in multi telephony setup
If there are multiple telephony configurations, the from number pool should be initialized from the campaign's own telephony configuration rather than the organization's default telephony configuration.
2026-05-08 14:48:53 +05:30

554 lines
22 KiB
Python

import asyncio
import time
from datetime import UTC, datetime
from typing import TYPE_CHECKING, Optional
from loguru import logger
from api.constants import DEFAULT_ORG_CONCURRENCY_LIMIT
from api.db import db_client
from api.db.models import QueuedRunModel, WorkflowRunModel
from api.enums import OrganizationConfigurationKey, WorkflowRunState
from api.services.campaign.circuit_breaker import circuit_breaker
from api.services.campaign.errors import (
ConcurrentSlotAcquisitionError,
PhoneNumberPoolExhaustedError,
)
from api.services.campaign.rate_limiter import rate_limiter
from api.utils.common import get_backend_endpoints
if TYPE_CHECKING:
# Type-only — importing api.services.telephony eagerly triggers the
# provider package init, which can pull in this module via the routes
# chain and create a circular import. Runtime calls below go through
# ``factory.get_telephony_provider`` (lazy import inside the method).
from api.services.telephony.base import TelephonyProvider
class CampaignCallDispatcher:
    """Dispatches campaign calls while honouring rate and concurrency limits."""

    def __init__(self) -> None:
        """Capture the default per-organization concurrency limit."""
        # Used by get_org_concurrent_limit when an org has no usable override.
        fallback_limit = int(DEFAULT_ORG_CONCURRENCY_LIMIT)
        self.default_concurrent_limit = fallback_limit
async def get_telephony_provider(self, organization_id: int) -> "TelephonyProvider":
    """Return the provider built from the organization's default telephony config."""
    # Imported lazily: the module header notes that importing the telephony
    # package eagerly can create a circular import.
    from api.services.telephony.factory import get_default_telephony_provider

    provider = await get_default_telephony_provider(organization_id)
    return provider
async def get_provider_for_campaign(self, campaign) -> "TelephonyProvider":
    """Resolve the telephony provider a campaign should place calls with.

    Uses the configuration pinned on the campaign when
    ``telephony_configuration_id`` is set; otherwise logs a warning and uses
    the organization's default configuration (legacy campaigns that were
    never backfilled).
    """
    from api.services.telephony.factory import (
        get_default_telephony_provider,
        get_telephony_provider_by_id,
    )

    pinned_config_id = campaign.telephony_configuration_id
    if not pinned_config_id:
        # Legacy path: nothing pinned, so use the org-wide default.
        logger.warning(
            f"Campaign {campaign.id} has no telephony_configuration_id; "
            f"falling back to org default for {campaign.organization_id}"
        )
        return await get_default_telephony_provider(campaign.organization_id)

    return await get_telephony_provider_by_id(
        pinned_config_id, campaign.organization_id
    )
async def get_org_concurrent_limit(self, organization_id: int) -> int:
    """Return the organization's concurrent call limit.

    Reads the CONCURRENT_CALL_LIMIT configuration entry and returns its
    ``value["value"]`` as an int. If the entry is missing, empty, or anything
    goes wrong while reading or converting it, the dispatcher's default limit
    is returned instead (failures are logged as warnings, not raised).
    """
    limit = self.default_concurrent_limit
    try:
        config = await db_client.get_configuration(
            organization_id,
            OrganizationConfigurationKey.CONCURRENT_CALL_LIMIT.value,
        )
        if config and config.value:
            limit = int(config.value["value"])
    except Exception as e:
        # Best-effort lookup: keep the default rather than block dispatching.
        logger.warning(
            f"Error getting concurrent limit for org {organization_id}: {e}"
        )
    return limit
async def process_batch(self, campaign_id: int, batch_size: int = 10) -> int:
    """Claim and dispatch one batch of queued runs for a campaign.

    Flow:
      1. Load the campaign; return 0 unless its state is ``"running"``.
      2. Claim up to ``batch_size`` queued runs scheduled at or before now.
         The claim is delegated to ``db_client.claim_queued_runs_for_processing``
         (per the original comments it relies on SELECT FOR UPDATE SKIP LOCKED
         so concurrent workers do not pick the same rows).
      3. Best-effort: seed the from_number pool for the campaign's telephony
         configuration. A failure here is only logged.
      4. For each claimed run: apply the per-second rate limit, acquire a
         concurrency slot, dispatch the call, then mark the run processed and
         bump the campaign's ``processed_rows``.

    Args:
        campaign_id: ID of the campaign to process.
        batch_size: Maximum number of queued runs to claim in this call.

    Returns:
        Number of runs successfully dispatched and marked processed.

    Raises:
        ValueError: If the campaign does not exist.
        ConcurrentSlotAcquisitionError: Re-raised after the current and all
            remaining claimed runs have been reverted to ``"queued"``.
        PhoneNumberPoolExhaustedError: Same handling as above.
    """
    campaign = await db_client.get_campaign_by_id(campaign_id)
    if not campaign:
        raise ValueError(f"Campaign {campaign_id} not found")

    # Only campaigns in the running state are dispatched.
    if campaign.state != "running":
        logger.info(
            f"Campaign {campaign_id} is not in running state: {campaign.state}"
        )
        return 0

    # Claim the runs up front so other workers skip them.
    queued_runs = await db_client.claim_queued_runs_for_processing(
        campaign_id=campaign_id,
        scheduled_before=datetime.now(UTC),
        limit=batch_size,
    )
    if not queued_runs:
        logger.info(f"No more queued runs for campaign {campaign_id}")
        return 0

    # Initialize from_number pool for this campaign's telephony config.
    # Deliberately best-effort: a failure is logged and dispatching continues.
    try:
        provider = await self.get_provider_for_campaign(campaign)
        if provider.from_numbers:
            await rate_limiter.initialize_from_number_pool(
                campaign.organization_id,
                provider.from_numbers,
                telephony_configuration_id=campaign.telephony_configuration_id,
            )
    except Exception as e:
        logger.warning(f"Failed to initialize from_number pool: {e}")

    # `campaign` is a snapshot loaded once above and is never refreshed in the
    # loop. Writing `campaign.processed_rows + 1` on every iteration would
    # store the same value each time, so the counter is derived from the
    # snapshot plus the number of runs processed so far in this batch.
    # NOTE(review): batches of the same campaign running concurrently can
    # still overwrite each other's counts; an atomic increment in the DB
    # layer would be needed to close that gap.
    processed_rows_at_start = campaign.processed_rows
    processed_count = 0

    for i, queued_run in enumerate(queued_runs):
        try:
            # Rate limit = calls started per second; distinct from the
            # concurrency limit (calls in flight) acquired right after.
            await self.apply_rate_limit(
                campaign.organization_id, campaign.rate_limit_per_second
            )

            # Waits until a slot is available (or raises on timeout).
            slot_id = await self.acquire_concurrent_slot(
                campaign.organization_id, campaign
            )

            # dispatch_call releases the slot itself if it fails.
            workflow_run = await self.dispatch_call(queued_run, campaign, slot_id)

            await db_client.update_queued_run(
                queued_run_id=queued_run.id,
                state="processed",
                workflow_run_id=workflow_run.id,
                processed_at=datetime.now(UTC),
            )
            processed_count += 1

            await db_client.update_campaign(
                campaign_id=campaign_id,
                processed_rows=processed_rows_at_start + processed_count,
            )

        except (ConcurrentSlotAcquisitionError, PhoneNumberPoolExhaustedError):
            # Capacity problem, not a per-run problem: put the current run and
            # everything after it back to "queued" so they can be picked up
            # again when the campaign is resumed.
            for unprocessed_run in queued_runs[i:]:
                try:
                    await db_client.update_queued_run(
                        queued_run_id=unprocessed_run.id,
                        state="queued",
                    )
                    logger.info(
                        f"Reverted queued run {unprocessed_run.id} back to queued state"
                    )
                except Exception as revert_error:
                    logger.error(
                        f"Failed to revert queued run {unprocessed_run.id}: {revert_error}"
                    )
            # Re-raise to propagate to process_campaign_batch
            raise

        except Exception as e:
            logger.warning(f"Error processing queued run {queued_run.id}: {e}")
            # Mark the queued run as failed to prevent infinite retry loops,
            # then carry on with the next run in the batch.
            try:
                await db_client.update_queued_run(
                    queued_run_id=queued_run.id,
                    state="failed",
                    processed_at=datetime.now(UTC),
                )
                logger.info(
                    f"Marked queued run {queued_run.id} as failed due to error: {e}"
                )
            except Exception as update_error:
                logger.error(
                    f"Failed to mark queued run {queued_run.id} as failed: {update_error}"
                )

    return processed_count
async def dispatch_call(
    self, queued_run: QueuedRunModel, campaign: any, slot_id: str
) -> Optional[WorkflowRunModel]:
    """Create a workflow run for a queued run and initiate the outbound call.

    The caller must already hold the concurrency slot ``slot_id``. On success
    the slot and the acquired from_number stay held; they are recorded in the
    rate limiter against the workflow run so ``release_call_slot`` can return
    them later. On any failure this method releases whatever it holds before
    re-raising, so the caller never has to release the slot itself.

    Args:
        queued_run: The claimed queued run; ``context_variables`` must contain
            ``"phone_number"``.
        campaign: The campaign being dispatched.
        slot_id: Concurrency slot previously acquired for this call.

    Returns:
        The created workflow run.

    Raises:
        ValueError: If the campaign's workflow does not exist or the queued
            run has no phone number.
        PhoneNumberPoolExhaustedError: If no from_number became available.
        Exception: Any error from creating the run or initiating the call is
            re-raised after cleanup.
    """
    org_id = campaign.organization_id
    tcid = campaign.telephony_configuration_id
    from_number = None

    # Phase 1: everything up to storing the workflow->slot/number mappings.
    # Until those mappings exist, the slot and number are known only to this
    # frame, so every failure in this phase (including provider lookup and
    # from_number acquisition errors) must release them directly.
    try:
        workflow = await db_client.get_workflow_by_id(campaign.workflow_id)
        if not workflow:
            raise ValueError(f"Workflow {campaign.workflow_id} not found")

        phone_number = queued_run.context_variables.get("phone_number")
        if not phone_number:
            raise ValueError(f"No phone number in queued run {queued_run.id}")

        # Get provider for this campaign's pinned telephony config.
        provider = await self.get_provider_for_campaign(campaign)
        workflow_run_mode = provider.PROVIDER_NAME

        # Acquire a from_number from the pool scoped to this campaign's
        # telephony configuration so orgs with multiple configs don't leak
        # caller IDs across configs.
        from_number = await self.acquire_from_number(
            org_id,
            telephony_configuration_id=tcid,
        )
        if from_number is None:
            raise PhoneNumberPoolExhaustedError(organization_id=org_id)

        logger.info(f"Provider name: {provider.PROVIDER_NAME}")
        logger.info(f"Queued run context: {queued_run.context_variables}")

        # Queued-run context already includes retry info if applicable; the
        # keys below override any same-named keys from it.
        initial_context = {
            **queued_run.context_variables,
            "campaign_id": campaign.id,
            "provider": provider.PROVIDER_NAME,
            "source_uuid": queued_run.source_uuid,
            "caller_number": from_number,
            "called_number": phone_number,
            "telephony_configuration_id": tcid,
        }
        logger.info(f"Final initial_context: {initial_context}")

        workflow_run = await db_client.create_workflow_run(
            name=f"WR-CAMPAIGN-{campaign.id}-{queued_run.id}",
            workflow_id=campaign.workflow_id,
            mode=workflow_run_mode,
            user_id=campaign.created_by,
            initial_context=initial_context,
            campaign_id=campaign.id,
            queued_run_id=queued_run.id,  # Link to queued run for retry tracking
        )

        # Record which slot / number this run holds, for cleanup later.
        await rate_limiter.store_workflow_slot_mapping(
            workflow_run.id, org_id, slot_id
        )
        await rate_limiter.store_workflow_from_number_mapping(
            workflow_run.id,
            org_id,
            from_number,
            telephony_configuration_id=tcid,
        )
    except Exception:
        await rate_limiter.release_concurrent_slot(org_id, slot_id)
        if from_number:
            await rate_limiter.release_from_number(
                org_id,
                from_number,
                telephony_configuration_id=tcid,
            )
        raise

    # Phase 2: tag retries. The mappings exist now, so cleanup goes through
    # them. Without this guard a failed update would strand slot and number.
    try:
        if queued_run.context_variables.get("is_retry"):
            retry_reason = queued_run.context_variables.get("retry_reason", "unknown")
            await db_client.update_workflow_run(
                run_id=workflow_run.id,
                gathered_context={
                    "call_tags": ["retry", f"retry_reason_{retry_reason}"]
                },
            )
    except Exception:
        await self._release_dispatch_resources(workflow_run.id)
        raise

    # Phase 3: initiate the call via the telephony provider.
    try:
        backend_endpoint, _ = await get_backend_endpoints()
        webhook_endpoint = provider.WEBHOOK_ENDPOINT
        webhook_url = (
            f"{backend_endpoint}/api/v1/telephony/{webhook_endpoint}"
            f"?workflow_id={campaign.workflow_id}"
            f"&user_id={campaign.created_by}"
            f"&workflow_run_id={workflow_run.id}"
            f"&campaign_id={campaign.id}"
            f"&organization_id={campaign.organization_id}"
        )

        call_result = await provider.initiate_call(
            to_number=phone_number,
            webhook_url=webhook_url,
            workflow_run_id=workflow_run.id,
            from_number=from_number,
            workflow_id=campaign.workflow_id,
            user_id=campaign.created_by,
        )

        # Store provider type and metadata in gathered_context
        # (required for WebSocket handler to route to correct provider)
        await db_client.update_workflow_run(
            run_id=workflow_run.id,
            gathered_context={
                "provider": provider.PROVIDER_NAME,
                **(call_result.provider_metadata or {}),
            },
        )
        logger.info(
            f"Call initiated for workflow run {workflow_run.id}, Call ID: {call_result.call_id}"
        )
    except Exception as e:
        logger.error(
            f"Failed to initiate call for workflow run {workflow_run.id}: {e}"
        )
        try:
            # NOTE(review): `logs` is assumed to be a dict; the `or {}` guard
            # keeps a None value from aborting the failure bookkeeping.
            telephony_callback_logs = (workflow_run.logs or {}).get(
                "telephony_status_callbacks", []
            )
            telephony_callback_logs.append(
                {
                    "status": "failed",
                    "timestamp": datetime.now(UTC).isoformat(),
                    "data": {"error": str(e)},
                }
            )

            # Close out the workflow run with the error attached.
            await db_client.update_workflow_run(
                run_id=workflow_run.id,
                is_completed=True,
                state=WorkflowRunState.COMPLETED.value,
                gathered_context={
                    "error": str(e),
                },
                logs={
                    "telephony_status_callbacks": telephony_callback_logs,
                },
            )

            # Record call initiation failure in circuit breaker
            await circuit_breaker.record_and_evaluate(
                campaign.id,
                is_failure=True,
                workflow_run_id=workflow_run.id,
                reason="call_initiation_failed",
            )
        finally:
            # Runs even if the bookkeeping above raises, so a DB or circuit
            # breaker error cannot leave the slot and number held.
            await self._release_dispatch_resources(workflow_run.id)
        raise

    return workflow_run

async def _release_dispatch_resources(self, workflow_run_id: int) -> None:
    """Release the slot and from_number mapped to a run whose dispatch failed."""
    try:
        mapping = await rate_limiter.get_workflow_slot_mapping(workflow_run_id)
        if mapping:
            org_id, slot_id = mapping
            await rate_limiter.release_concurrent_slot(org_id, slot_id)
            await rate_limiter.delete_workflow_slot_mapping(workflow_run_id)
    finally:
        # Attempt the number release even when the slot release failed.
        from_number_mapping = await rate_limiter.get_workflow_from_number_mapping(
            workflow_run_id
        )
        if from_number_mapping:
            fn_org_id, fn_number, fn_tcid = from_number_mapping
            await rate_limiter.release_from_number(
                fn_org_id, fn_number, telephony_configuration_id=fn_tcid
            )
            await rate_limiter.delete_workflow_from_number_mapping(workflow_run_id)
async def apply_rate_limit(self, organization_id: int, rate_limit: int) -> None:
    """Block until the organization may start another call, or time out.

    Repeatedly asks the rate limiter for a token. When none is available it
    sleeps for the interval the rate limiter reports and tries again, as long
    as the total time spent (elapsed so far plus the upcoming sleep) stays
    within one second.

    Example usage:
    ```
    # This will wait up to 1 second if needed to respect rate limit
    await self.apply_rate_limit(org_id, 1)  # 1 call per second
    await twilio.initiate_call(...)  # Now safe to call
    ```

    Args:
        organization_id: Organization whose rate limit bucket is used.
        rate_limit: Allowed calls per second, passed through to the limiter.

    Raises:
        TimeoutError: If a token cannot be obtained within the wait budget.
    """
    max_wait = 1.0  # Maximum time to wait for a slot
    # Monotonic clock: elapsed-time arithmetic must not be affected by
    # wall-clock adjustments (NTP steps, manual changes).
    started = time.monotonic()

    while True:
        if await rate_limiter.acquire_token(organization_id, rate_limit):
            return  # Got permission to proceed

        wait_time = await rate_limiter.get_next_available_slot(
            organization_id, rate_limit
        )

        # Don't wait forever: give up if the next sleep would overrun the budget.
        if time.monotonic() - started + wait_time > max_wait:
            raise TimeoutError("Rate limit timeout - try again later")

        await asyncio.sleep(wait_time)
async def acquire_concurrent_slot(
    self, organization_id: int, campaign: any, timeout: float = 600
) -> str:
    """Acquire a concurrent call slot, polling once per second until one frees up.

    The effective limit is the organization's concurrent limit, lowered to the
    campaign's ``orchestrator_metadata["max_concurrency"]`` when that is set
    and smaller.

    Args:
        organization_id: The organization ID.
        campaign: The campaign object.
        timeout: Maximum time in seconds to wait for a slot (default 10 minutes).

    Returns:
        The slot_id, which must be released when the call completes.

    Raises:
        ConcurrentSlotAcquisitionError: If no slot could be acquired within
            ``timeout`` seconds.
    """
    org_concurrent_limit = await self.get_org_concurrent_limit(organization_id)

    # Optional campaign-level cap from orchestrator_metadata.
    campaign_max_concurrency = None
    if campaign.orchestrator_metadata:
        campaign_max_concurrency = campaign.orchestrator_metadata.get(
            "max_concurrency"
        )

    # Use the lower of campaign limit and org limit.
    if campaign_max_concurrency is not None:
        max_concurrent = min(campaign_max_concurrency, org_concurrent_limit)
    else:
        max_concurrent = org_concurrent_limit

    # Monotonic clock so the timeout is immune to wall-clock adjustments.
    wait_start = time.monotonic()

    while True:
        slot_id = await rate_limiter.try_acquire_concurrent_slot(
            organization_id, max_concurrent
        )
        if slot_id:
            return slot_id

        # At least one attempt is always made before the timeout is checked.
        wait_time = time.monotonic() - wait_start
        if wait_time > timeout:
            raise ConcurrentSlotAcquisitionError(
                organization_id=organization_id,
                campaign_id=campaign.id,
                wait_time=wait_time,
            )

        logger.debug(
            f"Attempting to get a slot for {organization_id} {campaign.id}, "
            f"waited {wait_time:.1f}s"
        )

        # Wait before retrying
        await asyncio.sleep(1)
async def acquire_from_number(
    self,
    organization_id: int,
    telephony_configuration_id: int | None,
    timeout: float = 600,
) -> Optional[str]:
    """Acquire a from_number from the (org, telephony config) pool with retry.

    Polls the rate limiter every second until a number is handed out or
    ``timeout`` seconds have elapsed.

    Args:
        organization_id: Organization that owns the pool.
        telephony_configuration_id: Telephony configuration the pool is scoped
            to; may be None.
        timeout: Maximum time in seconds to keep polling.

    Returns:
        The acquired phone number as a string, or None if the timeout is
        exceeded. Unlike slot acquisition, exhaustion is signalled by the
        return value rather than an exception.
    """
    # Monotonic clock so the timeout is immune to wall-clock adjustments.
    wait_start = time.monotonic()

    while True:
        from_number = await rate_limiter.acquire_from_number(
            organization_id, telephony_configuration_id
        )
        if from_number:
            return from_number

        wait_time = time.monotonic() - wait_start
        if wait_time > timeout:
            logger.warning(
                f"From number pool exhausted for org {organization_id} "
                f"config {telephony_configuration_id} after waiting "
                f"{wait_time:.1f}s"
            )
            return None

        logger.debug(
            f"All from_numbers in use for org {organization_id} "
            f"config {telephony_configuration_id}, waited {wait_time:.1f}s, "
            "retrying..."
        )
        await asyncio.sleep(1)
async def release_call_slot(self, workflow_run_id: int) -> bool:
    """Hand back the concurrency slot and from_number held by a finished call.

    Looks up what the workflow run holds via the rate limiter's mappings.
    Each mapping is deleted only after its release reports success.

    Returns:
        True if the concurrent slot was released. The outcome of the
        from_number release does not affect the return value.
    """
    released = False

    slot_entry = await rate_limiter.get_workflow_slot_mapping(workflow_run_id)
    if slot_entry:
        owner_org_id, held_slot_id = slot_entry
        if await rate_limiter.release_concurrent_slot(owner_org_id, held_slot_id):
            await rate_limiter.delete_workflow_slot_mapping(workflow_run_id)
            logger.info(
                f"Released concurrent slot for workflow run {workflow_run_id}"
            )
            released = True

    # Release from_number back to its (org, telephony config) pool
    number_entry = await rate_limiter.get_workflow_from_number_mapping(
        workflow_run_id
    )
    if number_entry:
        fn_org_id, fn_number, fn_tcid = number_entry
        if await rate_limiter.release_from_number(
            fn_org_id, fn_number, telephony_configuration_id=fn_tcid
        ):
            await rate_limiter.delete_workflow_from_number_mapping(workflow_run_id)
            logger.info(
                f"Released from_number {fn_number} for workflow run {workflow_run_id}"
            )

    return released
# Module-level dispatcher instance, created once when this module is imported.
campaign_call_dispatcher = CampaignCallDispatcher()