feat(gateway): add WhatsApp Cloud webhook intake

This commit is contained in:
Anish Sarkar 2026-05-29 10:18:58 +05:30
parent 3d9620275b
commit dbd9966219

View file

@ -0,0 +1,205 @@
"""WhatsApp Cloud API webhook routes."""
from __future__ import annotations
import hashlib
import hmac
import json
import logging
import uuid
from datetime import UTC, datetime
from typing import Any
from fastapi import APIRouter, Depends, HTTPException, Query, Request
from sqlalchemy.ext.asyncio import AsyncSession
from starlette.responses import Response
from app.config import config
from app.db import (
ExternalChatHealthStatus,
ExternalChatPlatform,
get_async_session,
)
from app.gateway.accounts import get_or_create_system_whatsapp_account
from app.gateway.inbox import persist_inbound_event
from app.observability.metrics import (
record_gateway_inbox_write,
record_gateway_outbound,
record_gateway_webhook_parse_error,
)
router = APIRouter(prefix="/gateway/webhooks/whatsapp", tags=["gateway"])
logger = logging.getLogger(__name__)
def _ensure_whatsapp_enabled() -> None:
if config.GATEWAY_WHATSAPP_INTAKE_MODE == "disabled":
raise HTTPException(status_code=404, detail="WhatsApp gateway is disabled")
@router.get("")
async def verify_whatsapp_webhook(
hub_mode: str = Query(alias="hub.mode"),
hub_verify_token: str = Query(alias="hub.verify_token"),
hub_challenge: str = Query(alias="hub.challenge"),
) -> Response:
_ensure_whatsapp_enabled()
if (
hub_mode == "subscribe"
and config.WHATSAPP_WEBHOOK_VERIFY_TOKEN
and hmac.compare_digest(hub_verify_token, config.WHATSAPP_WEBHOOK_VERIFY_TOKEN)
):
return Response(content=hub_challenge, media_type="text/plain")
raise HTTPException(status_code=403, detail="Invalid WhatsApp webhook token")
@router.post("")
async def whatsapp_webhook(
request: Request,
session: AsyncSession = Depends(get_async_session),
) -> Response:
_ensure_whatsapp_enabled()
raw_body = await request.body()
_verify_signature(raw_body, request.headers.get("X-Hub-Signature-256"))
try:
payload = json.loads(raw_body)
except ValueError:
record_gateway_webhook_parse_error()
return Response(status_code=200)
try:
await _process_payload(session, payload)
await session.commit()
except Exception:
await session.rollback()
logger.exception("WhatsApp webhook processing failed")
return Response(status_code=200)
return Response(status_code=200)
def _verify_signature(raw_body: bytes, header_signature: str | None) -> None:
if not config.WHATSAPP_WEBHOOK_APP_SECRET:
raise HTTPException(status_code=500, detail="WhatsApp app secret is not configured")
received = (header_signature or "").removeprefix("sha256=")
expected = hmac.new(
config.WHATSAPP_WEBHOOK_APP_SECRET.encode(),
raw_body,
hashlib.sha256,
).hexdigest()
if not received or not hmac.compare_digest(received, expected):
raise HTTPException(status_code=403, detail="Invalid WhatsApp webhook signature")
async def _process_payload(session: AsyncSession, payload: dict[str, Any]) -> None:
for entry in payload.get("entry") or []:
if not isinstance(entry, dict):
continue
for change in entry.get("changes") or []:
if not isinstance(change, dict):
continue
field = change.get("field")
value = change.get("value") or {}
if field == "messages":
await _process_messages_change(session, payload, entry, change, value)
elif field == "account_update":
await _handle_account_update(session, entry, value)
elif field == "phone_number_quality_update":
await _handle_phone_number_quality_update(session, entry, value)
async def _process_messages_change(
session: AsyncSession,
payload: dict[str, Any],
entry: dict[str, Any],
change: dict[str, Any],
value: dict[str, Any],
) -> None:
statuses = [status for status in value.get("statuses") or [] if isinstance(status, dict)]
for status in statuses:
record_gateway_outbound(
platform="whatsapp",
kind="status",
status=str(status.get("status") or "unknown"),
)
messages = [msg for msg in value.get("messages") or [] if isinstance(msg, dict)]
if not messages:
return
account = await get_or_create_system_whatsapp_account(session)
metadata = value.get("metadata") or {}
if isinstance(metadata, dict):
cursor_state = dict(account.cursor_state or {})
for key in ("phone_number_id", "display_phone_number"):
if metadata.get(key):
cursor_state[key] = metadata[key]
account.cursor_state = cursor_state
for msg in messages:
message_id = str(msg.get("id") or "")
if not message_id:
continue
request_id = f"gateway_{uuid.uuid4().hex[:16]}"
inbox_id = await persist_inbound_event(
session,
account_id=account.id,
platform=ExternalChatPlatform.WHATSAPP,
event_dedupe_key=f"wamid:{message_id}",
external_event_id=message_id,
external_message_id=message_id,
event_kind="message",
raw_payload=_single_message_payload(payload, entry, change, msg),
request_id=request_id,
)
record_gateway_inbox_write(platform="whatsapp", dedup_skipped=inbox_id is None)
async def _handle_account_update(
session: AsyncSession,
entry: dict[str, Any],
value: dict[str, Any],
) -> None:
account = await get_or_create_system_whatsapp_account(session)
cursor_state = dict(account.cursor_state or {})
if entry.get("id"):
cursor_state["waba_id"] = str(entry.get("id"))
cursor_state["account_update"] = value
account.cursor_state = cursor_state
event = str(value.get("event") or value.get("type") or "").upper()
if event in {"DISABLED_UPDATE", "ACCOUNT_RESTRICTION", "PARTNER_REMOVED"}:
account.health_status = ExternalChatHealthStatus.FAILING
account.suspended_at = datetime.now(UTC)
account.suspended_reason = event.lower()
elif event in {"VERIFIED_ACCOUNT", "ACCOUNT_ENABLED", "REINSTATED"}:
account.health_status = ExternalChatHealthStatus.OK
account.suspended_at = None
account.suspended_reason = None
account.last_health_check_at = datetime.now(UTC)
async def _handle_phone_number_quality_update(
session: AsyncSession,
entry: dict[str, Any],
value: dict[str, Any],
) -> None:
account = await get_or_create_system_whatsapp_account(session)
cursor_state = dict(account.cursor_state or {})
if entry.get("id"):
cursor_state["waba_id"] = str(entry.get("id"))
cursor_state["quality_update"] = value
account.cursor_state = cursor_state
account.last_health_check_at = datetime.now(UTC)
def _single_message_payload(
payload: dict[str, Any],
entry: dict[str, Any],
change: dict[str, Any],
message: dict[str, Any],
) -> dict[str, Any]:
value = dict(change.get("value") or {})
value["messages"] = [message]
value.pop("statuses", None)
single_change = {**change, "value": value}
single_entry = {**entry, "changes": [single_change]}
return {"object": payload.get("object"), "entry": [single_entry]}