feat: limit campaign concurrency to number of CLIs

This commit is contained in:
Abhishek Kumar 2026-02-07 13:45:21 +05:30
parent 6711dcb3ea
commit 3cdede0f45
18 changed files with 846 additions and 462 deletions

View file

@ -11,10 +11,7 @@ from api.db.models import UserModel
from api.enums import OrganizationConfigurationKey
from api.services.auth.depends import get_user
from api.services.campaign.runner import campaign_runner_service
from api.services.campaign.source_validator import (
validate_csv_source,
validate_google_sheet_source,
)
from api.services.campaign.source_sync_factory import get_sync_service
from api.services.quota_service import check_dograh_quota
from api.services.storage import storage_fs
@ -35,6 +32,20 @@ async def _get_org_concurrent_limit(organization_id: int) -> int:
return DEFAULT_ORG_CONCURRENCY_LIMIT
async def _get_from_numbers_count(organization_id: int) -> int:
"""Get the number of configured from_numbers for an organization."""
try:
config = await db_client.get_configuration(
organization_id,
OrganizationConfigurationKey.TELEPHONY_CONFIGURATION.value,
)
if config and config.value:
return len(config.value.get("from_numbers", []))
except Exception:
pass
return 0
class RetryConfigRequest(BaseModel):
enabled: bool = True
max_retries: int = Field(default=2, ge=0, le=10)
@ -163,24 +174,31 @@ async def create_campaign(
raise HTTPException(status_code=404, detail="Workflow not found")
# Validate source data (phone_number column and format)
if request.source_type == "csv":
validation_result = await validate_csv_source(request.source_id)
if not validation_result.is_valid:
raise HTTPException(status_code=400, detail=validation_result.error.message)
elif request.source_type == "google-sheet":
validation_result = await validate_google_sheet_source(
request.source_id, user.selected_organization_id
)
if not validation_result.is_valid:
raise HTTPException(status_code=400, detail=validation_result.error.message)
sync_service = get_sync_service(request.source_type)
validation_result = await sync_service.validate_source(
request.source_id, user.selected_organization_id
)
if not validation_result.is_valid:
raise HTTPException(status_code=400, detail=validation_result.error.message)
# Validate max_concurrency against org limit if provided
# Validate max_concurrency against effective limit (min of org limit and from_numbers count)
if request.max_concurrency is not None:
org_limit = await _get_org_concurrent_limit(user.selected_organization_id)
if request.max_concurrency > org_limit:
from_numbers_count = await _get_from_numbers_count(
user.selected_organization_id
)
effective_limit = (
min(org_limit, from_numbers_count) if from_numbers_count > 0 else org_limit
)
if request.max_concurrency > effective_limit:
if from_numbers_count > 0 and from_numbers_count < org_limit:
raise HTTPException(
status_code=400,
detail=f"max_concurrency ({request.max_concurrency}) cannot exceed {effective_limit}. You have {from_numbers_count} phone number(s) configured. Add more CLIs in telephony configuration to increase concurrency.",
)
raise HTTPException(
status_code=400,
detail=f"max_concurrency ({request.max_concurrency}) cannot exceed organization limit ({org_limit})",
detail=f"max_concurrency ({request.max_concurrency}) cannot exceed organization limit ({effective_limit})",
)
# Build retry_config dict if provided

View file

@ -225,6 +225,7 @@ class RetryConfigResponse(BaseModel):
class CampaignLimitsResponse(BaseModel):
concurrent_call_limit: int
from_numbers_count: int
default_retry_config: RetryConfigResponse
@ -251,7 +252,21 @@ async def get_campaign_limits(user: UserModel = Depends(get_user)):
except Exception:
pass
# Get from_numbers count from telephony configuration
from_numbers_count = 0
try:
telephony_config = await db_client.get_configuration(
user.selected_organization_id,
OrganizationConfigurationKey.TELEPHONY_CONFIGURATION.value,
)
if telephony_config and telephony_config.value:
from_numbers = telephony_config.value.get("from_numbers", [])
from_numbers_count = len(from_numbers)
except Exception:
pass
return CampaignLimitsResponse(
concurrent_call_limit=concurrent_limit,
from_numbers_count=from_numbers_count,
default_retry_config=RetryConfigResponse(**DEFAULT_CAMPAIGN_RETRY_CONFIG),
)

View file

@ -9,7 +9,10 @@ from api.constants import DEFAULT_ORG_CONCURRENCY_LIMIT
from api.db import db_client
from api.db.models import QueuedRunModel, WorkflowRunModel
from api.enums import OrganizationConfigurationKey, WorkflowRunState
from api.services.campaign.errors import ConcurrentSlotAcquisitionError
from api.services.campaign.errors import (
ConcurrentSlotAcquisitionError,
PhoneNumberPoolExhaustedError,
)
from api.services.campaign.rate_limiter import rate_limiter
from api.services.telephony.base import TelephonyProvider
from api.services.telephony.factory import get_telephony_provider
@ -71,6 +74,16 @@ class CampaignCallDispatcher:
logger.info(f"No more queued runs for campaign {campaign_id}")
return 0
# Initialize from_number pool for this org's provider
try:
provider = await self.get_telephony_provider(campaign.organization_id)
if provider.from_numbers:
await rate_limiter.initialize_from_number_pool(
campaign.organization_id, provider.from_numbers
)
except Exception as e:
logger.warning(f"Failed to initialize from_number pool: {e}")
processed_count = 0
for i, queued_run in enumerate(queued_runs):
try:
@ -103,7 +116,7 @@ class CampaignCallDispatcher:
campaign_id=campaign_id, processed_rows=campaign.processed_rows + 1
)
except ConcurrentSlotAcquisitionError:
except (ConcurrentSlotAcquisitionError, PhoneNumberPoolExhaustedError):
# Revert all unprocessed runs (current and remaining) back to queued
# so they can be picked up again when campaign is resumed
for unprocessed_run in queued_runs[i:]:
@ -146,6 +159,8 @@ class CampaignCallDispatcher:
self, queued_run: QueuedRunModel, campaign: any, slot_id: str
) -> Optional[WorkflowRunModel]:
"""Creates workflow run and initiates call. Requires a pre-acquired slot_id."""
from_number = None
# Get workflow details
workflow = await db_client.get_workflow_by_id(campaign.workflow_id)
if not workflow:
@ -168,6 +183,17 @@ class CampaignCallDispatcher:
provider = await self.get_telephony_provider(campaign.organization_id)
workflow_run_mode = provider.PROVIDER_NAME
# Acquire a unique from_number from the pool
from_number = await self.acquire_from_number(campaign.organization_id)
if from_number is None:
# Release concurrent slot before raising
await rate_limiter.release_concurrent_slot(
campaign.organization_id, slot_id
)
raise PhoneNumberPoolExhaustedError(
organization_id=campaign.organization_id
)
logger.info(f"Provider name: {provider.PROVIDER_NAME}")
logger.info(f"Queued run context: {queued_run.context_variables}")
@ -177,6 +203,7 @@ class CampaignCallDispatcher:
**queued_run.context_variables,
"campaign_id": campaign.id,
"provider": provider.PROVIDER_NAME,
"source_uuid": queued_run.source_uuid,
}
logger.info(f"Final initial_context: {initial_context}")
@ -198,11 +225,20 @@ class CampaignCallDispatcher:
await rate_limiter.store_workflow_slot_mapping(
workflow_run.id, campaign.organization_id, slot_id
)
# Store from_number mapping for cleanup on call completion
await rate_limiter.store_workflow_from_number_mapping(
workflow_run.id, campaign.organization_id, from_number
)
except Exception as e:
# Release slot on error
# Release slot and from_number on error
await rate_limiter.release_concurrent_slot(
campaign.organization_id, slot_id
)
if from_number:
await rate_limiter.release_from_number(
campaign.organization_id, from_number
)
raise
# Add "retry" tag if this is a retry call
@ -233,6 +269,7 @@ class CampaignCallDispatcher:
to_number=phone_number,
webhook_url=webhook_url,
workflow_run_id=workflow_run.id,
from_number=from_number,
workflow_id=campaign.workflow_id,
user_id=campaign.created_by,
)
@ -285,6 +322,15 @@ class CampaignCallDispatcher:
await rate_limiter.release_concurrent_slot(org_id, slot_id)
await rate_limiter.delete_workflow_slot_mapping(workflow_run.id)
# Release from_number on failure
from_number_mapping = await rate_limiter.get_workflow_from_number_mapping(
workflow_run.id
)
if from_number_mapping:
fn_org_id, fn_number = from_number_mapping
await rate_limiter.release_from_number(fn_org_id, fn_number)
await rate_limiter.delete_workflow_from_number_mapping(workflow_run.id)
raise
return workflow_run
@ -380,11 +426,42 @@ class CampaignCallDispatcher:
# Wait before retrying
await asyncio.sleep(1)
async def acquire_from_number(
self, organization_id: int, timeout: float = 60
) -> Optional[str]:
"""
Acquire a from_number from the pool with retry.
Waits up to timeout seconds, polling every 1s.
Returns the phone number or None if timeout is exceeded.
"""
wait_start = time.time()
while True:
from_number = await rate_limiter.acquire_from_number(organization_id)
if from_number:
return from_number
wait_time = time.time() - wait_start
if wait_time > timeout:
logger.warning(
f"From number pool exhausted for org {organization_id} "
f"after waiting {wait_time:.1f}s"
)
return None
logger.debug(
f"All from_numbers in use for org {organization_id}, "
f"waited {wait_time:.1f}s, retrying..."
)
await asyncio.sleep(1)
async def release_call_slot(self, workflow_run_id: int) -> bool:
"""
Release concurrent slot when a call completes.
Release concurrent slot and from_number when a call completes.
Called by Twilio webhooks or workflow completion handlers.
"""
slot_released = False
mapping = await rate_limiter.get_workflow_slot_mapping(workflow_run_id)
if mapping:
org_id, slot_id = mapping
@ -394,8 +471,22 @@ class CampaignCallDispatcher:
logger.info(
f"Released concurrent slot for workflow run {workflow_run_id}"
)
return success
return False
slot_released = True
# Release from_number back to pool
from_number_mapping = await rate_limiter.get_workflow_from_number_mapping(
workflow_run_id
)
if from_number_mapping:
fn_org_id, fn_number = from_number_mapping
fn_success = await rate_limiter.release_from_number(fn_org_id, fn_number)
if fn_success:
await rate_limiter.delete_workflow_from_number_mapping(workflow_run_id)
logger.info(
f"Released from_number {fn_number} for workflow run {workflow_run_id}"
)
return slot_released
# Global instance

View file

@ -14,3 +14,14 @@ class ConcurrentSlotAcquisitionError(Exception):
f"Failed to acquire concurrent slot for org {organization_id}, "
f"campaign {campaign_id} after waiting {wait_time:.1f}s"
)
class PhoneNumberPoolExhaustedError(Exception):
"""Raised when no phone numbers are available in the pool for outbound calls."""
def __init__(self, organization_id: int):
self.organization_id = organization_id
super().__init__(
f"All phone numbers are in use for org {organization_id}. "
f"No available from_number in pool."
)

View file

@ -245,6 +245,166 @@ class RateLimiter:
logger.error(f"Error deleting workflow slot mapping: {e}")
return False
# ======== FROM NUMBER POOL METHODS ========
async def initialize_from_number_pool(
self, organization_id: int, from_numbers: list[str]
) -> bool:
"""
Initialize the from_number pool for an organization.
Uses ZADD NX so it won't overwrite numbers that are already in use.
Args:
organization_id: The organization ID
from_numbers: List of phone numbers to add to the pool
"""
if not from_numbers:
return False
redis_client = await self._get_redis()
key = f"from_number_pool:{organization_id}"
try:
# ZADD NX: only add members that don't already exist (preserves in-use scores)
members = {number: 0 for number in from_numbers}
await redis_client.zadd(key, members, nx=True)
await redis_client.expire(key, 3600) # 1 hour TTL
return True
except Exception as e:
logger.error(f"Error initializing from_number pool: {e}")
return False
async def acquire_from_number(self, organization_id: int) -> Optional[str]:
"""
Atomically acquire an available from_number from the pool.
Cleans stale entries (score > 0 and older than 30 min) before acquiring.
Returns the phone number if available, None if all numbers are in use.
"""
redis_client = await self._get_redis()
key = f"from_number_pool:{organization_id}"
now = time.time()
stale_cutoff = now - self.stale_call_timeout
lua_script = """
local key = KEYS[1]
local now = tonumber(ARGV[1])
local stale_cutoff = tonumber(ARGV[2])
-- Clean stale entries: members with score > 0 and score < stale_cutoff
local stale = redis.call('ZRANGEBYSCORE', key, 1, stale_cutoff)
for i, member in ipairs(stale) do
redis.call('ZADD', key, 0, member)
end
-- Find an available number (score == 0)
local available = redis.call('ZRANGEBYSCORE', key, 0, 0, 'LIMIT', 0, 1)
if #available == 0 then
return nil
end
-- Mark as in-use with current timestamp
redis.call('ZADD', key, now, available[1])
return available[1]
"""
try:
result = await redis_client.eval(lua_script, 1, key, now, stale_cutoff)
if result:
logger.debug(f"Acquired from_number {result} for org {organization_id}")
return result
except Exception as e:
logger.error(f"Error acquiring from_number: {e}")
return None
async def release_from_number(self, organization_id: int, from_number: str) -> bool:
"""
Release a from_number back to the pool by setting its score to 0.
Harmless if already released (score already 0).
"""
if not from_number:
return False
redis_client = await self._get_redis()
key = f"from_number_pool:{organization_id}"
lua_script = """
local key = KEYS[1]
local from_number = ARGV[1]
local score = redis.call('ZSCORE', key, from_number)
if score then
redis.call('ZADD', key, 0, from_number)
return 1
end
return 0
"""
try:
result = await redis_client.eval(lua_script, 1, key, from_number)
if result:
logger.debug(
f"Released from_number {from_number} for org {organization_id}"
)
return bool(result)
except Exception as e:
logger.error(f"Error releasing from_number: {e}")
return False
async def store_workflow_from_number_mapping(
self, workflow_run_id: int, organization_id: int, from_number: str
) -> bool:
"""
Store the mapping between workflow_run_id and its from_number.
Used for cleanup when calls complete.
"""
redis_client = await self._get_redis()
mapping_key = f"workflow_from_number:{workflow_run_id}"
try:
await redis_client.hset(
mapping_key,
mapping={"org_id": organization_id, "from_number": from_number},
)
await redis_client.expire(mapping_key, 1800) # 30 min TTL
return True
except Exception as e:
logger.error(f"Error storing workflow from_number mapping: {e}")
return False
async def get_workflow_from_number_mapping(
self, workflow_run_id: int
) -> Optional[tuple[int, str]]:
"""
Get the from_number mapping for a workflow run.
Returns (organization_id, from_number) tuple or None if not found.
"""
redis_client = await self._get_redis()
mapping_key = f"workflow_from_number:{workflow_run_id}"
try:
mapping = await redis_client.hgetall(mapping_key)
if mapping and "org_id" in mapping and "from_number" in mapping:
return (int(mapping["org_id"]), mapping["from_number"])
return None
except Exception as e:
logger.error(f"Error getting workflow from_number mapping: {e}")
return None
async def delete_workflow_from_number_mapping(self, workflow_run_id: int) -> bool:
"""
Delete the workflow from_number mapping after releasing the number.
"""
redis_client = await self._get_redis()
mapping_key = f"workflow_from_number:{workflow_run_id}"
try:
deleted = await redis_client.delete(mapping_key)
return bool(deleted)
except Exception as e:
logger.error(f"Error deleting workflow from_number mapping: {e}")
return False
async def close(self):
"""Close Redis connection"""
if self.redis_client:

View file

@ -1,12 +1,127 @@
from abc import ABC, abstractmethod
from typing import Any, Dict
from dataclasses import dataclass
from typing import Any, Dict, List, Optional
from loguru import logger
@dataclass
class ValidationError:
"""Represents a validation error with details."""
message: str
invalid_rows: Optional[List[int]] = None
@dataclass
class ValidationResult:
"""Result of source validation."""
is_valid: bool
error: Optional[ValidationError] = None
class CampaignSourceSyncService(ABC):
"""Base class for campaign data source synchronization"""
@staticmethod
def normalize_headers(headers: List[str]) -> List[str]:
"""Normalize headers by stripping whitespace and lowercasing."""
return [h.strip().lower() for h in headers]
@staticmethod
def validate_source_data(
headers: List[str], rows: List[List[str]]
) -> ValidationResult:
"""
Validate source data for campaign creation.
Args:
headers: List of column headers
rows: List of data rows (excluding header)
Returns:
ValidationResult with is_valid=True if valid, or error details if invalid
"""
normalized_headers = CampaignSourceSyncService.normalize_headers(headers)
# Check for phone_number column
if "phone_number" not in normalized_headers:
return ValidationResult(
is_valid=False,
error=ValidationError(
message="Source must contain a 'phone_number' column"
),
)
phone_number_idx = normalized_headers.index("phone_number")
# Validate phone numbers in all data rows
invalid_rows = []
for row_idx, row in enumerate(
rows, start=2
): # Start at 2 (1-indexed, skip header)
if len(row) <= phone_number_idx:
continue # Skip rows that don't have enough columns
phone_number = row[phone_number_idx].strip()
if phone_number and not phone_number.startswith("+"):
invalid_rows.append(row_idx)
if invalid_rows:
# Limit the number of rows shown in error message
if len(invalid_rows) > 5:
rows_str = f"{', '.join(map(str, invalid_rows[:5]))} and {len(invalid_rows) - 5} more"
else:
rows_str = ", ".join(map(str, invalid_rows))
return ValidationResult(
is_valid=False,
error=ValidationError(
message=f"Invalid phone numbers in rows: {rows_str}. All phone numbers must include country code (start with '+')",
invalid_rows=invalid_rows,
),
)
# Check for duplicate phone numbers
seen_phones: dict[str, int] = {} # phone -> first row where it appeared
duplicate_rows = []
for row_idx, row in enumerate(rows, start=2):
if len(row) <= phone_number_idx:
continue
phone_number = row[phone_number_idx].strip()
if not phone_number:
continue
if phone_number in seen_phones:
duplicate_rows.append(row_idx)
else:
seen_phones[phone_number] = row_idx
if duplicate_rows:
if len(duplicate_rows) > 5:
rows_str = f"{', '.join(map(str, duplicate_rows[:5]))} and {len(duplicate_rows) - 5} more"
else:
rows_str = ", ".join(map(str, duplicate_rows))
return ValidationResult(
is_valid=False,
error=ValidationError(
message=f"Duplicate phone numbers found in rows: {rows_str}. Phone numbers in a campaign must be unique.",
invalid_rows=duplicate_rows,
),
)
return ValidationResult(is_valid=True)
@abstractmethod
async def validate_source(
self, source_id: str, organization_id: Optional[int] = None
) -> ValidationResult:
"""Validate source data before campaign creation."""
pass
@abstractmethod
async def sync_source_data(self, campaign_id: int) -> int:
"""
@ -16,11 +131,6 @@ class CampaignSourceSyncService(ABC):
"""
pass
@abstractmethod
async def validate_source_schema(self, source_config: Dict[str, Any]) -> bool:
"""Validates required fields exist in source"""
pass
async def get_source_credentials(
self, organization_id: int, source_type: str
) -> Dict[str, Any]:

View file

@ -1,295 +0,0 @@
"""
Source validation for campaign data sources (CSV, Google Sheets).
Validates that:
- phone_number column exists
- All phone numbers include country code (start with '+')
"""
import csv
from dataclasses import dataclass
from io import StringIO
from typing import List, Optional
import httpx
from loguru import logger
from api.services.storage import storage_fs
@dataclass
class ValidationError:
"""Represents a validation error with details."""
message: str
invalid_rows: Optional[List[int]] = None
@dataclass
class ValidationResult:
"""Result of source validation."""
is_valid: bool
error: Optional[ValidationError] = None
def _validate_source_data(
headers: List[str], rows: List[List[str]]
) -> ValidationResult:
"""
Validate source data for campaign creation.
Args:
headers: List of column headers
rows: List of data rows (excluding header)
Returns:
ValidationResult with is_valid=True if valid, or error details if invalid
"""
# Normalize headers to lowercase for comparison
normalized_headers = [h.strip().lower() for h in headers]
# Check for phone_number column
if "phone_number" not in normalized_headers:
return ValidationResult(
is_valid=False,
error=ValidationError(
message="Source must contain a 'phone_number' column"
),
)
phone_number_idx = normalized_headers.index("phone_number")
# Validate phone numbers in all data rows
invalid_rows = []
for row_idx, row in enumerate(rows, start=2): # Start at 2 (1-indexed, skip header)
if len(row) <= phone_number_idx:
continue # Skip rows that don't have enough columns
phone_number = row[phone_number_idx].strip()
if phone_number and not phone_number.startswith("+"):
invalid_rows.append(row_idx)
if invalid_rows:
# Limit the number of rows shown in error message
if len(invalid_rows) > 5:
rows_str = f"{', '.join(map(str, invalid_rows[:5]))} and {len(invalid_rows) - 5} more"
else:
rows_str = ", ".join(map(str, invalid_rows))
return ValidationResult(
is_valid=False,
error=ValidationError(
message=f"Invalid phone numbers in rows: {rows_str}. All phone numbers must include country code (start with '+')",
invalid_rows=invalid_rows,
),
)
# Check for duplicate phone numbers
seen_phones: dict[str, int] = {} # phone -> first row where it appeared
duplicate_rows = []
for row_idx, row in enumerate(rows, start=2):
if len(row) <= phone_number_idx:
continue
phone_number = row[phone_number_idx].strip()
if not phone_number:
continue
if phone_number in seen_phones:
duplicate_rows.append(row_idx)
else:
seen_phones[phone_number] = row_idx
if duplicate_rows:
if len(duplicate_rows) > 5:
rows_str = f"{', '.join(map(str, duplicate_rows[:5]))} and {len(duplicate_rows) - 5} more"
else:
rows_str = ", ".join(map(str, duplicate_rows))
return ValidationResult(
is_valid=False,
error=ValidationError(
message=f"Duplicate phone numbers found in rows: {rows_str}. Phone numbers in a campaign must be unique.",
invalid_rows=duplicate_rows,
),
)
return ValidationResult(is_valid=True)
async def validate_csv_source(file_key: str) -> ValidationResult:
"""
Validate a CSV source file for campaign creation.
Args:
file_key: S3/MinIO file key for the CSV file
Returns:
ValidationResult with is_valid=True if valid, or error details if invalid
"""
# Get download URL using internal endpoint
signed_url = await storage_fs.aget_signed_url(
file_key, expiration=3600, use_internal_endpoint=True
)
if not signed_url:
return ValidationResult(
is_valid=False,
error=ValidationError(message=f"Failed to access CSV file: {file_key}"),
)
# Download CSV file
async with httpx.AsyncClient() as client:
try:
response = await client.get(signed_url)
response.raise_for_status()
csv_content = response.text
except httpx.HTTPError as e:
logger.error(f"Failed to download CSV file for validation: {e}")
return ValidationResult(
is_valid=False,
error=ValidationError(
message="Failed to download CSV file for validation"
),
)
# Parse CSV
try:
csv_file = StringIO(csv_content)
reader = csv.reader(csv_file)
rows = list(reader)
except Exception as e:
logger.error(f"Failed to parse CSV: {e}")
return ValidationResult(
is_valid=False,
error=ValidationError(message=f"Invalid CSV format: {str(e)}"),
)
if not rows or len(rows) < 2:
return ValidationResult(
is_valid=False,
error=ValidationError(
message="CSV file must have a header row and at least one data row"
),
)
headers = rows[0]
data_rows = rows[1:]
return _validate_source_data(headers, data_rows)
async def validate_google_sheet_source(
sheet_url: str, organization_id: int
) -> ValidationResult:
"""
Validate a Google Sheet source for campaign creation.
Args:
sheet_url: Google Sheets URL
organization_id: Organization ID to get integration credentials
Returns:
ValidationResult with is_valid=True if valid, or error details if invalid
"""
import re
from api.db import db_client
from api.services.integrations.nango import NangoService
# Extract sheet ID from URL
pattern = r"/spreadsheets/d/([a-zA-Z0-9-_]+)"
match = re.search(pattern, sheet_url)
if not match:
return ValidationResult(
is_valid=False,
error=ValidationError(message=f"Invalid Google Sheets URL: {sheet_url}"),
)
sheet_id = match.group(1)
# Get Google Sheets integration for the organization
integrations = await db_client.get_integrations_by_organization_id(organization_id)
integration = None
for intg in integrations:
if intg.provider == "google-sheet" and intg.is_active:
integration = intg
break
if not integration:
return ValidationResult(
is_valid=False,
error=ValidationError(
message="Google Sheets integration not found or inactive"
),
)
# Get OAuth token via Nango
try:
nango_service = NangoService()
token_data = await nango_service.get_access_token(
connection_id=integration.integration_id, provider_config_key="google-sheet"
)
access_token = token_data["credentials"]["access_token"]
except Exception as e:
logger.error(f"Failed to get Google Sheets access token: {e}")
return ValidationResult(
is_valid=False,
error=ValidationError(message="Failed to authenticate with Google Sheets"),
)
# Fetch sheet data
sheets_api_base = "https://sheets.googleapis.com/v4/spreadsheets"
async with httpx.AsyncClient() as client:
try:
# Get sheet metadata to find the first sheet name
metadata_url = f"{sheets_api_base}/{sheet_id}"
headers = {"Authorization": f"Bearer {access_token}"}
response = await client.get(metadata_url, headers=headers)
response.raise_for_status()
metadata = response.json()
if not metadata.get("sheets"):
return ValidationResult(
is_valid=False,
error=ValidationError(message="No sheets found in the spreadsheet"),
)
sheet_name = metadata["sheets"][0]["properties"]["title"]
# Fetch all data from sheet
data_url = f"{sheets_api_base}/{sheet_id}/values/{sheet_name}!A:Z"
response = await client.get(data_url, headers=headers)
response.raise_for_status()
data = response.json()
rows = data.get("values", [])
except httpx.HTTPStatusError as e:
logger.error(f"HTTP error fetching Google Sheet: {e.response.status_code}")
return ValidationResult(
is_valid=False,
error=ValidationError(
message=f"Failed to fetch Google Sheet data: {e.response.status_code}"
),
)
except Exception as e:
logger.error(f"Error fetching Google Sheet: {e}")
return ValidationResult(
is_valid=False,
error=ValidationError(message="Failed to fetch Google Sheet data"),
)
if not rows or len(rows) < 2:
return ValidationResult(
is_valid=False,
error=ValidationError(
message="Google Sheet must have a header row and at least one data row"
),
)
headers = rows[0]
data_rows = rows[1:]
return _validate_source_data(headers, data_rows)

View file

@ -1,19 +1,68 @@
import csv
import hashlib
from io import StringIO
from typing import Any, Dict, List
from typing import List, Optional
import httpx
from loguru import logger
from api.db import db_client
from api.services.campaign.source_sync import CampaignSourceSyncService
from api.services.campaign.source_sync import (
CampaignSourceSyncService,
ValidationError,
ValidationResult,
)
from api.services.storage import storage_fs
class CSVSyncService(CampaignSourceSyncService):
"""Implementation for CSV file synchronization"""
async def _fetch_csv_data(self, file_key: str) -> List[List[str]]:
"""Download and parse CSV file from storage. Returns all rows including header."""
signed_url = await storage_fs.aget_signed_url(
file_key, expiration=3600, use_internal_endpoint=True
)
if not signed_url:
raise ValueError(f"Failed to access CSV file: {file_key}")
async with httpx.AsyncClient() as client:
try:
response = await client.get(signed_url)
response.raise_for_status()
csv_content = response.text
except httpx.HTTPError as e:
logger.error(f"Failed to download CSV file: {e} for url: {signed_url}")
raise ValueError(f"Failed to download CSV file from storage: {str(e)}")
return self._parse_csv(csv_content)
async def validate_source(
self, source_id: str, organization_id: Optional[int] = None
) -> ValidationResult:
"""Validate a CSV source file for campaign creation."""
try:
csv_data = await self._fetch_csv_data(source_id)
except ValueError as e:
return ValidationResult(
is_valid=False,
error=ValidationError(message=str(e)),
)
if not csv_data or len(csv_data) < 2:
return ValidationResult(
is_valid=False,
error=ValidationError(
message="CSV file must have a header row and at least one data row"
),
)
headers = csv_data[0]
data_rows = csv_data[1:]
return self.validate_source_data(headers, data_rows)
async def sync_source_data(self, campaign_id: int) -> int:
"""
Fetches data from CSV file in S3/MinIO and creates queued_runs
@ -23,39 +72,20 @@ class CSVSyncService(CampaignSourceSyncService):
if not campaign:
raise ValueError(f"Campaign {campaign_id} not found")
# 1. Get download URL using internal endpoint (for container-to-container access)
file_key = campaign.source_id
signed_url = await storage_fs.aget_signed_url(
file_key, expiration=3600, use_internal_endpoint=True
)
if not signed_url:
raise ValueError(f"Failed to generate download URL for file: {file_key}")
# 2. Download CSV file
async with httpx.AsyncClient() as client:
try:
response = await client.get(signed_url)
response.raise_for_status()
csv_content = response.text
except httpx.HTTPError as e:
logger.error(f"Failed to download CSV file: {e} for url: {signed_url}")
raise ValueError(f"Failed to download CSV file from storage: {str(e)}")
# 3. Parse CSV
csv_data = self._parse_csv(csv_content)
csv_data = await self._fetch_csv_data(file_key)
if not csv_data or len(csv_data) < 2:
logger.warning(f"No data found in CSV for campaign {campaign_id}")
return 0
headers = csv_data[0] # First row is headers
rows = csv_data[1:] # Rest is data
headers = self.normalize_headers(csv_data[0])
rows = csv_data[1:]
# 4. Create hash of file_key for consistent source_uuid prefix
# Create hash of file_key for consistent source_uuid prefix
file_hash = hashlib.md5(file_key.encode()).hexdigest()[:8]
# 5. Convert to queued_runs
# Convert to queued_runs
queued_runs = []
for idx, row_values in enumerate(rows, 1):
# Pad row to match headers length
@ -81,14 +111,14 @@ class CSVSyncService(CampaignSourceSyncService):
}
)
# 6. Bulk insert
# Bulk insert
if queued_runs:
await db_client.bulk_create_queued_runs(queued_runs)
logger.info(
f"Created {len(queued_runs)} queued runs for campaign {campaign_id}"
)
# 7. Update campaign total_rows
# Update campaign total_rows
await db_client.update_campaign(
campaign_id=campaign_id,
total_rows=len(queued_runs),
@ -106,35 +136,3 @@ class CSVSyncService(CampaignSourceSyncService):
except Exception as e:
logger.error(f"Failed to parse CSV: {e}")
raise ValueError(f"Invalid CSV format: {str(e)}")
async def validate_source_schema(self, source_config: Dict[str, Any]) -> bool:
"""Validate that required columns exist in CSV"""
required_columns = ["phone_number", "first_name", "last_name"]
file_key = source_config.get("source_id")
if not file_key:
return False
# Get download URL using internal endpoint
signed_url = await storage_fs.aget_signed_url(
file_key, expiration=3600, use_internal_endpoint=True
)
if not signed_url:
return False
# Download just enough to get headers
async with httpx.AsyncClient() as client:
try:
response = await client.get(signed_url)
response.raise_for_status()
# Get just the first line for headers
first_line = response.text.split("\n")[0]
csv_file = StringIO(first_line)
reader = csv.reader(csv_file)
headers = next(reader, [])
return all(col in headers for col in required_columns)
except Exception as e:
logger.error(f"Failed to validate CSV schema: {e}")
return False

View file

@ -1,11 +1,15 @@
import re
from typing import Any, Dict, List
from typing import Any, Dict, List, Optional
import httpx
from loguru import logger
from api.db import db_client
from api.services.campaign.source_sync import CampaignSourceSyncService
from api.services.campaign.source_sync import (
CampaignSourceSyncService,
ValidationError,
ValidationResult,
)
from api.services.integrations.nango import NangoService
@ -16,18 +20,10 @@ class GoogleSheetsSyncService(CampaignSourceSyncService):
self.nango_service = NangoService()
self.sheets_api_base = "https://sheets.googleapis.com/v4/spreadsheets"
async def sync_source_data(self, campaign_id: int) -> int:
"""
Fetches data from Google Sheets and creates queued_runs
"""
# Get campaign
campaign = await db_client.get_campaign_by_id(campaign_id)
if not campaign:
raise ValueError(f"Campaign {campaign_id} not found")
# 1. Get Google Sheets integration for the organization
async def _get_access_token(self, organization_id: int) -> str:
"""Get OAuth access token for Google Sheets via Nango."""
integrations = await db_client.get_integrations_by_organization_id(
campaign.organization_id
organization_id
)
integration = None
for intg in integrations:
@ -38,39 +34,107 @@ class GoogleSheetsSyncService(CampaignSourceSyncService):
if not integration:
raise ValueError("Google Sheets integration not found or inactive")
# 2. Get OAuth token via Nango using the integration_id (which is the Nango connection ID)
token_data = await self.nango_service.get_access_token(
connection_id=integration.integration_id, provider_config_key="google-sheet"
)
access_token = token_data["credentials"]["access_token"]
return token_data["credentials"]["access_token"]
# 3. Extract sheet ID from URL
sheet_id = self._extract_sheet_id(campaign.source_id)
async def _fetch_all_sheet_data(
self, sheet_url: str, organization_id: int
) -> List[List[str]]:
"""Fetch all data from a Google Sheet. Returns all rows including header."""
access_token = await self._get_access_token(organization_id)
sheet_id = self._extract_sheet_id(sheet_url)
# 4. Get sheet metadata (to find data range)
metadata = await self._get_sheet_metadata(sheet_id, access_token)
if not metadata.get("sheets"):
raise ValueError("No sheets found in the spreadsheet")
sheet_name = metadata["sheets"][0]["properties"]["title"]
# 5. Fetch all data from sheet
sheet_data = await self._fetch_sheet_data(
sheet_id,
f"{sheet_name}!A:Z", # Get all columns A-Z
access_token,
return await self._fetch_sheet_data(sheet_id, f"{sheet_name}!A:Z", access_token)
async def validate_source(
self, source_id: str, organization_id: Optional[int] = None
) -> ValidationResult:
"""Validate a Google Sheet source for campaign creation."""
if organization_id is None:
return ValidationResult(
is_valid=False,
error=ValidationError(
message="Organization ID is required for Google Sheets validation"
),
)
# Validate URL format first
pattern = r"/spreadsheets/d/([a-zA-Z0-9-_]+)"
if not re.search(pattern, source_id):
return ValidationResult(
is_valid=False,
error=ValidationError(
message=f"Invalid Google Sheets URL: {source_id}"
),
)
try:
rows = await self._fetch_all_sheet_data(source_id, organization_id)
except ValueError as e:
return ValidationResult(
is_valid=False,
error=ValidationError(message=str(e)),
)
except httpx.HTTPStatusError as e:
logger.error(f"HTTP error fetching Google Sheet: {e.response.status_code}")
return ValidationResult(
is_valid=False,
error=ValidationError(
message=f"Failed to fetch Google Sheet data: {e.response.status_code}"
),
)
except Exception as e:
logger.error(f"Error fetching Google Sheet: {e}")
return ValidationResult(
is_valid=False,
error=ValidationError(message="Failed to fetch Google Sheet data"),
)
if not rows or len(rows) < 2:
return ValidationResult(
is_valid=False,
error=ValidationError(
message="Google Sheet must have a header row and at least one data row"
),
)
headers = rows[0]
data_rows = rows[1:]
return self.validate_source_data(headers, data_rows)
async def sync_source_data(self, campaign_id: int) -> int:
"""
Fetches data from Google Sheets and creates queued_runs
"""
# Get campaign
campaign = await db_client.get_campaign_by_id(campaign_id)
if not campaign:
raise ValueError(f"Campaign {campaign_id} not found")
rows = await self._fetch_all_sheet_data(
campaign.source_id, campaign.organization_id
)
# 6. Convert to queued_runs
if not sheet_data or len(sheet_data) < 2:
if not rows or len(rows) < 2:
logger.warning(f"No data found in sheet for campaign {campaign_id}")
return 0
headers = sheet_data[0] # First row is headers
rows = sheet_data[1:] # Rest is data
headers = self.normalize_headers(rows[0])
data_rows = rows[1:]
sheet_id = self._extract_sheet_id(campaign.source_id)
queued_runs = []
for idx, row_values in enumerate(rows, 1):
for idx, row_values in enumerate(data_rows, 1):
# Pad row to match headers length
padded_row = row_values + [""] * (len(headers) - len(row_values))
@ -94,14 +158,14 @@ class GoogleSheetsSyncService(CampaignSourceSyncService):
}
)
# 7. Bulk insert
# Bulk insert
if queued_runs:
await db_client.bulk_create_queued_runs(queued_runs)
logger.info(
f"Created {len(queued_runs)} queued runs for campaign {campaign_id}"
)
# 8. Update campaign total_rows
# Update campaign total_rows
await db_client.update_campaign(
campaign_id=campaign_id,
total_rows=len(queued_runs),
@ -158,23 +222,3 @@ class GoogleSheetsSyncService(CampaignSourceSyncService):
if match:
return match.group(1)
raise ValueError(f"Invalid Google Sheets URL: {sheet_url}")
async def validate_source_schema(self, source_config: Dict[str, Any]) -> bool:
"""Validate that required columns exist"""
required_columns = ["phone_number", "first_name", "last_name"]
# Fetch just the header row
sheet_id = self._extract_sheet_id(source_config["source_id"])
access_token = source_config["access_token"]
headers = await self._fetch_sheet_data(
sheet_id,
"A1:Z1", # Just first row
access_token,
)
if not headers:
return False
header_row = headers[0]
return all(col in header_row for col in required_columns)

View file

@ -57,6 +57,7 @@ class TelephonyProvider(ABC):
to_number: str,
webhook_url: str,
workflow_run_id: Optional[int] = None,
from_number: Optional[str] = None,
**kwargs: Any,
) -> CallInitiationResult:
"""
@ -66,6 +67,7 @@ class TelephonyProvider(ABC):
to_number: The destination phone number
webhook_url: The URL to receive call events
workflow_run_id: Optional workflow run ID for tracking
from_number: Optional caller ID to use. If None, provider selects randomly.
**kwargs: Provider-specific additional parameters
Returns:

View file

@ -63,6 +63,7 @@ class CloudonixProvider(TelephonyProvider):
to_number: str,
webhook_url: str,
workflow_run_id: Optional[int] = None,
from_number: Optional[str] = None,
**kwargs: Any,
) -> CallInitiationResult:
"""
@ -76,15 +77,15 @@ class CloudonixProvider(TelephonyProvider):
endpoint = f"{self.base_url}/calls/{self.domain_id}/application"
# Select a random phone number for caller-id (REQUIRED by Cloudonix)
if not self.from_numbers:
raise ValueError(
"No phone numbers configured for Cloudonix provider. "
"At least one phone number is required as 'caller-id' for outbound calls. "
"Please configure phone numbers in the telephony settings."
)
from_number = random.choice(self.from_numbers)
# Use provided from_number or select a random one (REQUIRED by Cloudonix)
if from_number is None:
if not self.from_numbers:
raise ValueError(
"No phone numbers configured for Cloudonix provider. "
"At least one phone number is required as 'caller-id' for outbound calls. "
"Please configure phone numbers in the telephony settings."
)
from_number = random.choice(self.from_numbers)
logger.info(
f"Selected phone number {from_number} for outbound call to {to_number}"
)

View file

@ -57,6 +57,7 @@ class TwilioProvider(TelephonyProvider):
to_number: str,
webhook_url: str,
workflow_run_id: Optional[int] = None,
from_number: Optional[str] = None,
**kwargs: Any,
) -> CallInitiationResult:
"""
@ -67,8 +68,9 @@ class TwilioProvider(TelephonyProvider):
endpoint = f"{self.base_url}/Calls.json"
# Select a random phone number
from_number = random.choice(self.from_numbers)
# Use provided from_number or select a random one
if from_number is None:
from_number = random.choice(self.from_numbers)
logger.info(f"Selected phone number {from_number} for outbound call")
# Prepare call data

View file

@ -56,6 +56,7 @@ class VobizProvider(TelephonyProvider):
to_number: str,
webhook_url: str,
workflow_run_id: Optional[int] = None,
from_number: Optional[str] = None,
**kwargs: Any,
) -> CallInitiationResult:
"""
@ -72,8 +73,9 @@ class VobizProvider(TelephonyProvider):
endpoint = f"{self.base_url}/v1/Account/{self.auth_id}/Call/"
# Select a random phone number
from_number = random.choice(self.from_numbers)
# Use provided from_number or select a random one
if from_number is None:
from_number = random.choice(self.from_numbers)
logger.info(f"Selected Vobiz phone number {from_number} for outbound call")
# Remove + prefix if present (Vobiz expects E.164 without +)

View file

@ -78,6 +78,7 @@ class VonageProvider(TelephonyProvider):
to_number: str,
webhook_url: str,
workflow_run_id: Optional[int] = None,
from_number: Optional[str] = None,
**kwargs: Any,
) -> CallInitiationResult:
"""
@ -88,8 +89,9 @@ class VonageProvider(TelephonyProvider):
endpoint = f"{self.base_url}/v1/calls"
# Select a random phone number
from_number = random.choice(self.from_numbers)
# Use provided from_number or select a random one
if from_number is None:
from_number = random.choice(self.from_numbers)
# Remove '+' prefix for Vonage
from_number = from_number.replace("+", "")
to_number = to_number.replace("+", "")

View file

@ -195,6 +195,24 @@ def mock_rate_limiter():
async def mock_delete_mapping(*args, **kwargs):
pass
async def mock_initialize_from_number_pool(*args, **kwargs):
return True
async def mock_acquire_from_number(*args, **kwargs):
return "+15551234567"
async def mock_release_from_number(*args, **kwargs):
return True
async def mock_store_from_number_mapping(*args, **kwargs):
return True
async def mock_get_from_number_mapping(*args, **kwargs):
return None
async def mock_delete_from_number_mapping(*args, **kwargs):
return True
return {
"acquire_token": mock_acquire_token,
"try_acquire_concurrent_slot": mock_try_acquire_slot,
@ -202,6 +220,12 @@ def mock_rate_limiter():
"store_workflow_slot_mapping": mock_store_mapping,
"get_workflow_slot_mapping": mock_get_mapping,
"delete_workflow_slot_mapping": mock_delete_mapping,
"initialize_from_number_pool": mock_initialize_from_number_pool,
"acquire_from_number": mock_acquire_from_number,
"release_from_number": mock_release_from_number,
"store_workflow_from_number_mapping": mock_store_from_number_mapping,
"get_workflow_from_number_mapping": mock_get_from_number_mapping,
"delete_workflow_from_number_mapping": mock_delete_from_number_mapping,
}
@ -242,6 +266,24 @@ class TestProcessBatchBasic:
mock_rl.delete_workflow_slot_mapping = AsyncMock(
side_effect=mock_rate_limiter["delete_workflow_slot_mapping"]
)
mock_rl.initialize_from_number_pool = AsyncMock(
side_effect=mock_rate_limiter["initialize_from_number_pool"]
)
mock_rl.acquire_from_number = AsyncMock(
side_effect=mock_rate_limiter["acquire_from_number"]
)
mock_rl.release_from_number = AsyncMock(
side_effect=mock_rate_limiter["release_from_number"]
)
mock_rl.store_workflow_from_number_mapping = AsyncMock(
side_effect=mock_rate_limiter["store_workflow_from_number_mapping"]
)
mock_rl.get_workflow_from_number_mapping = AsyncMock(
side_effect=mock_rate_limiter["get_workflow_from_number_mapping"]
)
mock_rl.delete_workflow_from_number_mapping = AsyncMock(
side_effect=mock_rate_limiter["delete_workflow_from_number_mapping"]
)
dispatcher = CampaignCallDispatcher()
@ -307,6 +349,24 @@ class TestProcessBatchConcurrency:
mock_rl.delete_workflow_slot_mapping = AsyncMock(
side_effect=mock_rate_limiter["delete_workflow_slot_mapping"]
)
mock_rl.initialize_from_number_pool = AsyncMock(
side_effect=mock_rate_limiter["initialize_from_number_pool"]
)
mock_rl.acquire_from_number = AsyncMock(
side_effect=mock_rate_limiter["acquire_from_number"]
)
mock_rl.release_from_number = AsyncMock(
side_effect=mock_rate_limiter["release_from_number"]
)
mock_rl.store_workflow_from_number_mapping = AsyncMock(
side_effect=mock_rate_limiter["store_workflow_from_number_mapping"]
)
mock_rl.get_workflow_from_number_mapping = AsyncMock(
side_effect=mock_rate_limiter["get_workflow_from_number_mapping"]
)
mock_rl.delete_workflow_from_number_mapping = AsyncMock(
side_effect=mock_rate_limiter["delete_workflow_from_number_mapping"]
)
dispatcher = CampaignCallDispatcher()
@ -379,6 +439,24 @@ class TestProcessBatchConcurrency:
mock_rl.delete_workflow_slot_mapping = AsyncMock(
side_effect=mock_rate_limiter["delete_workflow_slot_mapping"]
)
mock_rl.initialize_from_number_pool = AsyncMock(
side_effect=mock_rate_limiter["initialize_from_number_pool"]
)
mock_rl.acquire_from_number = AsyncMock(
side_effect=mock_rate_limiter["acquire_from_number"]
)
mock_rl.release_from_number = AsyncMock(
side_effect=mock_rate_limiter["release_from_number"]
)
mock_rl.store_workflow_from_number_mapping = AsyncMock(
side_effect=mock_rate_limiter["store_workflow_from_number_mapping"]
)
mock_rl.get_workflow_from_number_mapping = AsyncMock(
side_effect=mock_rate_limiter["get_workflow_from_number_mapping"]
)
mock_rl.delete_workflow_from_number_mapping = AsyncMock(
side_effect=mock_rate_limiter["delete_workflow_from_number_mapping"]
)
dispatcher = CampaignCallDispatcher()
@ -450,6 +528,24 @@ class TestProcessBatchConcurrency:
mock_rl.delete_workflow_slot_mapping = AsyncMock(
side_effect=mock_rate_limiter["delete_workflow_slot_mapping"]
)
mock_rl.initialize_from_number_pool = AsyncMock(
side_effect=mock_rate_limiter["initialize_from_number_pool"]
)
mock_rl.acquire_from_number = AsyncMock(
side_effect=mock_rate_limiter["acquire_from_number"]
)
mock_rl.release_from_number = AsyncMock(
side_effect=mock_rate_limiter["release_from_number"]
)
mock_rl.store_workflow_from_number_mapping = AsyncMock(
side_effect=mock_rate_limiter["store_workflow_from_number_mapping"]
)
mock_rl.get_workflow_from_number_mapping = AsyncMock(
side_effect=mock_rate_limiter["get_workflow_from_number_mapping"]
)
mock_rl.delete_workflow_from_number_mapping = AsyncMock(
side_effect=mock_rate_limiter["delete_workflow_from_number_mapping"]
)
dispatcher = CampaignCallDispatcher()
@ -513,6 +609,24 @@ class TestProcessBatchConcurrency:
mock_rl.delete_workflow_slot_mapping = AsyncMock(
side_effect=mock_rate_limiter["delete_workflow_slot_mapping"]
)
mock_rl.initialize_from_number_pool = AsyncMock(
side_effect=mock_rate_limiter["initialize_from_number_pool"]
)
mock_rl.acquire_from_number = AsyncMock(
side_effect=mock_rate_limiter["acquire_from_number"]
)
mock_rl.release_from_number = AsyncMock(
side_effect=mock_rate_limiter["release_from_number"]
)
mock_rl.store_workflow_from_number_mapping = AsyncMock(
side_effect=mock_rate_limiter["store_workflow_from_number_mapping"]
)
mock_rl.get_workflow_from_number_mapping = AsyncMock(
side_effect=mock_rate_limiter["get_workflow_from_number_mapping"]
)
mock_rl.delete_workflow_from_number_mapping = AsyncMock(
side_effect=mock_rate_limiter["delete_workflow_from_number_mapping"]
)
dispatcher = CampaignCallDispatcher()

View file

@ -50,6 +50,7 @@ export default function NewCampaignPage() {
// Advanced settings state
const [showAdvancedSettings, setShowAdvancedSettings] = useState(false);
const [orgConcurrentLimit, setOrgConcurrentLimit] = useState<number>(2);
const [fromNumbersCount, setFromNumbersCount] = useState<number>(0);
const [maxConcurrency, setMaxConcurrency] = useState<string>('');
// Retry config state
const [retryEnabled, setRetryEnabled] = useState(true);
@ -102,6 +103,7 @@ export default function NewCampaignPage() {
if (response.data) {
setOrgConcurrentLimit(response.data.concurrent_call_limit);
setFromNumbersCount(response.data.from_numbers_count);
// Initialize retry config from defaults
const retryConfig = response.data.default_retry_config;
setRetryEnabled(retryConfig.enabled);
@ -124,6 +126,11 @@ export default function NewCampaignPage() {
}
}, [fetchWorkflows, fetchCampaignLimits, user]);
// Effective concurrency limit considering both org limit and available CLIs
const effectiveLimit = fromNumbersCount > 0
? Math.min(orgConcurrentLimit, fromNumbersCount)
: orgConcurrentLimit;
// Handle form submission
const handleSubmit = async (e: React.FormEvent) => {
e.preventDefault();
@ -141,8 +148,12 @@ export default function NewCampaignPage() {
toast.error('Max concurrent calls must be between 1 and 100');
return;
}
if (maxConcurrencyValue > orgConcurrentLimit) {
toast.error(`Max concurrent calls cannot exceed organization limit (${orgConcurrentLimit})`);
if (maxConcurrencyValue > effectiveLimit) {
if (fromNumbersCount > 0 && fromNumbersCount < orgConcurrentLimit) {
toast.error(`Max concurrent calls cannot exceed ${effectiveLimit}. You have ${fromNumbersCount} phone number(s) configured — add more CLIs to increase concurrency.`);
} else {
toast.error(`Max concurrent calls cannot exceed organization limit (${effectiveLimit})`);
}
return;
}
}
@ -349,15 +360,26 @@ export default function NewCampaignPage() {
<Input
id="max-concurrency"
type="number"
placeholder={`Organization limit: ${orgConcurrentLimit}`}
placeholder={`Default: ${effectiveLimit}`}
value={maxConcurrency}
onChange={(e) => setMaxConcurrency(e.target.value)}
min={1}
max={orgConcurrentLimit}
max={effectiveLimit}
/>
<p className="text-sm text-muted-foreground">
Maximum number of simultaneous calls. Leave empty to use organization limit ({orgConcurrentLimit}).
Maximum number of simultaneous calls. Leave empty to use {effectiveLimit}.
{fromNumbersCount > 0 && ` You have ${fromNumbersCount} CLI${fromNumbersCount !== 1 ? 's' : ''} and an org limit of ${orgConcurrentLimit}.`}
</p>
{fromNumbersCount > 0 && fromNumbersCount < orgConcurrentLimit && (
<p className="text-sm text-amber-600 dark:text-amber-400">
Concurrency is limited to {fromNumbersCount} by your configured phone numbers. To use the full org limit of {orgConcurrentLimit}, add more CLIs in <a href="/telephony-configurations" className="underline font-medium">Telephony Configuration</a>.
</p>
)}
{fromNumbersCount === 0 && (
<p className="text-sm text-amber-600 dark:text-amber-400">
No phone numbers configured. Add CLIs in <a href="/telephony-configurations" className="underline font-medium">Telephony Configuration</a> before running the campaign.
</p>
)}
</div>
{/* Retry Configuration */}

File diff suppressed because one or more lines are too long

View file

@ -45,6 +45,7 @@ export type CallType = 'inbound' | 'outbound';
export type CampaignLimitsResponse = {
concurrent_call_limit: number;
from_numbers_count: number;
default_retry_config: RetryConfigResponse;
};
@ -705,10 +706,6 @@ export type ProcessDocumentRequestSchema = {
* S3 key of the uploaded file
*/
s3_key: string;
/**
* Embedding service to use for processing. Options: 'openai' (default, 1536-dim, requires API key) or 'sentence_transformer' (free, 384-dim)
*/
embedding_service?: 'sentence_transformer' | 'openai';
};
export type RetryConfigRequest = {
@ -4354,6 +4351,66 @@ export type OptionsConfigApiV1PublicEmbedConfigTokenOptionsResponses = {
200: unknown;
};
export type GetPublicTurnCredentialsApiV1PublicEmbedTurnCredentialsSessionTokenGetData = {
body?: never;
path: {
session_token: string;
};
query?: never;
url: '/api/v1/public/embed/turn-credentials/{session_token}';
};
export type GetPublicTurnCredentialsApiV1PublicEmbedTurnCredentialsSessionTokenGetErrors = {
/**
* Not found
*/
404: unknown;
/**
* Validation Error
*/
422: HttpValidationError;
};
export type GetPublicTurnCredentialsApiV1PublicEmbedTurnCredentialsSessionTokenGetError = GetPublicTurnCredentialsApiV1PublicEmbedTurnCredentialsSessionTokenGetErrors[keyof GetPublicTurnCredentialsApiV1PublicEmbedTurnCredentialsSessionTokenGetErrors];
export type GetPublicTurnCredentialsApiV1PublicEmbedTurnCredentialsSessionTokenGetResponses = {
/**
* Successful Response
*/
200: TurnCredentialsResponse;
};
export type GetPublicTurnCredentialsApiV1PublicEmbedTurnCredentialsSessionTokenGetResponse = GetPublicTurnCredentialsApiV1PublicEmbedTurnCredentialsSessionTokenGetResponses[keyof GetPublicTurnCredentialsApiV1PublicEmbedTurnCredentialsSessionTokenGetResponses];
export type OptionsTurnCredentialsApiV1PublicEmbedTurnCredentialsSessionTokenOptionsData = {
body?: never;
path: {
session_token: string;
};
query?: never;
url: '/api/v1/public/embed/turn-credentials/{session_token}';
};
export type OptionsTurnCredentialsApiV1PublicEmbedTurnCredentialsSessionTokenOptionsErrors = {
/**
* Not found
*/
404: unknown;
/**
* Validation Error
*/
422: HttpValidationError;
};
export type OptionsTurnCredentialsApiV1PublicEmbedTurnCredentialsSessionTokenOptionsError = OptionsTurnCredentialsApiV1PublicEmbedTurnCredentialsSessionTokenOptionsErrors[keyof OptionsTurnCredentialsApiV1PublicEmbedTurnCredentialsSessionTokenOptionsErrors];
export type OptionsTurnCredentialsApiV1PublicEmbedTurnCredentialsSessionTokenOptionsResponses = {
/**
* Successful Response
*/
200: unknown;
};
export type InitiateCallApiV1PublicAgentUuidPostData = {
body: TriggerCallRequest;
headers: {