From 137b5e9f89907df0ba938be723aabc7591e74745 Mon Sep 17 00:00:00 2001 From: Abhishek Sharma Date: Tue, 12 May 2026 06:07:31 -0700 Subject: [PATCH] Verify Telnyx webhook signatures (#271) * Verify Telnyx webhook signatures * feat: harden telnyx webhook signature verification --------- Co-authored-by: a692570 Co-authored-by: Sabiha Khan --- api/requirements.txt | 1 + .../telephony/providers/telnyx/provider.py | 119 +++++- .../telephony/providers/telnyx/routes.py | 49 ++- api/tests/telephony/telnyx/__init__.py | 1 + api/tests/telephony/telnyx/test_provider.py | 363 ++++++++++++++++++ 5 files changed, 509 insertions(+), 24 deletions(-) create mode 100644 api/tests/telephony/telnyx/__init__.py create mode 100644 api/tests/telephony/telnyx/test_provider.py diff --git a/api/requirements.txt b/api/requirements.txt index 22a2f0b..4801572 100644 --- a/api/requirements.txt +++ b/api/requirements.txt @@ -18,3 +18,4 @@ bcrypt==5.0.0 email-validator==2.3.0 posthog==7.11.1 fastmcp==3.2.4 +PyNaCl==1.6.2 diff --git a/api/services/telephony/providers/telnyx/provider.py b/api/services/telephony/providers/telnyx/provider.py index 96ee77d..f14e0f1 100644 --- a/api/services/telephony/providers/telnyx/provider.py +++ b/api/services/telephony/providers/telnyx/provider.py @@ -4,14 +4,27 @@ Uses the Telnyx Call Control API v2 for outbound calling with inline WebSocket media streaming. """ +import base64 +import binascii import json import random +import time from typing import TYPE_CHECKING, Any, Dict, List, Optional import aiohttp +import nacl.exceptions +import nacl.signing from fastapi import HTTPException, WebSocketDisconnect from loguru import logger +# 5-min replay window — matches Telnyx SDKs (Python/Node/Go/Ruby/PHP); +# Source: github.com/team-telnyx/telnyx-python src/telnyx/lib/webhook_verification.py +TELNYX_TIMESTAMP_TOLERANCE_SECONDS = 300 + +# Ed25519 sizes per RFC 8032; Telnyx SDKs check these for clearer errors than PyNaCl. +TELNYX_PUBLIC_KEY_BYTES = 32 +TELNYX_SIGNATURE_BYTES = 64 + from api.enums import WorkflowRunMode from api.services.telephony.base import ( CallInitiationResult, @@ -34,6 +47,13 @@ def normalize_event_type(event_type: str) -> str: return (event_type or "").replace("_", ".") +def _get_header(headers: Dict[str, str], name: str) -> str: + for key, value in headers.items(): + if key.lower() == name: + return value + return "" + + class TelnyxProvider(TelephonyProvider): """ Telnyx implementation of TelephonyProvider. @@ -168,13 +188,83 @@ class TelnyxProvider(TelephonyProvider): async def verify_webhook_signature( self, url: str, params: Dict[str, Any], signature: str ) -> bool: - """Required by the abstract interface but not actively called for Telnyx. + """Verify a Telnyx Ed25519 webhook signature. - Telnyx webhook signature verification uses Ed25519 (via the - telnyx-signature-ed25519 header). This can be implemented in the - future using the Telnyx SDK if needed. + Telnyx signs ``{timestamp}|{json_payload}`` and sends the signature in + ``telnyx-signature-ed25519``. The public key is read from provider + configuration, not from the request. ``url`` is unused — Telnyx does + not sign the request URL; the parameter exists to satisfy the base + class interface. + + Docs: + https://developers.telnyx.com/development/api-fundamentals/webhooks/receiving-webhooks """ - return True + timestamp = params.get("telnyx_timestamp") or params.get("timestamp") + raw_body = params.get("_raw_body", "") + + if not signature: + logger.warning("Telnyx webhook missing telnyx-signature-ed25519 header") + return False + if not timestamp: + logger.warning("Telnyx webhook missing telnyx-timestamp header") + return False + + if not self.webhook_public_key: + logger.error("Missing Telnyx webhook_public_key configuration") + return False + + try: + ts_int = int(timestamp) + except (TypeError, ValueError): + logger.warning(f"Invalid Telnyx webhook timestamp format: {timestamp!r}") + return False + + if abs(time.time() - ts_int) > TELNYX_TIMESTAMP_TOLERANCE_SECONDS: + logger.warning( + f"Telnyx webhook timestamp outside " + f"{TELNYX_TIMESTAMP_TOLERANCE_SECONDS}s tolerance: " + f"timestamp={ts_int}, now={int(time.time())}" + ) + return False + + if isinstance(raw_body, bytes): + raw_body = raw_body.decode("utf-8") + + try: + signature_bytes = base64.b64decode(signature, validate=True) + except (binascii.Error, ValueError) as e: + logger.warning(f"Telnyx webhook signature not valid base64: {e}") + return False + + try: + public_key_bytes = base64.b64decode( + self.webhook_public_key.strip(), validate=True + ) + except (binascii.Error, ValueError) as e: + logger.error(f"Telnyx webhook_public_key not valid base64: {e}") + return False + + if len(public_key_bytes) != TELNYX_PUBLIC_KEY_BYTES: + logger.error( + f"Telnyx webhook_public_key wrong length: expected " + f"{TELNYX_PUBLIC_KEY_BYTES}, got {len(public_key_bytes)}" + ) + return False + + if len(signature_bytes) != TELNYX_SIGNATURE_BYTES: + logger.warning( + f"Telnyx webhook signature wrong length: expected " + f"{TELNYX_SIGNATURE_BYTES}, got {len(signature_bytes)}" + ) + return False + + try: + verify_key = nacl.signing.VerifyKey(public_key_bytes) + signed_payload = f"{timestamp}|{raw_body}".encode("utf-8") + verify_key.verify(signed_payload, signature_bytes) + return True + except nacl.exceptions.BadSignatureError: + return False async def get_webhook_response( self, workflow_id: int, user_id: int, workflow_run_id: int @@ -420,9 +510,9 @@ class TelnyxProvider(TelephonyProvider): return NormalizedInboundData( provider=TelnyxProvider.PROVIDER_NAME, call_id=payload.get("call_control_id", ""), - from_number=normalize_telephony_address(from_raw).canonical - if from_raw - else "", + from_number=( + normalize_telephony_address(from_raw).canonical if from_raw else "" + ), to_number=normalize_telephony_address(to_raw).canonical if to_raw else "", direction=direction, call_status=normalize_event_type(data.get("event_type", "")), @@ -444,11 +534,14 @@ class TelnyxProvider(TelephonyProvider): headers: Dict[str, str], body: str = "", ) -> bool: - """Required by the abstract interface. Telnyx signature verification - (Ed25519 via ``telnyx-signature-ed25519``) is not yet implemented — - accepts all inbound webhooks for now. - """ - return True + """Verify the signature of an inbound Telnyx webhook.""" + signature = _get_header(headers, "telnyx-signature-ed25519") + timestamp = _get_header(headers, "telnyx-timestamp") + return await self.verify_webhook_signature( + url, + {"telnyx_timestamp": timestamp, "_raw_body": body}, + signature, + ) async def configure_inbound( self, address: str, webhook_url: Optional[str] diff --git a/api/services/telephony/providers/telnyx/routes.py b/api/services/telephony/providers/telnyx/routes.py index 455e0e8..e2edbe5 100644 --- a/api/services/telephony/providers/telnyx/routes.py +++ b/api/services/telephony/providers/telnyx/routes.py @@ -6,7 +6,7 @@ provider registry — see ProviderSpec.router. import json -from fastapi import APIRouter, Request +from fastapi import APIRouter, HTTPException, Request from loguru import logger from pipecat.utils.run_context import set_current_run_id @@ -56,10 +56,15 @@ async def handle_telnyx_events( """ set_current_run_id(workflow_run_id) - event_data = await request.json() - logger.info( - f"[run {workflow_run_id}] Received Telnyx event: {json.dumps(event_data)}" - ) + try: + raw_body = (await request.body()).decode("utf-8") + except UnicodeDecodeError: + logger.warning( + f"[run {workflow_run_id}] Telnyx webhook body is not valid UTF-8" + ) + raise HTTPException(status_code=400, detail="Webhook body is not valid UTF-8") + + event_data = json.loads(raw_body) # Extract event type from Telnyx envelope. Telnyx sometimes delivers the # type with underscores (``streaming_started``) instead of dots @@ -67,26 +72,48 @@ async def handle_telnyx_events( data = event_data.get("data", {}) event_type = normalize_event_type(data.get("event_type", "")) - # Skip streaming events — they're informational only - if event_type in ("streaming.started", "streaming.stopped"): - logger.debug(f"[run {workflow_run_id}] Telnyx streaming event: {event_type}") - return {"status": "success"} + logger.info( + f"[run {workflow_run_id}] Received Telnyx event: event_type={event_type}" + ) + logger.debug(f"[run {workflow_run_id}] Telnyx event body: {json.dumps(event_data)}") # Get workflow run and provider workflow_run = await db_client.get_workflow_run_by_id(workflow_run_id) if not workflow_run: logger.warning(f"Workflow run {workflow_run_id} not found for Telnyx event") - return {"status": "ignored", "reason": "workflow_run_not_found"} + raise HTTPException(status_code=404, detail="Workflow run not found") workflow = await db_client.get_workflow_by_id(workflow_run.workflow_id) if not workflow: logger.warning(f"Workflow {workflow_run.workflow_id} not found") - return {"status": "ignored", "reason": "workflow_not_found"} + raise HTTPException(status_code=404, detail="Workflow not found") provider = await get_telephony_provider_for_run( workflow_run, workflow.organization_id ) + signature_valid = await provider.verify_inbound_signature( + "", event_data, dict(request.headers), raw_body + ) + if not signature_valid: + logger.warning( + f"[run {workflow_run_id}] Invalid Telnyx webhook signature " + f"(event_type={event_type}, " + f"timestamp={request.headers.get('telnyx-timestamp')}, " + f"body_len={len(raw_body)})" + ) + raise HTTPException(status_code=401, detail="Invalid webhook signature") + + logger.debug( + f"[run {workflow_run_id}] Telnyx webhook signature verified " + f"(event_type={event_type})" + ) + + # Skip streaming events. They are informational only, but still verified. + if event_type in ("streaming.started", "streaming.stopped"): + logger.debug(f"[run {workflow_run_id}] Telnyx streaming event: {event_type}") + return {"status": "success"} + # Parse the callback data into generic format parsed_data = provider.parse_status_callback(event_data) diff --git a/api/tests/telephony/telnyx/__init__.py b/api/tests/telephony/telnyx/__init__.py new file mode 100644 index 0000000..5877d14 --- /dev/null +++ b/api/tests/telephony/telnyx/__init__.py @@ -0,0 +1 @@ +"""Telnyx telephony provider tests.""" diff --git a/api/tests/telephony/telnyx/test_provider.py b/api/tests/telephony/telnyx/test_provider.py new file mode 100644 index 0000000..6462119 --- /dev/null +++ b/api/tests/telephony/telnyx/test_provider.py @@ -0,0 +1,363 @@ +import base64 +import json +import time +from types import SimpleNamespace +from unittest.mock import AsyncMock, patch + +import nacl.signing +import pytest +from fastapi import HTTPException +from starlette.requests import Request + +from api.services.telephony.providers.telnyx.provider import TelnyxProvider +from api.services.telephony.providers.telnyx.routes import handle_telnyx_events + + +def _body() -> str: + return json.dumps( + { + "data": { + "record_type": "event", + "event_type": "call.initiated", + "payload": { + "call_control_id": "call-control-id", + "connection_id": "connection-id", + "direction": "incoming", + "from": "+15551230001", + "to": "+15551230002", + }, + } + }, + separators=(",", ":"), + ) + + +def _provider(public_key: str = "") -> TelnyxProvider: + return TelnyxProvider( + { + "api_key": "placeholder-api-key", + "connection_id": "connection-id", + "webhook_public_key": public_key, + "from_numbers": ["+15551230002"], + } + ) + + +def _signed_headers(body: str, timestamp: str | None = None): + if timestamp is None: + timestamp = str(int(time.time())) + signing_key = nacl.signing.SigningKey.generate() + public_key = base64.b64encode(bytes(signing_key.verify_key)).decode("ascii") + signed_payload = f"{timestamp}|{body}".encode("utf-8") + signature = base64.b64encode(signing_key.sign(signed_payload).signature).decode( + "ascii" + ) + return ( + public_key, + { + "telnyx-signature-ed25519": signature, + "telnyx-timestamp": timestamp, + }, + ) + + +def _request(body: str, headers: dict[str, str]) -> Request: + async def receive(): + return { + "type": "http.request", + "body": body.encode("utf-8"), + "more_body": False, + } + + return Request( + { + "type": "http", + "method": "POST", + "path": "/api/v1/telephony/telnyx/events/123", + "headers": [ + (name.lower().encode("ascii"), value.encode("ascii")) + for name, value in headers.items() + ], + }, + receive, + ) + + +@pytest.mark.asyncio +async def test_verify_inbound_signature_accepts_valid_telnyx_signature(): + body = _body() + public_key, headers = _signed_headers(body) + provider = _provider(public_key) + + result = await provider.verify_inbound_signature( + "https://example.test/api/v1/telephony/inbound/run", + json.loads(body), + headers, + body, + ) + + assert result is True + + +@pytest.mark.asyncio +async def test_verify_inbound_signature_rejects_tampered_body(): + body = _body() + public_key, headers = _signed_headers(body) + provider = _provider(public_key) + + result = await provider.verify_inbound_signature( + "https://example.test/api/v1/telephony/inbound/run", + json.loads(body), + headers, + body.replace("incoming", "outgoing"), + ) + + assert result is False + + +@pytest.mark.asyncio +async def test_verify_inbound_signature_rejects_missing_signature_header(): + body = _body() + public_key, headers = _signed_headers(body) + provider = _provider(public_key) + + result = await provider.verify_inbound_signature( + "https://example.test/api/v1/telephony/inbound/run", + json.loads(body), + {"telnyx-timestamp": headers["telnyx-timestamp"]}, + body, + ) + + assert result is False + + +@pytest.mark.asyncio +async def test_verify_inbound_signature_rejects_missing_timestamp_header(): + body = _body() + public_key, headers = _signed_headers(body) + provider = _provider(public_key) + + result = await provider.verify_inbound_signature( + "https://example.test/api/v1/telephony/inbound/run", + json.loads(body), + {"telnyx-signature-ed25519": headers["telnyx-signature-ed25519"]}, + body, + ) + + assert result is False + + +@pytest.mark.asyncio +async def test_verify_inbound_signature_rejects_missing_config_public_key(): + body = _body() + _, headers = _signed_headers(body) + provider = _provider() + + result = await provider.verify_inbound_signature( + "https://example.test/api/v1/telephony/inbound/run", + json.loads(body), + headers, + body, + ) + + assert result is False + + +@pytest.mark.asyncio +async def test_verify_inbound_signature_reads_headers_case_insensitively(): + body = _body() + public_key, headers = _signed_headers(body) + provider = _provider(public_key) + + result = await provider.verify_inbound_signature( + "https://example.test/api/v1/telephony/inbound/run", + json.loads(body), + { + "Telnyx-Signature-Ed25519": headers["telnyx-signature-ed25519"], + "Telnyx-Timestamp": headers["telnyx-timestamp"], + }, + body, + ) + + assert result is True + + +@pytest.mark.asyncio +async def test_telnyx_events_route_verifies_signature_before_status_update(): + body = _body() + public_key, headers = _signed_headers(body) + provider = _provider(public_key) + + with ( + patch("api.services.telephony.providers.telnyx.routes.db_client") as db_client, + patch( + "api.services.telephony.providers.telnyx.routes.get_telephony_provider_for_run", + new_callable=AsyncMock, + return_value=provider, + ), + patch( + "api.services.telephony.providers.telnyx.routes._process_status_update", + new_callable=AsyncMock, + ) as process_status, + ): + db_client.get_workflow_run_by_id = AsyncMock( + return_value=SimpleNamespace(workflow_id=7) + ) + db_client.get_workflow_by_id = AsyncMock( + return_value=SimpleNamespace(organization_id=11) + ) + + result = await handle_telnyx_events( + _request(body, headers), workflow_run_id=123 + ) + + assert result == {"status": "success"} + process_status.assert_awaited_once() + + +@pytest.mark.asyncio +async def test_verify_inbound_signature_rejects_stale_timestamp(): + body = _body() + stale_ts = str(int(time.time()) - 600) + public_key, headers = _signed_headers(body, timestamp=stale_ts) + provider = _provider(public_key) + + result = await provider.verify_inbound_signature( + "https://example.test/api/v1/telephony/inbound/run", + json.loads(body), + headers, + body, + ) + + assert result is False + + +@pytest.mark.asyncio +async def test_verify_inbound_signature_rejects_future_timestamp(): + body = _body() + future_ts = str(int(time.time()) + 600) + public_key, headers = _signed_headers(body, timestamp=future_ts) + provider = _provider(public_key) + + result = await provider.verify_inbound_signature( + "https://example.test/api/v1/telephony/inbound/run", + json.loads(body), + headers, + body, + ) + + assert result is False + + +@pytest.mark.asyncio +async def test_verify_inbound_signature_rejects_non_integer_timestamp(): + body = _body() + public_key, headers = _signed_headers(body) + headers["telnyx-timestamp"] = "not-a-number" + provider = _provider(public_key) + + result = await provider.verify_inbound_signature( + "https://example.test/api/v1/telephony/inbound/run", + json.loads(body), + headers, + body, + ) + + assert result is False + + +@pytest.mark.asyncio +async def test_verify_inbound_signature_rejects_wrong_length_public_key(): + body = _body() + _, headers = _signed_headers(body) + short_key = base64.b64encode(b"x" * 16).decode("ascii") + provider = _provider(short_key) + + result = await provider.verify_inbound_signature( + "https://example.test/api/v1/telephony/inbound/run", + json.loads(body), + headers, + body, + ) + + assert result is False + + +@pytest.mark.asyncio +async def test_verify_inbound_signature_rejects_wrong_length_signature(): + body = _body() + public_key, headers = _signed_headers(body) + headers["telnyx-signature-ed25519"] = base64.b64encode(b"x" * 32).decode("ascii") + provider = _provider(public_key) + + result = await provider.verify_inbound_signature( + "https://example.test/api/v1/telephony/inbound/run", + json.loads(body), + headers, + body, + ) + + assert result is False + + +@pytest.mark.asyncio +async def test_telnyx_events_route_rejects_invalid_signature_with_401(): + body = _body() + public_key, headers = _signed_headers(body) + provider = _provider(public_key) + + with ( + patch("api.services.telephony.providers.telnyx.routes.db_client") as db_client, + patch( + "api.services.telephony.providers.telnyx.routes.get_telephony_provider_for_run", + new_callable=AsyncMock, + return_value=provider, + ), + patch( + "api.services.telephony.providers.telnyx.routes._process_status_update", + new_callable=AsyncMock, + ) as process_status, + patch.object( + provider, + "verify_inbound_signature", + new_callable=AsyncMock, + return_value=False, + ), + ): + db_client.get_workflow_run_by_id = AsyncMock( + return_value=SimpleNamespace(workflow_id=7) + ) + db_client.get_workflow_by_id = AsyncMock( + return_value=SimpleNamespace(organization_id=11) + ) + + with pytest.raises(HTTPException) as exc_info: + await handle_telnyx_events(_request(body, headers), workflow_run_id=123) + + assert exc_info.value.status_code == 401 + assert exc_info.value.detail == "Invalid webhook signature" + process_status.assert_not_awaited() + + +@pytest.mark.asyncio +async def test_telnyx_events_route_rejects_invalid_utf8_body_with_400(): + invalid_body = b"\xff\xfe\xfd" + + async def receive(): + return {"type": "http.request", "body": invalid_body, "more_body": False} + + request = Request( + { + "type": "http", + "method": "POST", + "path": "/api/v1/telephony/telnyx/events/123", + "headers": [], + }, + receive, + ) + + with pytest.raises(HTTPException) as exc_info: + await handle_telnyx_events(request, workflow_run_id=123) + + assert exc_info.value.status_code == 400 + assert exc_info.value.detail == "Webhook body is not valid UTF-8"