diff --git a/api/services/telephony/providers/vobiz/provider.py b/api/services/telephony/providers/vobiz/provider.py index 4cf017d..383da3c 100644 --- a/api/services/telephony/providers/vobiz/provider.py +++ b/api/services/telephony/providers/vobiz/provider.py @@ -2,9 +2,13 @@ Vobiz implementation of the TelephonyProvider interface. """ +import base64 +import hashlib +import hmac import json import random from typing import TYPE_CHECKING, Any, Dict, List, Optional +from urllib.parse import urlparse, urlunparse import aiohttp from fastapi import HTTPException @@ -201,23 +205,20 @@ class VobizProvider(TelephonyProvider): url: str, params: Dict[str, Any], signature: str, - timestamp: str = None, + nonce: str = None, body: str = "", + signature_version: str = "v3", ) -> bool: """ Verify Vobiz webhook signature for security. - Vobiz uses HMAC-SHA256 signature verification with timestamp validation: - - Header: x-vobiz-signature (HMAC-SHA256 hash) - - Header: x-vobiz-timestamp (timestamp for replay protection) - - Signature = HMAC-SHA256(auth_token, timestamp + '.' + body) + Vobiz signs the callback base URL (query parameters stripped) with + the account auth token and a request nonce: + - V2: base64(HMAC-SHA256(auth_token, baseURL + nonce)) + - V3: base64(HMAC-SHA256(auth_token, baseURL + "." + nonce)) """ - import hashlib - import hmac - from datetime import datetime, timezone - - if not signature or not timestamp: - logger.warning("Missing signature or timestamp headers for Vobiz webhook") + if not signature or not nonce: + logger.warning("Missing signature or nonce headers for Vobiz webhook") return False if not self.auth_token: @@ -226,37 +227,33 @@ class VobizProvider(TelephonyProvider): ) return False - try: - # 1. Timestamp validation (within 5 minutes) - webhook_timestamp = int(timestamp) - current_timestamp = int(datetime.now(timezone.utc).timestamp()) - time_diff = abs(current_timestamp - webhook_timestamp) - - if time_diff > 300: # 5 minutes = 300 seconds - logger.warning(f"Vobiz webhook timestamp too old: {time_diff}s > 300s") - return False - - # 2. Signature verification - # Create expected signature: HMAC-SHA256(auth_token, timestamp + '.' + body) - payload = f"{timestamp}.{body}" - expected_signature = hmac.new( - self.auth_token.encode("utf-8"), payload.encode("utf-8"), hashlib.sha256 - ).hexdigest() - - # 3. Compare signatures (timing-safe comparison) - is_valid = hmac.compare_digest(expected_signature, signature) - - if not is_valid: - logger.warning( - f"Vobiz webhook signature mismatch. Expected: {expected_signature[:8]}..., Got: {signature[:8]}..." - ) - - return is_valid - - except Exception as e: - logger.error(f"Error verifying Vobiz webhook signature: {e}") + version = signature_version.lower() + if version not in {"v2", "v3"}: + logger.warning(f"Unsupported Vobiz signature version: {signature_version}") return False + parsed_url = urlparse(url) + base_url = urlunparse( + (parsed_url.scheme, parsed_url.netloc, parsed_url.path, "", "", "") + ) + signed_payload = base_url + (f".{nonce}" if version == "v3" else nonce) + expected_signature = base64.b64encode( + hmac.new( + self.auth_token.encode("utf-8"), + signed_payload.encode("utf-8"), + hashlib.sha256, + ).digest() + ).decode("ascii") + + is_valid = hmac.compare_digest(expected_signature, signature) + + if not is_valid: + logger.warning( + f"Vobiz webhook signature mismatch. Expected: {expected_signature[:8]}..., Got: {signature[:8]}..." + ) + + return is_valid + async def get_webhook_response( self, workflow_id: int, user_id: int, workflow_run_id: int ) -> str: @@ -472,20 +469,34 @@ class VobizProvider(TelephonyProvider): ) -> bool: """ Verify the signature of an inbound Vobiz webhook for security. - Uses HMAC-SHA256 over ``timestamp + '.' + body`` with the auth_token. + Uses Vobiz's documented V3/V2 HMAC-SHA256 callback signatures. """ - signature = headers.get("x-vobiz-signature", "") - timestamp = headers.get("x-vobiz-timestamp") - if not signature: - # FIXME: Vobiz is not sending the x-vobiz-signature. Temporarily - # returning True + normalized_headers = {key.lower(): value for key, value in headers.items()} + + signature = normalized_headers.get( + "x-vobiz-signature-v3" + ) or normalized_headers.get("x-vobiz-signature-ma-v3", "") + nonce = normalized_headers.get("x-vobiz-signature-v3-nonce") + signature_version = "v3" + + if not signature: + signature = normalized_headers.get( + "x-vobiz-signature-v2" + ) or normalized_headers.get("x-vobiz-signature-ma-v2", "") + nonce = normalized_headers.get("x-vobiz-signature-v2-nonce") + signature_version = "v2" + + if not signature: + logger.warning("Inbound Vobiz webhook missing X-Vobiz-Signature-V3/V2") + return False - # Vobiz always signs its webhooks; missing header means the - # request didn't come from Vobiz (or was tampered with). - logger.warning("Inbound Vobiz webhook missing X-Vobiz-Signature") - return True return await self.verify_webhook_signature( - url, webhook_data, signature, timestamp, body + url, + webhook_data, + signature, + nonce, + body, + signature_version=signature_version, ) async def configure_inbound( diff --git a/api/services/telephony/providers/vobiz/routes.py b/api/services/telephony/providers/vobiz/routes.py index 3c13e4b..15c2def 100644 --- a/api/services/telephony/providers/vobiz/routes.py +++ b/api/services/telephony/providers/vobiz/routes.py @@ -93,8 +93,17 @@ async def handle_vobiz_hangup_callback( f"[run {workflow_run_id}] Received Vobiz hangup callback {json.dumps(callback_data)}" ) - # Verify signature if provided - if x_vobiz_signature: + # Verify signature if Vobiz provided any supported signature header. + has_vobiz_signature = any( + header in all_headers + for header in ( + "x-vobiz-signature-v3", + "x-vobiz-signature-ma-v3", + "x-vobiz-signature-v2", + "x-vobiz-signature-ma-v2", + ) + ) + if has_vobiz_signature: # We need the workflow run to get organization for provider credentials workflow_run = await db_client.get_workflow_run_by_id(workflow_run_id) if not workflow_run: @@ -118,11 +127,10 @@ async def handle_vobiz_hangup_callback( backend_endpoint, _ = await get_backend_endpoints() webhook_url = f"{backend_endpoint}/api/v1/telephony/vobiz/hangup-callback/{workflow_run_id}" - is_valid = await provider.verify_webhook_signature( + is_valid = await provider.verify_inbound_signature( webhook_url, callback_data, - x_vobiz_signature, - x_vobiz_timestamp, + all_headers, raw_body, ) @@ -215,8 +223,17 @@ async def handle_vobiz_ring_callback( f"[run {workflow_run_id}] Received Vobiz ring callback {json.dumps(callback_data)}" ) - # Verify signature if provided - if x_vobiz_signature: + # Verify signature if Vobiz provided any supported signature header. + has_vobiz_signature = any( + header in all_headers + for header in ( + "x-vobiz-signature-v3", + "x-vobiz-signature-ma-v3", + "x-vobiz-signature-v2", + "x-vobiz-signature-ma-v2", + ) + ) + if has_vobiz_signature: # We need the workflow run to get organization for provider credentials workflow_run = await db_client.get_workflow_run_by_id(workflow_run_id) if not workflow_run: @@ -242,11 +259,10 @@ async def handle_vobiz_ring_callback( f"{backend_endpoint}/api/v1/telephony/vobiz/ring-callback/{workflow_run_id}" ) - is_valid = await provider.verify_webhook_signature( + is_valid = await provider.verify_inbound_signature( webhook_url, callback_data, - x_vobiz_signature, - x_vobiz_timestamp, + all_headers, raw_body, ) @@ -348,15 +364,23 @@ async def handle_vobiz_hangup_callback_by_workflow( workflow_run, workflow.organization_id ) - if x_vobiz_signature: + has_vobiz_signature = any( + header in all_headers + for header in ( + "x-vobiz-signature-v3", + "x-vobiz-signature-ma-v3", + "x-vobiz-signature-v2", + "x-vobiz-signature-ma-v2", + ) + ) + if has_vobiz_signature: backend_endpoint, _ = await get_backend_endpoints() webhook_url = f"{backend_endpoint}/api/v1/telephony/vobiz/hangup-callback/workflow/{workflow_id}" - is_valid = await provider.verify_webhook_signature( + is_valid = await provider.verify_inbound_signature( webhook_url, callback_data, - x_vobiz_signature, - x_vobiz_timestamp, + all_headers, raw_body, ) diff --git a/api/tests/telephony/vobiz/test_routes.py b/api/tests/telephony/vobiz/test_routes.py index f726eee..7b80d46 100644 --- a/api/tests/telephony/vobiz/test_routes.py +++ b/api/tests/telephony/vobiz/test_routes.py @@ -1,6 +1,6 @@ +import base64 import hashlib import hmac -from datetime import UTC, datetime from types import SimpleNamespace from unittest.mock import AsyncMock, patch from urllib.parse import urlencode @@ -61,19 +61,18 @@ def _request( ) -def _signed_headers( - provider: VobizProvider, *, form_data: dict[str, str] -) -> dict[str, str]: - timestamp = str(int(datetime.now(UTC).timestamp())) - body = urlencode(form_data) - signature = hmac.new( - provider.auth_token.encode("utf-8"), - f"{timestamp}.{body}".encode("utf-8"), - hashlib.sha256, - ).hexdigest() +def _signed_headers(provider: VobizProvider, *, url: str) -> dict[str, str]: + nonce = "12345678901234567890" + signature = base64.b64encode( + hmac.new( + provider.auth_token.encode("utf-8"), + f"{url}.{nonce}".encode("utf-8"), + hashlib.sha256, + ).digest() + ).decode("ascii") return { - "x-vobiz-signature": signature, - "x-vobiz-timestamp": timestamp, + "x-vobiz-signature-v3": signature, + "x-vobiz-signature-v3-nonce": nonce, } @@ -88,7 +87,9 @@ async def test_vobiz_hangup_callback_accepts_signed_form_body(): "Direction": "outbound", "Duration": "12", } - headers = _signed_headers(provider, form_data=form_data) + headers = _signed_headers( + provider, url="https://example.test/api/v1/telephony/vobiz/hangup-callback/123" + ) request = _request( path="/api/v1/telephony/vobiz/hangup-callback/123", form_data=form_data, @@ -122,8 +123,6 @@ async def test_vobiz_hangup_callback_accepts_signed_form_body(): result = await handle_vobiz_hangup_callback( workflow_run_id=123, request=request, - x_vobiz_signature=headers["x-vobiz-signature"], - x_vobiz_timestamp=headers["x-vobiz-timestamp"], ) assert result == {"status": "success"} @@ -139,7 +138,9 @@ async def test_vobiz_ring_callback_accepts_signed_form_body(): "From": "15551230001", "To": "15551230002", } - headers = _signed_headers(provider, form_data=form_data) + headers = _signed_headers( + provider, url="https://example.test/api/v1/telephony/vobiz/ring-callback/123" + ) request = _request( path="/api/v1/telephony/vobiz/ring-callback/123", form_data=form_data, @@ -170,9 +171,57 @@ async def test_vobiz_ring_callback_accepts_signed_form_body(): result = await handle_vobiz_ring_callback( workflow_run_id=123, request=request, - x_vobiz_signature=headers["x-vobiz-signature"], - x_vobiz_timestamp=headers["x-vobiz-timestamp"], ) assert result == {"status": "success"} db_client.update_workflow_run.assert_awaited_once() + + +@pytest.mark.asyncio +async def test_vobiz_verify_webhook_signature_accepts_v3_and_strips_query(): + provider = _provider() + headers = _signed_headers( + provider, url="https://example.test/api/v1/telephony/vobiz/hangup-callback/123" + ) + + assert await provider.verify_webhook_signature( + "https://example.test/api/v1/telephony/vobiz/hangup-callback/123?foo=bar", + {}, + headers["x-vobiz-signature-v3"], + headers["x-vobiz-signature-v3-nonce"], + signature_version="v3", + ) + + +@pytest.mark.asyncio +async def test_vobiz_verify_inbound_signature_accepts_v2(): + provider = _provider() + url = "https://example.test/api/v1/telephony/vobiz/hangup-callback/123" + nonce = "12345678901234567890" + signature = base64.b64encode( + hmac.new( + provider.auth_token.encode("utf-8"), + f"{url}{nonce}".encode("utf-8"), + hashlib.sha256, + ).digest() + ).decode("ascii") + + assert await provider.verify_inbound_signature( + url, + {}, + { + "X-Vobiz-Signature-V2": signature, + "X-Vobiz-Signature-V2-Nonce": nonce, + }, + ) + + +@pytest.mark.asyncio +async def test_vobiz_verify_inbound_signature_rejects_missing_signature(): + provider = _provider() + + assert not await provider.verify_inbound_signature( + "https://example.test/api/v1/telephony/vobiz/hangup-callback/123", + {}, + {}, + )