mirror of
https://github.com/dograh-hq/dograh.git
synced 2026-06-10 08:05:22 +02:00
380 lines
12 KiB
Python
380 lines
12 KiB
Python
import base64
|
|
import hashlib
|
|
import hmac
|
|
from types import SimpleNamespace
|
|
from unittest.mock import AsyncMock, patch
|
|
from urllib.parse import urlencode
|
|
|
|
import pytest
|
|
from fastapi import HTTPException
|
|
from starlette.requests import Request
|
|
|
|
from api.services.telephony.providers.vobiz.provider import VobizProvider
|
|
from api.services.telephony.providers.vobiz.routes import (
|
|
handle_vobiz_hangup_callback,
|
|
handle_vobiz_hangup_callback_by_workflow,
|
|
handle_vobiz_ring_callback,
|
|
)
|
|
|
|
|
|
def _provider() -> VobizProvider:
    """Build a ``VobizProvider`` from a fixed, test-only configuration."""
    config = {
        "auth_id": "MA123",
        "auth_token": "vobiz-auth-token",
        "from_numbers": ["+15551230002"],
    }
    return VobizProvider(config)
|
|
|
|
|
|
def _request(
    *,
    path: str,
    form_data: dict[str, str],
    headers: dict[str, str] | None = None,
) -> Request:
    """Build an HTTPS POST ``Request`` whose body is the URL-encoded form.

    Args:
        path: Request path placed in the ASGI scope.
        form_data: Fields that are URL-encoded into the request body.
        headers: Optional extra headers; names are lower-cased and both
            names and values are ASCII-encoded before being added after the
            form ``content-type`` header.

    Returns:
        A ``Request`` built from a hand-written ASGI scope and a ``receive``
        callable that yields the whole body in a single message.
    """
    payload = urlencode(form_data).encode("utf-8")

    # The content-type header always comes first, followed by caller headers.
    raw_headers = [(b"content-type", b"application/x-www-form-urlencoded")]
    for name, value in (headers or {}).items():
        raw_headers.append((name.lower().encode("ascii"), value.encode("ascii")))

    scope = {
        "type": "http",
        "method": "POST",
        "scheme": "https",
        "server": ("example.test", 443),
        "path": path,
        "query_string": b"",
        "headers": raw_headers,
    }

    async def receive():
        # Single-shot body delivery: nothing follows this message.
        return {"type": "http.request", "body": payload, "more_body": False}

    return Request(scope, receive)
|
|
|
|
|
|
def _signed_headers(
    provider: VobizProvider,
    *,
    url: str,
    nonce: str = "12345678901234567890",
) -> dict[str, str]:
    """Return v3 signature headers for ``url`` signed with the provider token.

    The signature is the base64-encoded HMAC-SHA256 digest of the string
    ``"<url>.<nonce>"`` keyed with ``provider.auth_token``.

    Args:
        provider: Object exposing the ``auth_token`` used as the HMAC key.
        url: URL that is being signed.
        nonce: Nonce mixed into the signed message. Defaults to the fixed
            value these tests have always used, so existing callers are
            unaffected; pass another value to sign with a different nonce.

    Returns:
        A dict with the ``x-vobiz-signature-v3`` and
        ``x-vobiz-signature-v3-nonce`` header values.
    """
    key = provider.auth_token.encode("utf-8")
    message = f"{url}.{nonce}".encode("utf-8")
    digest = hmac.new(key, message, hashlib.sha256).digest()
    return {
        "x-vobiz-signature-v3": base64.b64encode(digest).decode("ascii"),
        "x-vobiz-signature-v3-nonce": nonce,
    }
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_vobiz_hangup_callback_accepts_signed_form_body():
    """A v3-signed hangup callback returns success and awaits one status update."""
    provider = _provider()
    signed = _signed_headers(
        provider, url="https://example.test/api/v1/telephony/vobiz/hangup-callback/123"
    )
    request = _request(
        path="/api/v1/telephony/vobiz/hangup-callback/123",
        form_data={
            "CallUUID": "call-123",
            "CallStatus": "completed",
            "From": "15551230001",
            "To": "15551230002",
            "Direction": "outbound",
            "Duration": "12",
        },
        headers=signed,
    )

    # Database lookups the handler performs, stubbed with minimal records.
    run_lookup = AsyncMock(return_value=SimpleNamespace(workflow_id=7))
    workflow_lookup = AsyncMock(return_value=SimpleNamespace(organization_id=11))

    with (
        patch("api.services.telephony.providers.vobiz.routes.db_client") as db_client,
        patch(
            "api.services.telephony.providers.vobiz.routes.get_telephony_provider_for_run",
            new_callable=AsyncMock,
            return_value=provider,
        ),
        patch(
            "api.services.telephony.providers.vobiz.routes.get_backend_endpoints",
            new_callable=AsyncMock,
            return_value=("https://example.test", "wss://example.test"),
        ),
        patch(
            "api.services.telephony.providers.vobiz.routes._process_status_update",
            new_callable=AsyncMock,
        ) as process_status,
    ):
        db_client.get_workflow_run_by_id = run_lookup
        db_client.get_workflow_by_id = workflow_lookup

        result = await handle_vobiz_hangup_callback(
            workflow_run_id=123,
            request=request,
        )

    assert result == {"status": "success"}
    process_status.assert_awaited_once()
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_vobiz_ring_callback_accepts_signed_form_body():
    """A v3-signed ring callback returns success and updates the run once."""
    provider = _provider()
    signed = _signed_headers(
        provider, url="https://example.test/api/v1/telephony/vobiz/ring-callback/123"
    )
    request = _request(
        path="/api/v1/telephony/vobiz/ring-callback/123",
        form_data={
            "CallUUID": "call-123",
            "CallStatus": "ringing",
            "From": "15551230001",
            "To": "15551230002",
        },
        headers=signed,
    )

    run_record = SimpleNamespace(workflow_id=7, logs={})
    workflow_record = SimpleNamespace(organization_id=11)

    with (
        patch("api.services.telephony.providers.vobiz.routes.db_client") as db_client,
        patch(
            "api.services.telephony.providers.vobiz.routes.get_telephony_provider_for_run",
            new_callable=AsyncMock,
            return_value=provider,
        ),
        patch(
            "api.services.telephony.providers.vobiz.routes.get_backend_endpoints",
            new_callable=AsyncMock,
            return_value=("https://example.test", "wss://example.test"),
        ),
    ):
        db_client.get_workflow_run_by_id = AsyncMock(return_value=run_record)
        db_client.get_workflow_by_id = AsyncMock(return_value=workflow_record)
        db_client.update_workflow_run = AsyncMock()

        result = await handle_vobiz_ring_callback(
            workflow_run_id=123,
            request=request,
        )

    assert result == {"status": "success"}
    db_client.update_workflow_run.assert_awaited_once()
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_vobiz_verify_webhook_signature_accepts_v3_and_strips_query():
    """A v3 signature over the bare URL verifies for the URL with a query string."""
    provider = _provider()
    # Sign the URL without a query string ...
    signed = _signed_headers(
        provider, url="https://example.test/api/v1/telephony/vobiz/hangup-callback/123"
    )

    # ... then verify against the same URL with "?foo=bar" appended.
    verified = await provider.verify_webhook_signature(
        "https://example.test/api/v1/telephony/vobiz/hangup-callback/123?foo=bar",
        {},
        signed["x-vobiz-signature-v3"],
        signed["x-vobiz-signature-v3-nonce"],
        signature_version="v3",
    )

    assert verified
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_vobiz_verify_inbound_signature_accepts_v2():
    """Inbound verification accepts V2 headers signed over ``url`` + ``nonce``."""
    provider = _provider()
    url = "https://example.test/api/v1/telephony/vobiz/hangup-callback/123"
    nonce = "12345678901234567890"

    # The V2 message built here joins URL and nonce with no separator.
    mac = hmac.new(
        provider.auth_token.encode("utf-8"),
        f"{url}{nonce}".encode("utf-8"),
        hashlib.sha256,
    )
    signature = base64.b64encode(mac.digest()).decode("ascii")
    v2_headers = {
        "X-Vobiz-Signature-V2": signature,
        "X-Vobiz-Signature-V2-Nonce": nonce,
    }

    accepted = await provider.verify_inbound_signature(url, {}, v2_headers)

    assert accepted
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_vobiz_verify_inbound_signature_rejects_missing_signature():
    """Inbound verification is falsy when no signature headers are supplied."""
    provider = _provider()

    accepted = await provider.verify_inbound_signature(
        "https://example.test/api/v1/telephony/vobiz/hangup-callback/123",
        {},
        {},
    )

    assert not accepted
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_vobiz_hangup_callback_rejects_missing_signature():
    """A hangup callback without signature headers raises 403 and skips processing."""
    provider = _provider()
    # The request carries only the form body: no x-vobiz-signature-* headers.
    request = _request(
        path="/api/v1/telephony/vobiz/hangup-callback/123",
        form_data={
            "CallUUID": "call-123",
            "CallStatus": "completed",
            "From": "15551230001",
            "To": "15551230002",
            "Direction": "outbound",
            "Duration": "12",
        },
    )

    run_lookup = AsyncMock(return_value=SimpleNamespace(workflow_id=7))
    workflow_lookup = AsyncMock(return_value=SimpleNamespace(organization_id=11))

    with (
        patch("api.services.telephony.providers.vobiz.routes.db_client") as db_client,
        patch(
            "api.services.telephony.providers.vobiz.routes.get_telephony_provider_for_run",
            new_callable=AsyncMock,
            return_value=provider,
        ),
        patch(
            "api.services.telephony.providers.vobiz.routes.get_backend_endpoints",
            new_callable=AsyncMock,
            return_value=("https://example.test", "wss://example.test"),
        ),
        patch(
            "api.services.telephony.providers.vobiz.routes._process_status_update",
            new_callable=AsyncMock,
        ) as process_status,
    ):
        db_client.get_workflow_run_by_id = run_lookup
        db_client.get_workflow_by_id = workflow_lookup

        with pytest.raises(HTTPException) as exc_info:
            await handle_vobiz_hangup_callback(
                workflow_run_id=123,
                request=request,
            )

    # Checked after the raises block so these assertions actually run.
    assert exc_info.value.status_code == 403
    process_status.assert_not_awaited()
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_vobiz_ring_callback_rejects_missing_signature():
    """A ring callback without signature headers raises 403 and updates nothing."""
    provider = _provider()
    # The request carries only the form body: no x-vobiz-signature-* headers.
    request = _request(
        path="/api/v1/telephony/vobiz/ring-callback/123",
        form_data={
            "CallUUID": "call-123",
            "CallStatus": "ringing",
            "From": "15551230001",
            "To": "15551230002",
        },
    )

    run_record = SimpleNamespace(workflow_id=7, logs={})
    workflow_record = SimpleNamespace(organization_id=11)

    with (
        patch("api.services.telephony.providers.vobiz.routes.db_client") as db_client,
        patch(
            "api.services.telephony.providers.vobiz.routes.get_telephony_provider_for_run",
            new_callable=AsyncMock,
            return_value=provider,
        ),
        patch(
            "api.services.telephony.providers.vobiz.routes.get_backend_endpoints",
            new_callable=AsyncMock,
            return_value=("https://example.test", "wss://example.test"),
        ),
    ):
        db_client.get_workflow_run_by_id = AsyncMock(return_value=run_record)
        db_client.get_workflow_by_id = AsyncMock(return_value=workflow_record)
        db_client.update_workflow_run = AsyncMock()

        with pytest.raises(HTTPException) as exc_info:
            await handle_vobiz_ring_callback(
                workflow_run_id=123,
                request=request,
            )

    # Checked after the raises block so these assertions actually run.
    assert exc_info.value.status_code == 403
    db_client.update_workflow_run.assert_not_awaited()
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_vobiz_hangup_callback_by_workflow_rejects_missing_signature():
    """A by-workflow hangup callback without signature headers raises 403."""
    provider = _provider()
    # The request carries only the form body: no x-vobiz-signature-* headers.
    request = _request(
        path="/api/v1/telephony/vobiz/hangup-callback/workflow/7",
        form_data={
            "CallUUID": "call-123",
            "CallStatus": "completed",
            "From": "15551230001",
            "To": "15551230002",
            "Direction": "outbound",
            "Duration": "12",
        },
    )

    workflow_lookup = AsyncMock(return_value=SimpleNamespace(organization_id=11))
    run_by_call_lookup = AsyncMock(
        return_value=SimpleNamespace(id=123, workflow_id=7)
    )

    with (
        patch("api.services.telephony.providers.vobiz.routes.db_client") as db_client,
        patch(
            "api.services.telephony.providers.vobiz.routes.get_telephony_provider_for_run",
            new_callable=AsyncMock,
            return_value=provider,
        ),
        patch(
            "api.services.telephony.providers.vobiz.routes.get_backend_endpoints",
            new_callable=AsyncMock,
            return_value=("https://example.test", "wss://example.test"),
        ),
        patch(
            "api.services.telephony.providers.vobiz.routes._process_status_update",
            new_callable=AsyncMock,
        ) as process_status,
    ):
        db_client.get_workflow_by_id = workflow_lookup
        db_client.get_workflow_run_by_call_id = run_by_call_lookup

        with pytest.raises(HTTPException) as exc_info:
            await handle_vobiz_hangup_callback_by_workflow(
                workflow_id=7,
                request=request,
            )

    # Checked after the raises block so these assertions actually run.
    assert exc_info.value.status_code == 403
    process_status.assert_not_awaited()
|