Verify Telnyx webhook signatures (#271)

* Verify Telnyx webhook signatures

* feat: harden telnyx webhook signature verification

---------

Co-authored-by: a692570 <a692570@users.noreply.github.com>
Co-authored-by: Sabiha Khan <sabihak89@gmail.com>
This commit is contained in:
Abhishek Sharma 2026-05-12 06:07:31 -07:00 committed by GitHub
parent 9389340807
commit 137b5e9f89
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
5 changed files with 509 additions and 24 deletions

View file

@ -18,3 +18,4 @@ bcrypt==5.0.0
email-validator==2.3.0
posthog==7.11.1
fastmcp==3.2.4
PyNaCl==1.6.2

View file

@ -4,14 +4,27 @@ Uses the Telnyx Call Control API v2 for outbound calling with
inline WebSocket media streaming.
"""
import base64
import binascii
import json
import random
import time
from typing import TYPE_CHECKING, Any, Dict, List, Optional
import aiohttp
import nacl.exceptions
import nacl.signing
from fastapi import HTTPException, WebSocketDisconnect
from loguru import logger
# 5-min replay window — matches Telnyx SDKs (Python/Node/Go/Ruby/PHP);
# Source: github.com/team-telnyx/telnyx-python src/telnyx/lib/webhook_verification.py
TELNYX_TIMESTAMP_TOLERANCE_SECONDS = 300
# Ed25519 sizes per RFC 8032; Telnyx SDKs check these for clearer errors than PyNaCl.
TELNYX_PUBLIC_KEY_BYTES = 32
TELNYX_SIGNATURE_BYTES = 64
from api.enums import WorkflowRunMode
from api.services.telephony.base import (
CallInitiationResult,
@ -34,6 +47,13 @@ def normalize_event_type(event_type: str) -> str:
return (event_type or "").replace("_", ".")
def _get_header(headers: Dict[str, str], name: str) -> str:
for key, value in headers.items():
if key.lower() == name:
return value
return ""
class TelnyxProvider(TelephonyProvider):
"""
Telnyx implementation of TelephonyProvider.
@ -168,13 +188,83 @@ class TelnyxProvider(TelephonyProvider):
async def verify_webhook_signature(
self, url: str, params: Dict[str, Any], signature: str
) -> bool:
"""Required by the abstract interface but not actively called for Telnyx.
"""Verify a Telnyx Ed25519 webhook signature.
Telnyx webhook signature verification uses Ed25519 (via the
telnyx-signature-ed25519 header). This can be implemented in the
future using the Telnyx SDK if needed.
Telnyx signs ``{timestamp}|{json_payload}`` and sends the signature in
``telnyx-signature-ed25519``. The public key is read from provider
configuration, not from the request. ``url`` is unused Telnyx does
not sign the request URL; the parameter exists to satisfy the base
class interface.
Docs:
https://developers.telnyx.com/development/api-fundamentals/webhooks/receiving-webhooks
"""
return True
timestamp = params.get("telnyx_timestamp") or params.get("timestamp")
raw_body = params.get("_raw_body", "")
if not signature:
logger.warning("Telnyx webhook missing telnyx-signature-ed25519 header")
return False
if not timestamp:
logger.warning("Telnyx webhook missing telnyx-timestamp header")
return False
if not self.webhook_public_key:
logger.error("Missing Telnyx webhook_public_key configuration")
return False
try:
ts_int = int(timestamp)
except (TypeError, ValueError):
logger.warning(f"Invalid Telnyx webhook timestamp format: {timestamp!r}")
return False
if abs(time.time() - ts_int) > TELNYX_TIMESTAMP_TOLERANCE_SECONDS:
logger.warning(
f"Telnyx webhook timestamp outside "
f"{TELNYX_TIMESTAMP_TOLERANCE_SECONDS}s tolerance: "
f"timestamp={ts_int}, now={int(time.time())}"
)
return False
if isinstance(raw_body, bytes):
raw_body = raw_body.decode("utf-8")
try:
signature_bytes = base64.b64decode(signature, validate=True)
except (binascii.Error, ValueError) as e:
logger.warning(f"Telnyx webhook signature not valid base64: {e}")
return False
try:
public_key_bytes = base64.b64decode(
self.webhook_public_key.strip(), validate=True
)
except (binascii.Error, ValueError) as e:
logger.error(f"Telnyx webhook_public_key not valid base64: {e}")
return False
if len(public_key_bytes) != TELNYX_PUBLIC_KEY_BYTES:
logger.error(
f"Telnyx webhook_public_key wrong length: expected "
f"{TELNYX_PUBLIC_KEY_BYTES}, got {len(public_key_bytes)}"
)
return False
if len(signature_bytes) != TELNYX_SIGNATURE_BYTES:
logger.warning(
f"Telnyx webhook signature wrong length: expected "
f"{TELNYX_SIGNATURE_BYTES}, got {len(signature_bytes)}"
)
return False
try:
verify_key = nacl.signing.VerifyKey(public_key_bytes)
signed_payload = f"{timestamp}|{raw_body}".encode("utf-8")
verify_key.verify(signed_payload, signature_bytes)
return True
except nacl.exceptions.BadSignatureError:
return False
async def get_webhook_response(
self, workflow_id: int, user_id: int, workflow_run_id: int
@ -420,9 +510,9 @@ class TelnyxProvider(TelephonyProvider):
return NormalizedInboundData(
provider=TelnyxProvider.PROVIDER_NAME,
call_id=payload.get("call_control_id", ""),
from_number=normalize_telephony_address(from_raw).canonical
if from_raw
else "",
from_number=(
normalize_telephony_address(from_raw).canonical if from_raw else ""
),
to_number=normalize_telephony_address(to_raw).canonical if to_raw else "",
direction=direction,
call_status=normalize_event_type(data.get("event_type", "")),
@ -444,11 +534,14 @@ class TelnyxProvider(TelephonyProvider):
headers: Dict[str, str],
body: str = "",
) -> bool:
"""Required by the abstract interface. Telnyx signature verification
(Ed25519 via ``telnyx-signature-ed25519``) is not yet implemented
accepts all inbound webhooks for now.
"""
return True
"""Verify the signature of an inbound Telnyx webhook."""
signature = _get_header(headers, "telnyx-signature-ed25519")
timestamp = _get_header(headers, "telnyx-timestamp")
return await self.verify_webhook_signature(
url,
{"telnyx_timestamp": timestamp, "_raw_body": body},
signature,
)
async def configure_inbound(
self, address: str, webhook_url: Optional[str]

View file

@ -6,7 +6,7 @@ provider registry — see ProviderSpec.router.
import json
from fastapi import APIRouter, Request
from fastapi import APIRouter, HTTPException, Request
from loguru import logger
from pipecat.utils.run_context import set_current_run_id
@ -56,10 +56,15 @@ async def handle_telnyx_events(
"""
set_current_run_id(workflow_run_id)
event_data = await request.json()
logger.info(
f"[run {workflow_run_id}] Received Telnyx event: {json.dumps(event_data)}"
)
try:
raw_body = (await request.body()).decode("utf-8")
except UnicodeDecodeError:
logger.warning(
f"[run {workflow_run_id}] Telnyx webhook body is not valid UTF-8"
)
raise HTTPException(status_code=400, detail="Webhook body is not valid UTF-8")
event_data = json.loads(raw_body)
# Extract event type from Telnyx envelope. Telnyx sometimes delivers the
# type with underscores (``streaming_started``) instead of dots
@ -67,26 +72,48 @@ async def handle_telnyx_events(
data = event_data.get("data", {})
event_type = normalize_event_type(data.get("event_type", ""))
# Skip streaming events — they're informational only
if event_type in ("streaming.started", "streaming.stopped"):
logger.debug(f"[run {workflow_run_id}] Telnyx streaming event: {event_type}")
return {"status": "success"}
logger.info(
f"[run {workflow_run_id}] Received Telnyx event: event_type={event_type}"
)
logger.debug(f"[run {workflow_run_id}] Telnyx event body: {json.dumps(event_data)}")
# Get workflow run and provider
workflow_run = await db_client.get_workflow_run_by_id(workflow_run_id)
if not workflow_run:
logger.warning(f"Workflow run {workflow_run_id} not found for Telnyx event")
return {"status": "ignored", "reason": "workflow_run_not_found"}
raise HTTPException(status_code=404, detail="Workflow run not found")
workflow = await db_client.get_workflow_by_id(workflow_run.workflow_id)
if not workflow:
logger.warning(f"Workflow {workflow_run.workflow_id} not found")
return {"status": "ignored", "reason": "workflow_not_found"}
raise HTTPException(status_code=404, detail="Workflow not found")
provider = await get_telephony_provider_for_run(
workflow_run, workflow.organization_id
)
signature_valid = await provider.verify_inbound_signature(
"", event_data, dict(request.headers), raw_body
)
if not signature_valid:
logger.warning(
f"[run {workflow_run_id}] Invalid Telnyx webhook signature "
f"(event_type={event_type}, "
f"timestamp={request.headers.get('telnyx-timestamp')}, "
f"body_len={len(raw_body)})"
)
raise HTTPException(status_code=401, detail="Invalid webhook signature")
logger.debug(
f"[run {workflow_run_id}] Telnyx webhook signature verified "
f"(event_type={event_type})"
)
# Skip streaming events. They are informational only, but still verified.
if event_type in ("streaming.started", "streaming.stopped"):
logger.debug(f"[run {workflow_run_id}] Telnyx streaming event: {event_type}")
return {"status": "success"}
# Parse the callback data into generic format
parsed_data = provider.parse_status_callback(event_data)

View file

@ -0,0 +1 @@
"""Telnyx telephony provider tests."""

View file

@ -0,0 +1,363 @@
import base64
import json
import time
from types import SimpleNamespace
from unittest.mock import AsyncMock, patch
import nacl.signing
import pytest
from fastapi import HTTPException
from starlette.requests import Request
from api.services.telephony.providers.telnyx.provider import TelnyxProvider
from api.services.telephony.providers.telnyx.routes import handle_telnyx_events
def _body() -> str:
return json.dumps(
{
"data": {
"record_type": "event",
"event_type": "call.initiated",
"payload": {
"call_control_id": "call-control-id",
"connection_id": "connection-id",
"direction": "incoming",
"from": "+15551230001",
"to": "+15551230002",
},
}
},
separators=(",", ":"),
)
def _provider(public_key: str = "") -> TelnyxProvider:
return TelnyxProvider(
{
"api_key": "placeholder-api-key",
"connection_id": "connection-id",
"webhook_public_key": public_key,
"from_numbers": ["+15551230002"],
}
)
def _signed_headers(body: str, timestamp: str | None = None):
if timestamp is None:
timestamp = str(int(time.time()))
signing_key = nacl.signing.SigningKey.generate()
public_key = base64.b64encode(bytes(signing_key.verify_key)).decode("ascii")
signed_payload = f"{timestamp}|{body}".encode("utf-8")
signature = base64.b64encode(signing_key.sign(signed_payload).signature).decode(
"ascii"
)
return (
public_key,
{
"telnyx-signature-ed25519": signature,
"telnyx-timestamp": timestamp,
},
)
def _request(body: str, headers: dict[str, str]) -> Request:
async def receive():
return {
"type": "http.request",
"body": body.encode("utf-8"),
"more_body": False,
}
return Request(
{
"type": "http",
"method": "POST",
"path": "/api/v1/telephony/telnyx/events/123",
"headers": [
(name.lower().encode("ascii"), value.encode("ascii"))
for name, value in headers.items()
],
},
receive,
)
@pytest.mark.asyncio
async def test_verify_inbound_signature_accepts_valid_telnyx_signature():
body = _body()
public_key, headers = _signed_headers(body)
provider = _provider(public_key)
result = await provider.verify_inbound_signature(
"https://example.test/api/v1/telephony/inbound/run",
json.loads(body),
headers,
body,
)
assert result is True
@pytest.mark.asyncio
async def test_verify_inbound_signature_rejects_tampered_body():
body = _body()
public_key, headers = _signed_headers(body)
provider = _provider(public_key)
result = await provider.verify_inbound_signature(
"https://example.test/api/v1/telephony/inbound/run",
json.loads(body),
headers,
body.replace("incoming", "outgoing"),
)
assert result is False
@pytest.mark.asyncio
async def test_verify_inbound_signature_rejects_missing_signature_header():
body = _body()
public_key, headers = _signed_headers(body)
provider = _provider(public_key)
result = await provider.verify_inbound_signature(
"https://example.test/api/v1/telephony/inbound/run",
json.loads(body),
{"telnyx-timestamp": headers["telnyx-timestamp"]},
body,
)
assert result is False
@pytest.mark.asyncio
async def test_verify_inbound_signature_rejects_missing_timestamp_header():
body = _body()
public_key, headers = _signed_headers(body)
provider = _provider(public_key)
result = await provider.verify_inbound_signature(
"https://example.test/api/v1/telephony/inbound/run",
json.loads(body),
{"telnyx-signature-ed25519": headers["telnyx-signature-ed25519"]},
body,
)
assert result is False
@pytest.mark.asyncio
async def test_verify_inbound_signature_rejects_missing_config_public_key():
body = _body()
_, headers = _signed_headers(body)
provider = _provider()
result = await provider.verify_inbound_signature(
"https://example.test/api/v1/telephony/inbound/run",
json.loads(body),
headers,
body,
)
assert result is False
@pytest.mark.asyncio
async def test_verify_inbound_signature_reads_headers_case_insensitively():
body = _body()
public_key, headers = _signed_headers(body)
provider = _provider(public_key)
result = await provider.verify_inbound_signature(
"https://example.test/api/v1/telephony/inbound/run",
json.loads(body),
{
"Telnyx-Signature-Ed25519": headers["telnyx-signature-ed25519"],
"Telnyx-Timestamp": headers["telnyx-timestamp"],
},
body,
)
assert result is True
@pytest.mark.asyncio
async def test_telnyx_events_route_verifies_signature_before_status_update():
body = _body()
public_key, headers = _signed_headers(body)
provider = _provider(public_key)
with (
patch("api.services.telephony.providers.telnyx.routes.db_client") as db_client,
patch(
"api.services.telephony.providers.telnyx.routes.get_telephony_provider_for_run",
new_callable=AsyncMock,
return_value=provider,
),
patch(
"api.services.telephony.providers.telnyx.routes._process_status_update",
new_callable=AsyncMock,
) as process_status,
):
db_client.get_workflow_run_by_id = AsyncMock(
return_value=SimpleNamespace(workflow_id=7)
)
db_client.get_workflow_by_id = AsyncMock(
return_value=SimpleNamespace(organization_id=11)
)
result = await handle_telnyx_events(
_request(body, headers), workflow_run_id=123
)
assert result == {"status": "success"}
process_status.assert_awaited_once()
@pytest.mark.asyncio
async def test_verify_inbound_signature_rejects_stale_timestamp():
body = _body()
stale_ts = str(int(time.time()) - 600)
public_key, headers = _signed_headers(body, timestamp=stale_ts)
provider = _provider(public_key)
result = await provider.verify_inbound_signature(
"https://example.test/api/v1/telephony/inbound/run",
json.loads(body),
headers,
body,
)
assert result is False
@pytest.mark.asyncio
async def test_verify_inbound_signature_rejects_future_timestamp():
body = _body()
future_ts = str(int(time.time()) + 600)
public_key, headers = _signed_headers(body, timestamp=future_ts)
provider = _provider(public_key)
result = await provider.verify_inbound_signature(
"https://example.test/api/v1/telephony/inbound/run",
json.loads(body),
headers,
body,
)
assert result is False
@pytest.mark.asyncio
async def test_verify_inbound_signature_rejects_non_integer_timestamp():
body = _body()
public_key, headers = _signed_headers(body)
headers["telnyx-timestamp"] = "not-a-number"
provider = _provider(public_key)
result = await provider.verify_inbound_signature(
"https://example.test/api/v1/telephony/inbound/run",
json.loads(body),
headers,
body,
)
assert result is False
@pytest.mark.asyncio
async def test_verify_inbound_signature_rejects_wrong_length_public_key():
body = _body()
_, headers = _signed_headers(body)
short_key = base64.b64encode(b"x" * 16).decode("ascii")
provider = _provider(short_key)
result = await provider.verify_inbound_signature(
"https://example.test/api/v1/telephony/inbound/run",
json.loads(body),
headers,
body,
)
assert result is False
@pytest.mark.asyncio
async def test_verify_inbound_signature_rejects_wrong_length_signature():
body = _body()
public_key, headers = _signed_headers(body)
headers["telnyx-signature-ed25519"] = base64.b64encode(b"x" * 32).decode("ascii")
provider = _provider(public_key)
result = await provider.verify_inbound_signature(
"https://example.test/api/v1/telephony/inbound/run",
json.loads(body),
headers,
body,
)
assert result is False
@pytest.mark.asyncio
async def test_telnyx_events_route_rejects_invalid_signature_with_401():
body = _body()
public_key, headers = _signed_headers(body)
provider = _provider(public_key)
with (
patch("api.services.telephony.providers.telnyx.routes.db_client") as db_client,
patch(
"api.services.telephony.providers.telnyx.routes.get_telephony_provider_for_run",
new_callable=AsyncMock,
return_value=provider,
),
patch(
"api.services.telephony.providers.telnyx.routes._process_status_update",
new_callable=AsyncMock,
) as process_status,
patch.object(
provider,
"verify_inbound_signature",
new_callable=AsyncMock,
return_value=False,
),
):
db_client.get_workflow_run_by_id = AsyncMock(
return_value=SimpleNamespace(workflow_id=7)
)
db_client.get_workflow_by_id = AsyncMock(
return_value=SimpleNamespace(organization_id=11)
)
with pytest.raises(HTTPException) as exc_info:
await handle_telnyx_events(_request(body, headers), workflow_run_id=123)
assert exc_info.value.status_code == 401
assert exc_info.value.detail == "Invalid webhook signature"
process_status.assert_not_awaited()
@pytest.mark.asyncio
async def test_telnyx_events_route_rejects_invalid_utf8_body_with_400():
invalid_body = b"\xff\xfe\xfd"
async def receive():
return {"type": "http.request", "body": invalid_body, "more_body": False}
request = Request(
{
"type": "http",
"method": "POST",
"path": "/api/v1/telephony/telnyx/events/123",
"headers": [],
},
receive,
)
with pytest.raises(HTTPException) as exc_info:
await handle_telnyx_events(request, workflow_run_id=123)
assert exc_info.value.status_code == 400
assert exc_info.value.detail == "Webhook body is not valid UTF-8"