fix: fix vobiz webhook signature validation

This commit is contained in:
Abhishek Kumar 2026-05-25 18:30:06 +05:30
parent a725fda274
commit 285de92528
3 changed files with 168 additions and 84 deletions

View file

@ -2,9 +2,13 @@
Vobiz implementation of the TelephonyProvider interface.
"""
import base64
import hashlib
import hmac
import json
import random
from typing import TYPE_CHECKING, Any, Dict, List, Optional
from urllib.parse import urlparse, urlunparse
import aiohttp
from fastapi import HTTPException
@ -201,23 +205,20 @@ class VobizProvider(TelephonyProvider):
url: str,
params: Dict[str, Any],
signature: str,
timestamp: str = None,
nonce: str = None,
body: str = "",
signature_version: str = "v3",
) -> bool:
"""
Verify Vobiz webhook signature for security.
Vobiz uses HMAC-SHA256 signature verification with timestamp validation:
- Header: x-vobiz-signature (HMAC-SHA256 hash)
- Header: x-vobiz-timestamp (timestamp for replay protection)
- Signature = HMAC-SHA256(auth_token, timestamp + '.' + body)
Vobiz signs the callback base URL (query parameters stripped) with
the account auth token and a request nonce:
- V2: base64(HMAC-SHA256(auth_token, baseURL + nonce))
- V3: base64(HMAC-SHA256(auth_token, baseURL + "." + nonce))
"""
import hashlib
import hmac
from datetime import datetime, timezone
if not signature or not timestamp:
logger.warning("Missing signature or timestamp headers for Vobiz webhook")
if not signature or not nonce:
logger.warning("Missing signature or nonce headers for Vobiz webhook")
return False
if not self.auth_token:
@ -226,37 +227,33 @@ class VobizProvider(TelephonyProvider):
)
return False
try:
# 1. Timestamp validation (within 5 minutes)
webhook_timestamp = int(timestamp)
current_timestamp = int(datetime.now(timezone.utc).timestamp())
time_diff = abs(current_timestamp - webhook_timestamp)
if time_diff > 300: # 5 minutes = 300 seconds
logger.warning(f"Vobiz webhook timestamp too old: {time_diff}s > 300s")
return False
# 2. Signature verification
# Create expected signature: HMAC-SHA256(auth_token, timestamp + '.' + body)
payload = f"{timestamp}.{body}"
expected_signature = hmac.new(
self.auth_token.encode("utf-8"), payload.encode("utf-8"), hashlib.sha256
).hexdigest()
# 3. Compare signatures (timing-safe comparison)
is_valid = hmac.compare_digest(expected_signature, signature)
if not is_valid:
logger.warning(
f"Vobiz webhook signature mismatch. Expected: {expected_signature[:8]}..., Got: {signature[:8]}..."
)
return is_valid
except Exception as e:
logger.error(f"Error verifying Vobiz webhook signature: {e}")
version = signature_version.lower()
if version not in {"v2", "v3"}:
logger.warning(f"Unsupported Vobiz signature version: {signature_version}")
return False
parsed_url = urlparse(url)
base_url = urlunparse(
(parsed_url.scheme, parsed_url.netloc, parsed_url.path, "", "", "")
)
signed_payload = base_url + (f".{nonce}" if version == "v3" else nonce)
expected_signature = base64.b64encode(
hmac.new(
self.auth_token.encode("utf-8"),
signed_payload.encode("utf-8"),
hashlib.sha256,
).digest()
).decode("ascii")
is_valid = hmac.compare_digest(expected_signature, signature)
if not is_valid:
logger.warning(
f"Vobiz webhook signature mismatch. Expected: {expected_signature[:8]}..., Got: {signature[:8]}..."
)
return is_valid
async def get_webhook_response(
self, workflow_id: int, user_id: int, workflow_run_id: int
) -> str:
@ -472,20 +469,34 @@ class VobizProvider(TelephonyProvider):
) -> bool:
"""
Verify the signature of an inbound Vobiz webhook for security.
Uses HMAC-SHA256 over ``timestamp + '.' + body`` with the auth_token.
Uses Vobiz's documented V3/V2 HMAC-SHA256 callback signatures.
"""
signature = headers.get("x-vobiz-signature", "")
timestamp = headers.get("x-vobiz-timestamp")
if not signature:
# FIXME: Vobiz is not sending the x-vobiz-signature. Temporarily
# returning True
normalized_headers = {key.lower(): value for key, value in headers.items()}
signature = normalized_headers.get(
"x-vobiz-signature-v3"
) or normalized_headers.get("x-vobiz-signature-ma-v3", "")
nonce = normalized_headers.get("x-vobiz-signature-v3-nonce")
signature_version = "v3"
if not signature:
signature = normalized_headers.get(
"x-vobiz-signature-v2"
) or normalized_headers.get("x-vobiz-signature-ma-v2", "")
nonce = normalized_headers.get("x-vobiz-signature-v2-nonce")
signature_version = "v2"
if not signature:
logger.warning("Inbound Vobiz webhook missing X-Vobiz-Signature-V3/V2")
return False
# Vobiz always signs its webhooks; missing header means the
# request didn't come from Vobiz (or was tampered with).
logger.warning("Inbound Vobiz webhook missing X-Vobiz-Signature")
return True
return await self.verify_webhook_signature(
url, webhook_data, signature, timestamp, body
url,
webhook_data,
signature,
nonce,
body,
signature_version=signature_version,
)
async def configure_inbound(

View file

@ -93,8 +93,17 @@ async def handle_vobiz_hangup_callback(
f"[run {workflow_run_id}] Received Vobiz hangup callback {json.dumps(callback_data)}"
)
# Verify signature if provided
if x_vobiz_signature:
# Verify signature if Vobiz provided any supported signature header.
has_vobiz_signature = any(
header in all_headers
for header in (
"x-vobiz-signature-v3",
"x-vobiz-signature-ma-v3",
"x-vobiz-signature-v2",
"x-vobiz-signature-ma-v2",
)
)
if has_vobiz_signature:
# We need the workflow run to get organization for provider credentials
workflow_run = await db_client.get_workflow_run_by_id(workflow_run_id)
if not workflow_run:
@ -118,11 +127,10 @@ async def handle_vobiz_hangup_callback(
backend_endpoint, _ = await get_backend_endpoints()
webhook_url = f"{backend_endpoint}/api/v1/telephony/vobiz/hangup-callback/{workflow_run_id}"
is_valid = await provider.verify_webhook_signature(
is_valid = await provider.verify_inbound_signature(
webhook_url,
callback_data,
x_vobiz_signature,
x_vobiz_timestamp,
all_headers,
raw_body,
)
@ -215,8 +223,17 @@ async def handle_vobiz_ring_callback(
f"[run {workflow_run_id}] Received Vobiz ring callback {json.dumps(callback_data)}"
)
# Verify signature if provided
if x_vobiz_signature:
# Verify signature if Vobiz provided any supported signature header.
has_vobiz_signature = any(
header in all_headers
for header in (
"x-vobiz-signature-v3",
"x-vobiz-signature-ma-v3",
"x-vobiz-signature-v2",
"x-vobiz-signature-ma-v2",
)
)
if has_vobiz_signature:
# We need the workflow run to get organization for provider credentials
workflow_run = await db_client.get_workflow_run_by_id(workflow_run_id)
if not workflow_run:
@ -242,11 +259,10 @@ async def handle_vobiz_ring_callback(
f"{backend_endpoint}/api/v1/telephony/vobiz/ring-callback/{workflow_run_id}"
)
is_valid = await provider.verify_webhook_signature(
is_valid = await provider.verify_inbound_signature(
webhook_url,
callback_data,
x_vobiz_signature,
x_vobiz_timestamp,
all_headers,
raw_body,
)
@ -348,15 +364,23 @@ async def handle_vobiz_hangup_callback_by_workflow(
workflow_run, workflow.organization_id
)
if x_vobiz_signature:
has_vobiz_signature = any(
header in all_headers
for header in (
"x-vobiz-signature-v3",
"x-vobiz-signature-ma-v3",
"x-vobiz-signature-v2",
"x-vobiz-signature-ma-v2",
)
)
if has_vobiz_signature:
backend_endpoint, _ = await get_backend_endpoints()
webhook_url = f"{backend_endpoint}/api/v1/telephony/vobiz/hangup-callback/workflow/{workflow_id}"
is_valid = await provider.verify_webhook_signature(
is_valid = await provider.verify_inbound_signature(
webhook_url,
callback_data,
x_vobiz_signature,
x_vobiz_timestamp,
all_headers,
raw_body,
)

View file

@ -1,6 +1,6 @@
import base64
import hashlib
import hmac
from datetime import UTC, datetime
from types import SimpleNamespace
from unittest.mock import AsyncMock, patch
from urllib.parse import urlencode
@ -61,19 +61,18 @@ def _request(
)
def _signed_headers(
provider: VobizProvider, *, form_data: dict[str, str]
) -> dict[str, str]:
timestamp = str(int(datetime.now(UTC).timestamp()))
body = urlencode(form_data)
signature = hmac.new(
provider.auth_token.encode("utf-8"),
f"{timestamp}.{body}".encode("utf-8"),
hashlib.sha256,
).hexdigest()
def _signed_headers(provider: VobizProvider, *, url: str) -> dict[str, str]:
nonce = "12345678901234567890"
signature = base64.b64encode(
hmac.new(
provider.auth_token.encode("utf-8"),
f"{url}.{nonce}".encode("utf-8"),
hashlib.sha256,
).digest()
).decode("ascii")
return {
"x-vobiz-signature": signature,
"x-vobiz-timestamp": timestamp,
"x-vobiz-signature-v3": signature,
"x-vobiz-signature-v3-nonce": nonce,
}
@ -88,7 +87,9 @@ async def test_vobiz_hangup_callback_accepts_signed_form_body():
"Direction": "outbound",
"Duration": "12",
}
headers = _signed_headers(provider, form_data=form_data)
headers = _signed_headers(
provider, url="https://example.test/api/v1/telephony/vobiz/hangup-callback/123"
)
request = _request(
path="/api/v1/telephony/vobiz/hangup-callback/123",
form_data=form_data,
@ -122,8 +123,6 @@ async def test_vobiz_hangup_callback_accepts_signed_form_body():
result = await handle_vobiz_hangup_callback(
workflow_run_id=123,
request=request,
x_vobiz_signature=headers["x-vobiz-signature"],
x_vobiz_timestamp=headers["x-vobiz-timestamp"],
)
assert result == {"status": "success"}
@ -139,7 +138,9 @@ async def test_vobiz_ring_callback_accepts_signed_form_body():
"From": "15551230001",
"To": "15551230002",
}
headers = _signed_headers(provider, form_data=form_data)
headers = _signed_headers(
provider, url="https://example.test/api/v1/telephony/vobiz/ring-callback/123"
)
request = _request(
path="/api/v1/telephony/vobiz/ring-callback/123",
form_data=form_data,
@ -170,9 +171,57 @@ async def test_vobiz_ring_callback_accepts_signed_form_body():
result = await handle_vobiz_ring_callback(
workflow_run_id=123,
request=request,
x_vobiz_signature=headers["x-vobiz-signature"],
x_vobiz_timestamp=headers["x-vobiz-timestamp"],
)
assert result == {"status": "success"}
db_client.update_workflow_run.assert_awaited_once()
@pytest.mark.asyncio
async def test_vobiz_verify_webhook_signature_accepts_v3_and_strips_query():
provider = _provider()
headers = _signed_headers(
provider, url="https://example.test/api/v1/telephony/vobiz/hangup-callback/123"
)
assert await provider.verify_webhook_signature(
"https://example.test/api/v1/telephony/vobiz/hangup-callback/123?foo=bar",
{},
headers["x-vobiz-signature-v3"],
headers["x-vobiz-signature-v3-nonce"],
signature_version="v3",
)
@pytest.mark.asyncio
async def test_vobiz_verify_inbound_signature_accepts_v2():
provider = _provider()
url = "https://example.test/api/v1/telephony/vobiz/hangup-callback/123"
nonce = "12345678901234567890"
signature = base64.b64encode(
hmac.new(
provider.auth_token.encode("utf-8"),
f"{url}{nonce}".encode("utf-8"),
hashlib.sha256,
).digest()
).decode("ascii")
assert await provider.verify_inbound_signature(
url,
{},
{
"X-Vobiz-Signature-V2": signature,
"X-Vobiz-Signature-V2-Nonce": nonce,
},
)
@pytest.mark.asyncio
async def test_vobiz_verify_inbound_signature_rejects_missing_signature():
provider = _provider()
assert not await provider.verify_inbound_signature(
"https://example.test/api/v1/telephony/vobiz/hangup-callback/123",
{},
{},
)