fix: number pool initialization in multi telephony setup

If there are multiple telephony configurations, the form number should be initialized from the campaigns given telephonic configuration rather than the organization default telephonic configuration.
This commit is contained in:
Abhishek Kumar 2026-05-08 14:48:53 +05:30
parent 81a363b06e
commit 6d93be3ef6
31 changed files with 1105 additions and 238 deletions

View file

@ -106,7 +106,9 @@ class CampaignCallDispatcher:
provider = await self.get_provider_for_campaign(campaign)
if provider.from_numbers:
await rate_limiter.initialize_from_number_pool(
campaign.organization_id, provider.from_numbers
campaign.organization_id,
provider.from_numbers,
telephony_configuration_id=campaign.telephony_configuration_id,
)
except Exception as e:
logger.warning(f"Failed to initialize from_number pool: {e}")
@ -210,8 +212,13 @@ class CampaignCallDispatcher:
provider = await self.get_provider_for_campaign(campaign)
workflow_run_mode = provider.PROVIDER_NAME
# Acquire a unique from_number from the pool
from_number = await self.acquire_from_number(campaign.organization_id)
# Acquire a unique from_number from the pool scoped to this campaign's
# telephony configuration so orgs with multiple configs don't leak
# caller IDs across configs.
from_number = await self.acquire_from_number(
campaign.organization_id,
telephony_configuration_id=campaign.telephony_configuration_id,
)
if from_number is None:
# Release concurrent slot before raising
await rate_limiter.release_concurrent_slot(
@ -257,7 +264,10 @@ class CampaignCallDispatcher:
# Store from_number mapping for cleanup on call completion
await rate_limiter.store_workflow_from_number_mapping(
workflow_run.id, campaign.organization_id, from_number
workflow_run.id,
campaign.organization_id,
from_number,
telephony_configuration_id=campaign.telephony_configuration_id,
)
except Exception as e:
# Release slot and from_number on error
@ -266,7 +276,9 @@ class CampaignCallDispatcher:
)
if from_number:
await rate_limiter.release_from_number(
campaign.organization_id, from_number
campaign.organization_id,
from_number,
telephony_configuration_id=campaign.telephony_configuration_id,
)
raise
@ -364,8 +376,10 @@ class CampaignCallDispatcher:
workflow_run.id
)
if from_number_mapping:
fn_org_id, fn_number = from_number_mapping
await rate_limiter.release_from_number(fn_org_id, fn_number)
fn_org_id, fn_number, fn_tcid = from_number_mapping
await rate_limiter.release_from_number(
fn_org_id, fn_number, telephony_configuration_id=fn_tcid
)
await rate_limiter.delete_workflow_from_number_mapping(workflow_run.id)
raise
@ -464,23 +478,24 @@ class CampaignCallDispatcher:
await asyncio.sleep(1)
async def acquire_from_number(
self, organization_id: int, timeout: float = 600
self,
organization_id: int,
telephony_configuration_id: int | None,
timeout: float = 600,
) -> Optional[str]:
"""
Acquire a from_number from the pool with retry.
Acquire a from_number from the (org, telephony config) pool with retry.
Waits up to timeout seconds, polling every 1s.
Args:
organization_id: ID of the organization for which to acquire the from_number.
timeout: Maximum time in seconds to wait for a from_number before giving up.
Returns:
The acquired phone number as a string, or None if timeout is exceeded.
"""
wait_start = time.time()
while True:
from_number = await rate_limiter.acquire_from_number(organization_id)
from_number = await rate_limiter.acquire_from_number(
organization_id, telephony_configuration_id
)
if from_number:
return from_number
@ -488,13 +503,15 @@ class CampaignCallDispatcher:
if wait_time > timeout:
logger.warning(
f"From number pool exhausted for org {organization_id} "
f"after waiting {wait_time:.1f}s"
f"config {telephony_configuration_id} after waiting "
f"{wait_time:.1f}s"
)
return None
logger.debug(
f"All from_numbers in use for org {organization_id}, "
f"waited {wait_time:.1f}s, retrying..."
f"All from_numbers in use for org {organization_id} "
f"config {telephony_configuration_id}, waited {wait_time:.1f}s, "
"retrying..."
)
await asyncio.sleep(1)
@ -515,13 +532,15 @@ class CampaignCallDispatcher:
)
slot_released = True
# Release from_number back to pool
# Release from_number back to its (org, telephony config) pool
from_number_mapping = await rate_limiter.get_workflow_from_number_mapping(
workflow_run_id
)
if from_number_mapping:
fn_org_id, fn_number = from_number_mapping
fn_success = await rate_limiter.release_from_number(fn_org_id, fn_number)
fn_org_id, fn_number, fn_tcid = from_number_mapping
fn_success = await rate_limiter.release_from_number(
fn_org_id, fn_number, telephony_configuration_id=fn_tcid
)
if fn_success:
await rate_limiter.delete_workflow_from_number_mapping(workflow_run_id)
logger.info(

View file

@ -247,22 +247,31 @@ class RateLimiter:
# ======== FROM NUMBER POOL METHODS ========
@staticmethod
def _from_number_pool_key(
organization_id: int, telephony_configuration_id: int | None
) -> str:
return f"from_number_pool:{organization_id}:{telephony_configuration_id}"
async def initialize_from_number_pool(
self, organization_id: int, from_numbers: list[str]
self,
organization_id: int,
from_numbers: list[str],
telephony_configuration_id: int | None,
) -> bool:
"""
Initialize the from_number pool for an organization.
Initialize the from_number pool for an organization + telephony config.
Uses ZADD NX so it won't overwrite numbers that are already in use.
Args:
organization_id: The organization ID
from_numbers: List of phone numbers to add to the pool
Pools are scoped per (organization_id, telephony_configuration_id) so
that orgs with multiple telephony configurations do not leak caller IDs
across configs.
"""
if not from_numbers:
return False
redis_client = await self._get_redis()
key = f"from_number_pool:{organization_id}"
key = self._from_number_pool_key(organization_id, telephony_configuration_id)
try:
# ZADD NX: only add members that don't already exist (preserves in-use scores)
@ -274,15 +283,18 @@ class RateLimiter:
logger.error(f"Error initializing from_number pool: {e}")
return False
async def acquire_from_number(self, organization_id: int) -> Optional[str]:
async def acquire_from_number(
self, organization_id: int, telephony_configuration_id: int | None
) -> Optional[str]:
"""
Atomically acquire an available from_number from the pool.
Atomically acquire an available from_number from the pool for the given
(organization_id, telephony_configuration_id).
Cleans stale entries (score > 0 and older than 30 min) before acquiring.
Returns the phone number if available, None if all numbers are in use.
"""
redis_client = await self._get_redis()
key = f"from_number_pool:{organization_id}"
key = self._from_number_pool_key(organization_id, telephony_configuration_id)
now = time.time()
stale_cutoff = now - self.stale_call_timeout
@ -321,16 +333,21 @@ class RateLimiter:
logger.error(f"Error acquiring from_number: {e}")
return None
async def release_from_number(self, organization_id: int, from_number: str) -> bool:
async def release_from_number(
self,
organization_id: int,
from_number: str,
telephony_configuration_id: int | None,
) -> bool:
"""
Release a from_number back to the pool by setting its score to 0.
Harmless if already released (score already 0).
Release a from_number back to its (org, telephony config) pool by
setting its score to 0. Harmless if already released (score already 0).
"""
if not from_number:
return False
redis_client = await self._get_redis()
key = f"from_number_pool:{organization_id}"
key = self._from_number_pool_key(organization_id, telephony_configuration_id)
lua_script = """
local key = KEYS[1]
@ -356,19 +373,33 @@ class RateLimiter:
return False
async def store_workflow_from_number_mapping(
self, workflow_run_id: int, organization_id: int, from_number: str
self,
workflow_run_id: int,
organization_id: int,
from_number: str,
telephony_configuration_id: int | None,
) -> bool:
"""
Store the mapping between workflow_run_id and its from_number.
Used for cleanup when calls complete.
Store the mapping between workflow_run_id and its from_number, plus
the telephony_configuration_id so cleanup can release back to the
correct pool.
"""
redis_client = await self._get_redis()
mapping_key = f"workflow_from_number:{workflow_run_id}"
try:
# Redis hashes can't store None — use empty string sentinel for legacy
# campaigns whose telephony_configuration_id has not been backfilled.
tcid_value = (
"" if telephony_configuration_id is None else telephony_configuration_id
)
await redis_client.hset(
mapping_key,
mapping={"org_id": organization_id, "from_number": from_number},
mapping={
"org_id": organization_id,
"from_number": from_number,
"telephony_configuration_id": tcid_value,
},
)
await redis_client.expire(mapping_key, 1800) # 30 min TTL
return True
@ -378,10 +409,11 @@ class RateLimiter:
async def get_workflow_from_number_mapping(
self, workflow_run_id: int
) -> Optional[tuple[int, str]]:
) -> Optional[tuple[int, str, int | None]]:
"""
Get the from_number mapping for a workflow run.
Returns (organization_id, from_number) tuple or None if not found.
Returns (organization_id, from_number, telephony_configuration_id) or
None if not found. telephony_configuration_id is None for legacy entries.
"""
redis_client = await self._get_redis()
mapping_key = f"workflow_from_number:{workflow_run_id}"
@ -389,7 +421,9 @@ class RateLimiter:
try:
mapping = await redis_client.hgetall(mapping_key)
if mapping and "org_id" in mapping and "from_number" in mapping:
return (int(mapping["org_id"]), mapping["from_number"])
raw_tcid = mapping.get("telephony_configuration_id", "")
tcid = int(raw_tcid) if raw_tcid not in (None, "") else None
return (int(mapping["org_id"]), mapping["from_number"], tcid)
return None
except Exception as e:
logger.error(f"Error getting workflow from_number mapping: {e}")

View file

@ -2,9 +2,6 @@ import os
from loguru import logger
from api.constants import (
ENABLE_TRACING,
)
from api.services.pipecat.audio_config import AudioConfig
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.task import PipelineParams, PipelineTask
@ -178,7 +175,7 @@ def create_pipeline_task(pipeline, workflow_run_id, audio_config: AudioConfig =
task = PipelineTask(
pipeline,
params=pipeline_params,
enable_tracing=ENABLE_TRACING,
enable_tracing=True,
enable_rtvi=False,
conversation_id=f"{workflow_run_id}",
)

View file

@ -6,7 +6,6 @@ from opentelemetry.sdk.trace import SpanProcessor
from opentelemetry.sdk.trace.export import SpanExporter, SpanExportResult
from api.constants import (
ENABLE_TRACING,
LANGFUSE_HOST,
LANGFUSE_PUBLIC_KEY,
LANGFUSE_SECRET_KEY,
@ -138,10 +137,12 @@ class _OrgRoutingExporter(SpanExporter):
def ensure_tracing() -> bool:
"""Initialize OTEL tracing if enabled. Returns True if tracing is available.
"""Initialize OTEL tracing. Returns True once the routing exporter is set up.
Installs an ``_OrgRoutingExporter`` so that spans can be routed to
org-specific Langfuse projects at export time.
org-specific Langfuse projects at export time. Spans without a matching
exporter (no env-var defaults, no registered org) are silently dropped, so
this is safe to call unconditionally.
Idempotent safe to call from both the pipeline process and the ARQ worker.
"""
@ -149,9 +150,6 @@ def ensure_tracing() -> bool:
if _tracing_initialized:
return True
if not ENABLE_TRACING:
return False
# Build the default exporter from env-var credentials (may be None)
default_exporter = None
if all([LANGFUSE_HOST, LANGFUSE_PUBLIC_KEY, LANGFUSE_SECRET_KEY]):
@ -162,11 +160,6 @@ def ensure_tracing() -> bool:
endpoint=f"{LANGFUSE_HOST}/api/public/otel/v1/traces",
headers={"Authorization": f"Basic {langfuse_auth}"},
)
else:
logger.warning(
"ENABLE_TRACING is true but default Langfuse credentials are not configured. "
"Only org-level credentials will be used."
)
_org_routing_exporter = _OrgRoutingExporter(default_exporter)
setup_tracing(service_name="dograh-pipeline", exporter=_org_routing_exporter)

View file

@ -1,3 +1,15 @@
from .daily_report import DailyReportService
from .run_report import (
build_run_report_csv,
generate_campaign_report_csv,
generate_usage_runs_report_csv,
generate_workflow_report_csv,
)
__all__ = ["DailyReportService"]
__all__ = [
"DailyReportService",
"build_run_report_csv",
"generate_campaign_report_csv",
"generate_usage_runs_report_csv",
"generate_workflow_report_csv",
]

View file

@ -1,6 +1,13 @@
"""CSV reports built from completed workflow runs.
Shared by campaign-, workflow-, and organization-usage-scoped reports.
The DB client supplies the row set; this module owns the column layout
so every endpoint emits the same shape.
"""
import csv
import io
from datetime import datetime
from datetime import UTC, datetime
from typing import Any, List, Optional
from api.constants import BACKEND_API_ENDPOINT
@ -25,11 +32,8 @@ def _collect_extracted_variable_keys(runs: List[Any]) -> list[str]:
return list(keys)
def _build_run_report_csv(runs: List[Any]) -> io.StringIO:
"""Build a CSV from completed workflow runs.
Shared between campaign-scoped and workflow-scoped reports.
"""
def build_run_report_csv(runs: List[Any]) -> io.StringIO:
"""Build a CSV from completed workflow runs."""
extracted_var_keys = _collect_extracted_variable_keys(runs)
output = io.StringIO()
@ -98,7 +102,7 @@ async def generate_campaign_report_csv(
runs = await db_client.get_completed_runs_for_report(
campaign_id=campaign_id, start_date=start_date, end_date=end_date
)
return _build_run_report_csv(runs), f"campaign_{campaign_id}_report.csv"
return build_run_report_csv(runs), f"campaign_{campaign_id}_report.csv"
async def generate_workflow_report_csv(
@ -110,4 +114,24 @@ async def generate_workflow_report_csv(
runs = await db_client.get_completed_runs_for_report(
workflow_id=workflow_id, start_date=start_date, end_date=end_date
)
return _build_run_report_csv(runs), f"workflow_{workflow_id}_report.csv"
return build_run_report_csv(runs), f"workflow_{workflow_id}_report.csv"
async def generate_usage_runs_report_csv(
organization_id: int,
start_date: Optional[datetime] = None,
end_date: Optional[datetime] = None,
filters: Optional[list[dict]] = None,
) -> tuple[io.StringIO, str]:
"""Generate a CSV report for runs visible on the org-wide usage page.
Honors the same date / filter inputs as the `/usage/runs` listing.
"""
runs = await db_client.get_usage_runs_for_report(
organization_id,
start_date=start_date,
end_date=end_date,
filters=filters,
)
timestamp = datetime.now(UTC).strftime("%Y%m%d_%H%M%S")
return build_run_report_csv(runs), f"usage_runs_{timestamp}.csv"