dograh/api/tests/test_from_number_pool_isolation.py
Abhishek 3892b58486
feat: add ultravox realtime and fix signature issue in telephony (#345)
* feat: add ultravox realtime and fix signature issue in telephony

- Add UltraVox realtime
- Fix signature issue on telephony

* fix: fix regression for wss_backend_endpoint
2026-05-23 12:51:55 +05:30

357 lines
13 KiB
Python

"""
Tests verifying that the from_number pool isolates numbers per
telephony_configuration_id within an organization.
When an org has multiple telephony configurations (each with its own pool of
caller IDs), a campaign pinned to config A must never be handed a from_number
that belongs to config B. Otherwise the call is placed via provider A using a
DID owned by config B and either fails or originates from the wrong number.
These tests cover both:
- The rate_limiter, which owns the Redis-backed pool.
- The CampaignCallDispatcher, which must thread the campaign's
telephony_configuration_id through acquire / release / mapping calls.
"""
import os
import uuid
from types import SimpleNamespace
from unittest.mock import AsyncMock, MagicMock, patch
import pytest
from api.services.campaign.campaign_call_dispatcher import CampaignCallDispatcher
from api.services.campaign.rate_limiter import RateLimiter
def _unique_id() -> int:
"""A stable-but-unique positive int derived from a uuid for keying tests."""
return uuid.uuid4().int % 10_000_000
@pytest.fixture
async def isolated_rate_limiter():
"""A RateLimiter wired to the same Redis as production but using unique ids
per test, with cleanup of any keys it touched."""
rl = RateLimiter()
redis_client = await rl._get_redis()
created_keys: list[str] = []
original_eval = redis_client.eval
original_zadd = redis_client.zadd
async def tracking_eval(script, numkeys, *args, **kwargs):
if numkeys >= 1:
created_keys.append(args[0])
return await original_eval(script, numkeys, *args, **kwargs)
async def tracking_zadd(name, *args, **kwargs):
created_keys.append(name)
return await original_zadd(name, *args, **kwargs)
redis_client.eval = tracking_eval # type: ignore[assignment]
redis_client.zadd = tracking_zadd # type: ignore[assignment]
yield rl
redis_client.eval = original_eval # type: ignore[assignment]
redis_client.zadd = original_zadd # type: ignore[assignment]
if created_keys:
await redis_client.delete(*set(created_keys))
await rl.close()
# ---------------------------------------------------------------------------
# Rate limiter pool isolation
# ---------------------------------------------------------------------------
class TestRateLimiterFromNumberPoolIsolation:
"""The rate_limiter pool keys must include telephony_configuration_id."""
@pytest.mark.skipif(
"REDIS_URL" not in os.environ,
reason="Requires Redis (set REDIS_URL via .env.test)",
)
@pytest.mark.asyncio
async def test_acquire_only_returns_numbers_for_requested_config(
self, isolated_rate_limiter
):
rl = isolated_rate_limiter
org_id = _unique_id()
config_a = _unique_id()
config_b = _unique_id()
numbers_a = [f"+1555111{i:04d}" for i in range(3)]
numbers_b = [f"+1555222{i:04d}" for i in range(3)]
await rl.initialize_from_number_pool(
org_id, numbers_a, telephony_configuration_id=config_a
)
await rl.initialize_from_number_pool(
org_id, numbers_b, telephony_configuration_id=config_b
)
# Drain and cycle config_a's pool many times; the acquire should never
# hand out a config_b number.
seen: set[str] = set()
for _ in range(20):
n = await rl.acquire_from_number(
org_id, telephony_configuration_id=config_a
)
if n is None:
break
seen.add(n)
await rl.release_from_number(org_id, n, telephony_configuration_id=config_a)
assert seen == set(numbers_a), (
f"Expected only config_a numbers, but acquire returned: {seen}. "
f"Cross-config leak: {seen - set(numbers_a)}"
)
@pytest.mark.skipif(
"REDIS_URL" not in os.environ,
reason="Requires Redis (set REDIS_URL via .env.test)",
)
@pytest.mark.asyncio
async def test_release_returns_number_to_owning_config_pool(
self, isolated_rate_limiter
):
rl = isolated_rate_limiter
org_id = _unique_id()
config_a = _unique_id()
config_b = _unique_id()
numbers_a = ["+15551110001", "+15551110002"]
numbers_b = ["+15552220001", "+15552220002"]
await rl.initialize_from_number_pool(
org_id, numbers_a, telephony_configuration_id=config_a
)
await rl.initialize_from_number_pool(
org_id, numbers_b, telephony_configuration_id=config_b
)
# Acquire all of config_a's numbers (none released).
first = await rl.acquire_from_number(
org_id, telephony_configuration_id=config_a
)
second = await rl.acquire_from_number(
org_id, telephony_configuration_id=config_a
)
assert {first, second} == set(numbers_a)
# config_a is now exhausted — config_b is fully untouched.
# Acquiring for config_a must return None, NOT spill into config_b.
none_for_a = await rl.acquire_from_number(
org_id, telephony_configuration_id=config_a
)
assert none_for_a is None, (
f"Pool for config_a is exhausted but acquire returned {none_for_a}"
"this indicates a cross-config leak."
)
# config_b's pool is fully available.
b_acquired = []
for _ in range(2):
n = await rl.acquire_from_number(
org_id, telephony_configuration_id=config_b
)
assert n is not None
b_acquired.append(n)
assert set(b_acquired) == set(numbers_b)
@pytest.mark.skipif(
"REDIS_URL" not in os.environ,
reason="Requires Redis (set REDIS_URL via .env.test)",
)
@pytest.mark.asyncio
async def test_workflow_from_number_mapping_round_trips_config(
self, isolated_rate_limiter
):
"""The mapping stored at dispatch must include the config so cleanup
can release back to the correct pool."""
rl = isolated_rate_limiter
workflow_run_id = _unique_id()
org_id = _unique_id()
config_id = _unique_id()
from_number = "+15553330001"
await rl.store_workflow_from_number_mapping(
workflow_run_id,
org_id,
from_number,
telephony_configuration_id=config_id,
)
mapping = await rl.get_workflow_from_number_mapping(workflow_run_id)
assert mapping is not None
# Tuple shape: (org_id, from_number, telephony_configuration_id)
assert len(mapping) == 3, (
f"mapping must include telephony_configuration_id, got: {mapping}"
)
assert mapping == (org_id, from_number, config_id)
await rl.delete_workflow_from_number_mapping(workflow_run_id)
# ---------------------------------------------------------------------------
# Dispatcher: must thread telephony_configuration_id end-to-end
# ---------------------------------------------------------------------------
def _make_campaign(
*,
organization_id: int,
telephony_configuration_id: int,
workflow_id: int = 1,
campaign_id: int = 99,
) -> SimpleNamespace:
return SimpleNamespace(
id=campaign_id,
organization_id=organization_id,
workflow_id=workflow_id,
created_by=1,
telephony_configuration_id=telephony_configuration_id,
rate_limit_per_second=100,
processed_rows=0,
orchestrator_metadata={},
)
def _make_queued_run(
*,
queued_run_id: int = 1,
phone_number: str = "+15559990001",
) -> SimpleNamespace:
return SimpleNamespace(
id=queued_run_id,
source_uuid=f"src-{queued_run_id}",
context_variables={"phone_number": phone_number},
)
class TestDispatcherThreadsTelephonyConfig:
"""The dispatcher must pass telephony_configuration_id when acquiring,
storing the mapping, and releasing the from_number."""
@pytest.mark.asyncio
async def test_dispatch_call_acquires_from_number_for_campaign_config(self):
org_id = 7
config_id = 4242
campaign = _make_campaign(
organization_id=org_id, telephony_configuration_id=config_id
)
queued_run = _make_queued_run()
provider = MagicMock()
provider.PROVIDER_NAME = "twilio"
provider.WEBHOOK_ENDPOINT = "twilio/voice"
provider.from_numbers = ["+15551110001"]
provider.initiate_call = AsyncMock(
return_value=SimpleNamespace(call_id="call-1", provider_metadata={})
)
workflow_run = SimpleNamespace(id=555, logs={})
dispatcher = CampaignCallDispatcher()
with (
patch.object(
dispatcher,
"get_provider_for_campaign",
AsyncMock(return_value=provider),
),
patch(
"api.services.campaign.campaign_call_dispatcher.db_client"
) as mock_db,
patch(
"api.services.campaign.campaign_call_dispatcher.rate_limiter"
) as mock_rl,
patch(
"api.services.campaign.campaign_call_dispatcher.get_backend_endpoints",
AsyncMock(return_value=("https://example.com", None)),
),
):
mock_db.get_workflow_by_id = AsyncMock(return_value=SimpleNamespace(id=1))
mock_db.create_workflow_run = AsyncMock(return_value=workflow_run)
mock_db.update_workflow_run = AsyncMock()
mock_rl.acquire_from_number = AsyncMock(return_value="+15551110001")
mock_rl.release_from_number = AsyncMock()
mock_rl.release_concurrent_slot = AsyncMock()
mock_rl.store_workflow_slot_mapping = AsyncMock()
mock_rl.store_workflow_from_number_mapping = AsyncMock()
await dispatcher.dispatch_call(queued_run, campaign, slot_id="slot-1")
# acquire_from_number on rate_limiter must be called with the
# campaign's telephony_configuration_id.
assert mock_rl.acquire_from_number.await_count == 1
call = mock_rl.acquire_from_number.await_args
kwargs = call.kwargs
args = call.args
received_config = kwargs.get("telephony_configuration_id") or (
args[1] if len(args) > 1 else None
)
assert received_config == config_id, (
"dispatch_call must pass campaign.telephony_configuration_id "
f"({config_id}) to rate_limiter.acquire_from_number, got "
f"args={args}, kwargs={kwargs}"
)
# The workflow→from_number mapping must also remember the config so
# the cleanup path can release back to the right pool.
assert mock_rl.store_workflow_from_number_mapping.await_count == 1
store_call = mock_rl.store_workflow_from_number_mapping.await_args
store_kwargs = store_call.kwargs
store_args = store_call.args
stored_config = store_kwargs.get("telephony_configuration_id") or (
store_args[3] if len(store_args) > 3 else None
)
assert stored_config == config_id, (
"store_workflow_from_number_mapping must persist the "
f"telephony_configuration_id ({config_id}); got args={store_args}, "
f"kwargs={store_kwargs}"
)
assert provider.initiate_call.await_count == 1
webhook_url = provider.initiate_call.await_args.kwargs["webhook_url"]
assert "campaign_id=" not in webhook_url, (
"campaign outbound answer_url should not include campaign_id; "
f"got {webhook_url}"
)
@pytest.mark.asyncio
async def test_release_call_slot_uses_stored_telephony_config(self):
"""When a call completes, release_call_slot must release the from_number
to the same telephony config it was acquired from."""
org_id = 7
config_id = 4242
from_number = "+15551110001"
workflow_run_id = 555
dispatcher = CampaignCallDispatcher()
with patch(
"api.services.campaign.campaign_call_dispatcher.rate_limiter"
) as mock_rl:
mock_rl.get_workflow_slot_mapping = AsyncMock(return_value=None)
mock_rl.get_workflow_from_number_mapping = AsyncMock(
return_value=(org_id, from_number, config_id)
)
mock_rl.release_from_number = AsyncMock(return_value=True)
mock_rl.delete_workflow_from_number_mapping = AsyncMock(return_value=True)
await dispatcher.release_call_slot(workflow_run_id)
assert mock_rl.release_from_number.await_count == 1
call = mock_rl.release_from_number.await_args
args = call.args
kwargs = call.kwargs
released_config = kwargs.get("telephony_configuration_id") or (
args[2] if len(args) > 2 else None
)
assert released_config == config_id, (
"release_call_slot must pass telephony_configuration_id "
f"({config_id}) so the number is returned to its pool; got "
f"args={args}, kwargs={kwargs}"
)