mirror of
https://github.com/dograh-hq/dograh.git
synced 2026-06-07 07:55:16 +02:00
* feat: add ultravox realtime and fix signature issue in telephony - Add UltraVox realtime - Fix signature issue on telephony * fix: fix regression for wss_backend_endpoint
357 lines
13 KiB
Python
357 lines
13 KiB
Python
"""
|
|
Tests verifying that the from_number pool isolates numbers per
telephony_configuration_id within an organization.

When an org has multiple telephony configurations (each with its own pool of
caller IDs), a campaign pinned to config A must never be handed a from_number
that belongs to config B. Otherwise the call is placed via provider A using a
DID owned by config B and either fails or originates from the wrong number.

These tests cover both:
- The rate_limiter, which owns the Redis-backed pool.
- The CampaignCallDispatcher, which must thread the campaign's
  telephony_configuration_id through acquire / release / mapping calls.
|
|
"""
|
|
|
|
import os
|
|
import uuid
|
|
from types import SimpleNamespace
|
|
from unittest.mock import AsyncMock, MagicMock, patch
|
|
|
|
import pytest
|
|
|
|
from api.services.campaign.campaign_call_dispatcher import CampaignCallDispatcher
|
|
from api.services.campaign.rate_limiter import RateLimiter
|
|
|
|
|
|
def _unique_id() -> int:
|
|
"""A stable-but-unique positive int derived from a uuid for keying tests."""
|
|
return uuid.uuid4().int % 10_000_000
|
|
|
|
|
|
@pytest.fixture
async def isolated_rate_limiter():
    """Yield a ``RateLimiter`` whose Redis writes are tracked and cleaned up.

    The limiter uses whatever Redis connection ``RateLimiter()`` resolves by
    default (the original author describes this as the same Redis as
    production), so tests are expected to use unique ids per test.

    While the fixture is active, the client's ``eval`` and ``zadd`` methods are
    wrapped to record the first key passed to every ``eval`` call and the key
    name passed to every ``zadd`` call. On teardown the wrappers are removed,
    the recorded keys are deleted, and the limiter is closed.
    """
    rl = RateLimiter()
    redis_client = await rl._get_redis()
    created_keys: list[str] = []

    original_eval = redis_client.eval
    original_zadd = redis_client.zadd

    async def tracking_eval(script, numkeys, *args, **kwargs):
        # Record the first key handed to the script, then delegate unchanged.
        if numkeys >= 1:
            created_keys.append(args[0])
        return await original_eval(script, numkeys, *args, **kwargs)

    async def tracking_zadd(name, *args, **kwargs):
        # Record the sorted-set key, then delegate unchanged.
        created_keys.append(name)
        return await original_zadd(name, *args, **kwargs)

    redis_client.eval = tracking_eval  # type: ignore[assignment]
    redis_client.zadd = tracking_zadd  # type: ignore[assignment]

    yield rl

    # Restore the real client methods before touching Redis again so the
    # cleanup calls below are not themselves tracked.
    redis_client.eval = original_eval  # type: ignore[assignment]
    redis_client.zadd = original_zadd  # type: ignore[assignment]
    try:
        if created_keys:
            await redis_client.delete(*set(created_keys))
    finally:
        # Close the limiter even if the key cleanup fails, so a Redis error
        # during teardown does not also leak the connection.
        await rl.close()
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Rate limiter pool isolation
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
class TestRateLimiterFromNumberPoolIsolation:
    """The rate_limiter pool keys must include telephony_configuration_id.

    Every test here needs a reachable Redis and is skipped when ``REDIS_URL``
    is not set in the environment.
    """

    @pytest.mark.skipif(
        "REDIS_URL" not in os.environ,
        reason="Requires Redis (set REDIS_URL via .env.test)",
    )
    @pytest.mark.asyncio
    async def test_acquire_only_returns_numbers_for_requested_config(
        self, isolated_rate_limiter
    ):
        """Acquiring for config A only ever yields numbers from A's pool."""
        rl = isolated_rate_limiter
        org_id = _unique_id()
        config_a = _unique_id()
        config_b = _unique_id()
        numbers_a = [f"+1555111{i:04d}" for i in range(3)]
        numbers_b = [f"+1555222{i:04d}" for i in range(3)]

        await rl.initialize_from_number_pool(
            org_id, numbers_a, telephony_configuration_id=config_a
        )
        await rl.initialize_from_number_pool(
            org_id, numbers_b, telephony_configuration_id=config_b
        )

        # Drain and cycle config_a's pool many times; the acquire should never
        # hand out a config_b number.
        seen: set[str] = set()
        for _ in range(20):
            n = await rl.acquire_from_number(
                org_id, telephony_configuration_id=config_a
            )
            if n is None:
                break
            seen.add(n)
            await rl.release_from_number(org_id, n, telephony_configuration_id=config_a)

        expected = set(numbers_a)
        # Check the leak first so a failure message names the actual problem:
        # a foreign number showing up vs. an own number never being handed out.
        assert seen <= expected, (
            f"Expected only config_a numbers, but acquire returned: {seen}. "
            f"Cross-config leak: {seen - expected}"
        )
        assert seen == expected, (
            f"config_a numbers never handed out: {expected - seen}"
        )

    @pytest.mark.skipif(
        "REDIS_URL" not in os.environ,
        reason="Requires Redis (set REDIS_URL via .env.test)",
    )
    @pytest.mark.asyncio
    async def test_release_returns_number_to_owning_config_pool(
        self, isolated_rate_limiter
    ):
        """An exhausted config A pool must not spill over into config B.

        NOTE(review): despite the test name, this body never calls
        ``release_from_number``; it only checks exhaustion isolation. Confirm
        whether a release/re-acquire step was intended.
        """
        rl = isolated_rate_limiter
        org_id = _unique_id()
        config_a = _unique_id()
        config_b = _unique_id()
        numbers_a = ["+15551110001", "+15551110002"]
        numbers_b = ["+15552220001", "+15552220002"]

        await rl.initialize_from_number_pool(
            org_id, numbers_a, telephony_configuration_id=config_a
        )
        await rl.initialize_from_number_pool(
            org_id, numbers_b, telephony_configuration_id=config_b
        )

        # Acquire all of config_a's numbers (none released).
        first = await rl.acquire_from_number(
            org_id, telephony_configuration_id=config_a
        )
        second = await rl.acquire_from_number(
            org_id, telephony_configuration_id=config_a
        )
        assert {first, second} == set(numbers_a)

        # config_a is now exhausted — config_b is fully untouched.
        # Acquiring for config_a must return None, NOT spill into config_b.
        none_for_a = await rl.acquire_from_number(
            org_id, telephony_configuration_id=config_a
        )
        assert none_for_a is None, (
            f"Pool for config_a is exhausted but acquire returned {none_for_a} — "
            "this indicates a cross-config leak."
        )

        # config_b's pool is fully available.
        b_acquired = []
        for _ in range(2):
            n = await rl.acquire_from_number(
                org_id, telephony_configuration_id=config_b
            )
            assert n is not None
            b_acquired.append(n)
        assert set(b_acquired) == set(numbers_b)

    @pytest.mark.skipif(
        "REDIS_URL" not in os.environ,
        reason="Requires Redis (set REDIS_URL via .env.test)",
    )
    @pytest.mark.asyncio
    async def test_workflow_from_number_mapping_round_trips_config(
        self, isolated_rate_limiter
    ):
        """The mapping stored at dispatch must include the config so cleanup
        can release back to the correct pool."""
        rl = isolated_rate_limiter
        workflow_run_id = _unique_id()
        org_id = _unique_id()
        config_id = _unique_id()
        from_number = "+15553330001"

        await rl.store_workflow_from_number_mapping(
            workflow_run_id,
            org_id,
            from_number,
            telephony_configuration_id=config_id,
        )

        try:
            mapping = await rl.get_workflow_from_number_mapping(workflow_run_id)
            assert mapping is not None
            # Tuple shape: (org_id, from_number, telephony_configuration_id)
            assert len(mapping) == 3, (
                f"mapping must include telephony_configuration_id, got: {mapping}"
            )
            assert mapping == (org_id, from_number, config_id)
        finally:
            # Remove the mapping even when an assertion above fails, so a red
            # test does not leave its key behind in Redis.
            await rl.delete_workflow_from_number_mapping(workflow_run_id)
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Dispatcher: must thread telephony_configuration_id end-to-end
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
def _make_campaign(
|
|
*,
|
|
organization_id: int,
|
|
telephony_configuration_id: int,
|
|
workflow_id: int = 1,
|
|
campaign_id: int = 99,
|
|
) -> SimpleNamespace:
|
|
return SimpleNamespace(
|
|
id=campaign_id,
|
|
organization_id=organization_id,
|
|
workflow_id=workflow_id,
|
|
created_by=1,
|
|
telephony_configuration_id=telephony_configuration_id,
|
|
rate_limit_per_second=100,
|
|
processed_rows=0,
|
|
orchestrator_metadata={},
|
|
)
|
|
|
|
|
|
def _make_queued_run(
|
|
*,
|
|
queued_run_id: int = 1,
|
|
phone_number: str = "+15559990001",
|
|
) -> SimpleNamespace:
|
|
return SimpleNamespace(
|
|
id=queued_run_id,
|
|
source_uuid=f"src-{queued_run_id}",
|
|
context_variables={"phone_number": phone_number},
|
|
)
|
|
|
|
|
|
class TestDispatcherThreadsTelephonyConfig:
    """The dispatcher must pass telephony_configuration_id when acquiring,
    storing the mapping, and releasing the from_number."""

    @staticmethod
    def _passed_argument(recorded_call, keyword, position):
        """Return one argument of a recorded mock call.

        Args:
            recorded_call: A mock ``call`` record exposing ``args`` and
                ``kwargs`` (for example ``AsyncMock.await_args``).
            keyword: Name to look up among the keyword arguments.
            position: Index to fall back to among the positional arguments.

        Returns:
            The keyword value when the keyword was passed (even if falsy),
            otherwise the positional value at ``position``, otherwise ``None``.
        """
        # Membership test instead of ``kwargs.get(...) or ...`` so a falsy
        # keyword value is not silently treated as "not passed".
        if keyword in recorded_call.kwargs:
            return recorded_call.kwargs[keyword]
        if len(recorded_call.args) > position:
            return recorded_call.args[position]
        return None

    @pytest.mark.asyncio
    async def test_dispatch_call_acquires_from_number_for_campaign_config(self):
        """dispatch_call must hand the campaign's config id to the rate limiter
        both when acquiring a from_number and when storing the mapping."""
        org_id = 7
        config_id = 4242
        campaign = _make_campaign(
            organization_id=org_id, telephony_configuration_id=config_id
        )
        queued_run = _make_queued_run()

        provider = MagicMock()
        provider.PROVIDER_NAME = "twilio"
        provider.WEBHOOK_ENDPOINT = "twilio/voice"
        provider.from_numbers = ["+15551110001"]
        provider.initiate_call = AsyncMock(
            return_value=SimpleNamespace(call_id="call-1", provider_metadata={})
        )

        workflow_run = SimpleNamespace(id=555, logs={})

        dispatcher = CampaignCallDispatcher()

        with (
            patch.object(
                dispatcher,
                "get_provider_for_campaign",
                AsyncMock(return_value=provider),
            ),
            patch(
                "api.services.campaign.campaign_call_dispatcher.db_client"
            ) as mock_db,
            patch(
                "api.services.campaign.campaign_call_dispatcher.rate_limiter"
            ) as mock_rl,
            patch(
                "api.services.campaign.campaign_call_dispatcher.get_backend_endpoints",
                AsyncMock(return_value=("https://example.com", None)),
            ),
        ):
            mock_db.get_workflow_by_id = AsyncMock(return_value=SimpleNamespace(id=1))
            mock_db.create_workflow_run = AsyncMock(return_value=workflow_run)
            mock_db.update_workflow_run = AsyncMock()

            mock_rl.acquire_from_number = AsyncMock(return_value="+15551110001")
            mock_rl.release_from_number = AsyncMock()
            mock_rl.release_concurrent_slot = AsyncMock()
            mock_rl.store_workflow_slot_mapping = AsyncMock()
            mock_rl.store_workflow_from_number_mapping = AsyncMock()

            await dispatcher.dispatch_call(queued_run, campaign, slot_id="slot-1")

        # acquire_from_number on rate_limiter must be called with the
        # campaign's telephony_configuration_id.
        assert mock_rl.acquire_from_number.await_count == 1
        acquire_call = mock_rl.acquire_from_number.await_args
        received_config = self._passed_argument(
            acquire_call, "telephony_configuration_id", 1
        )
        assert received_config == config_id, (
            "dispatch_call must pass campaign.telephony_configuration_id "
            f"({config_id}) to rate_limiter.acquire_from_number, got "
            f"args={acquire_call.args}, kwargs={acquire_call.kwargs}"
        )

        # The workflow→from_number mapping must also remember the config so
        # the cleanup path can release back to the right pool.
        assert mock_rl.store_workflow_from_number_mapping.await_count == 1
        store_call = mock_rl.store_workflow_from_number_mapping.await_args
        stored_config = self._passed_argument(
            store_call, "telephony_configuration_id", 3
        )
        assert stored_config == config_id, (
            "store_workflow_from_number_mapping must persist the "
            f"telephony_configuration_id ({config_id}); got args={store_call.args}, "
            f"kwargs={store_call.kwargs}"
        )

        assert provider.initiate_call.await_count == 1
        webhook_url = provider.initiate_call.await_args.kwargs["webhook_url"]
        assert "campaign_id=" not in webhook_url, (
            "campaign outbound answer_url should not include campaign_id; "
            f"got {webhook_url}"
        )

    @pytest.mark.asyncio
    async def test_release_call_slot_uses_stored_telephony_config(self):
        """When a call completes, release_call_slot must release the from_number
        to the same telephony config it was acquired from."""
        org_id = 7
        config_id = 4242
        from_number = "+15551110001"
        workflow_run_id = 555

        dispatcher = CampaignCallDispatcher()

        with patch(
            "api.services.campaign.campaign_call_dispatcher.rate_limiter"
        ) as mock_rl:
            mock_rl.get_workflow_slot_mapping = AsyncMock(return_value=None)
            mock_rl.get_workflow_from_number_mapping = AsyncMock(
                return_value=(org_id, from_number, config_id)
            )
            mock_rl.release_from_number = AsyncMock(return_value=True)
            mock_rl.delete_workflow_from_number_mapping = AsyncMock(return_value=True)

            await dispatcher.release_call_slot(workflow_run_id)

        assert mock_rl.release_from_number.await_count == 1
        release_call = mock_rl.release_from_number.await_args
        released_config = self._passed_argument(
            release_call, "telephony_configuration_id", 2
        )
        assert released_config == config_id, (
            "release_call_slot must pass telephony_configuration_id "
            f"({config_id}) so the number is returned to its pool; got "
            f"args={release_call.args}, kwargs={release_call.kwargs}"
        )
|