From daa123832e75e20adb7b8226666a2e43c32e755f Mon Sep 17 00:00:00 2001 From: Anish Sarkar <104695310+AnishSarkar22@users.noreply.github.com> Date: Fri, 29 May 2026 10:18:28 +0530 Subject: [PATCH] feat(gateway): add WhatsApp Cloud adapter --- .../app/gateway/whatsapp/__init__.py | 1 + .../app/gateway/whatsapp/adapter_cloud.py | 149 ++++++++++++++++++ .../app/gateway/whatsapp/client_cloud.py | 99 ++++++++++++ .../app/gateway/whatsapp/credentials.py | 31 ++++ 4 files changed, 280 insertions(+) create mode 100644 surfsense_backend/app/gateway/whatsapp/__init__.py create mode 100644 surfsense_backend/app/gateway/whatsapp/adapter_cloud.py create mode 100644 surfsense_backend/app/gateway/whatsapp/client_cloud.py create mode 100644 surfsense_backend/app/gateway/whatsapp/credentials.py diff --git a/surfsense_backend/app/gateway/whatsapp/__init__.py b/surfsense_backend/app/gateway/whatsapp/__init__.py new file mode 100644 index 000000000..5c54d2caf --- /dev/null +++ b/surfsense_backend/app/gateway/whatsapp/__init__.py @@ -0,0 +1 @@ +"""WhatsApp gateway implementations.""" diff --git a/surfsense_backend/app/gateway/whatsapp/adapter_cloud.py b/surfsense_backend/app/gateway/whatsapp/adapter_cloud.py new file mode 100644 index 000000000..f247db692 --- /dev/null +++ b/surfsense_backend/app/gateway/whatsapp/adapter_cloud.py @@ -0,0 +1,149 @@ +"""WhatsApp Cloud API platform adapter.""" + +from __future__ import annotations + +from typing import Any + +from app.gateway.base.adapter import ( + BasePlatformAdapter, + ParsedInboundEvent, + PlatformSendResult, +) +from app.gateway.whatsapp.client_cloud import WhatsAppCloudClient +from app.gateway.whatsapp.credentials import WhatsAppCredentials + + +class WhatsAppCloudAdapter(BasePlatformAdapter): + platform = "whatsapp" + + def __init__(self, credentials: WhatsAppCredentials) -> None: + self.credentials = credentials + self.client = WhatsAppCloudClient( + business_token=credentials["business_token"], + phone_number_id=credentials["phone_number_id"], + api_version=credentials.get("api_version"), + ) + + def parse_inbound(self, raw_payload: dict[str, Any]) -> ParsedInboundEvent: + message = _first_message(raw_payload) + if message is None: + return ParsedInboundEvent( + platform=self.platform, + event_kind="other", + external_peer_id=None, + external_peer_kind="unknown", + external_message_id=None, + external_user_id=None, + text=None, + raw_payload=raw_payload, + ) + + contact = _first_contact(raw_payload, message.get("from")) + text = _message_text(message) + wa_id = str(message.get("from") or "") + return ParsedInboundEvent( + platform=self.platform, + event_kind=str(message.get("type") or "message"), + external_peer_id=wa_id or None, + external_peer_kind="direct", + external_message_id=str(message.get("id")) if message.get("id") else None, + external_user_id=wa_id or None, + text=text, + raw_payload=raw_payload, + display_name=(contact.get("profile") or {}).get("name"), + username=None, + metadata={ + "phone_number_id": _metadata(raw_payload).get("phone_number_id"), + "display_phone_number": _metadata(raw_payload).get("display_phone_number"), + "timestamp": message.get("timestamp"), + "message_type": message.get("type"), + }, + ) + + async def send_message( + self, + *, + external_peer_id: str, + text: str, + parse_mode: str | None = None, + reply_to_message_id: str | None = None, + ) -> PlatformSendResult: + return await self.client.send_text( + to=external_peer_id, + text=text, + reply_to_message_id=reply_to_message_id, + ) + + async def edit_message( + self, + *, + external_peer_id: str, + external_message_id: str, + text: str, + parse_mode: str | None = None, + ) -> PlatformSendResult: + raise NotImplementedError("WhatsApp Cloud API does not support message edits") + + async def send_typing_indicator(self, *, inbound_message_id: str) -> None: + await self.client.send_typing_indicator(message_id=inbound_message_id) + + async def validate_credentials(self) -> dict[str, Any]: + return await self.client.validate() + + +def _changes(raw_payload: dict[str, Any]) -> list[dict[str, Any]]: + changes: list[dict[str, Any]] = [] + for entry in raw_payload.get("entry") or []: + if isinstance(entry, dict): + changes.extend( + change for change in (entry.get("changes") or []) if isinstance(change, dict) + ) + return changes + + +def _first_message(raw_payload: dict[str, Any]) -> dict[str, Any] | None: + for change in _changes(raw_payload): + value = change.get("value") or {} + messages = value.get("messages") or [] + if messages and isinstance(messages[0], dict): + return messages[0] + if "message" in raw_payload and isinstance(raw_payload["message"], dict): + return raw_payload["message"] + return None + + +def _first_contact( + raw_payload: dict[str, Any], + wa_id: object, +) -> dict[str, Any]: + for change in _changes(raw_payload): + value = change.get("value") or {} + for contact in value.get("contacts") or []: + if isinstance(contact, dict) and ( + wa_id is None or str(contact.get("wa_id")) == str(wa_id) + ): + return contact + return {} + + +def _metadata(raw_payload: dict[str, Any]) -> dict[str, Any]: + for change in _changes(raw_payload): + value = change.get("value") or {} + metadata = value.get("metadata") + if isinstance(metadata, dict): + return metadata + return {} + + +def _message_text(message: dict[str, Any]) -> str | None: + message_type = message.get("type") + if message_type == "text": + return (message.get("text") or {}).get("body") + if message_type == "button": + return (message.get("button") or {}).get("text") + if message_type == "interactive": + interactive = message.get("interactive") or {} + button_reply = interactive.get("button_reply") or {} + list_reply = interactive.get("list_reply") or {} + return button_reply.get("title") or list_reply.get("title") + return None diff --git a/surfsense_backend/app/gateway/whatsapp/client_cloud.py b/surfsense_backend/app/gateway/whatsapp/client_cloud.py new file mode 100644 index 000000000..e39e022aa --- /dev/null +++ b/surfsense_backend/app/gateway/whatsapp/client_cloud.py @@ -0,0 +1,99 @@ +"""Small httpx wrapper for the WhatsApp Cloud API.""" + +from __future__ import annotations + +from typing import Any + +import httpx + +from app.config import config +from app.gateway.base.adapter import PlatformSendResult +from app.gateway.ratelimit import wait_for_token +from app.observability.metrics import record_gateway_rate_limit_hit + + +class WhatsAppCloudClient: + def __init__( + self, + *, + business_token: str, + phone_number_id: str, + api_version: str | None = None, + ) -> None: + self.business_token = business_token + self.phone_number_id = phone_number_id + self.api_version = api_version or config.WHATSAPP_GRAPH_API_VERSION + self.base_url = f"https://graph.facebook.com/{self.api_version}" + + async def send_text( + self, + *, + to: str, + text: str, + reply_to_message_id: str | None = None, + ) -> PlatformSendResult: + payload: dict[str, Any] = { + "messaging_product": "whatsapp", + "recipient_type": "individual", + "to": to, + "type": "text", + "text": {"preview_url": True, "body": text}, + } + if reply_to_message_id: + payload["context"] = {"message_id": reply_to_message_id} + data = await self._post(f"/{self.phone_number_id}/messages", json=payload) + message_id = str((data.get("messages") or [{}])[0].get("id") or "") + return PlatformSendResult(external_message_id=message_id, raw_response=data) + + async def send_typing_indicator(self, *, message_id: str) -> dict[str, Any]: + payload = { + "messaging_product": "whatsapp", + "status": "read", + "message_id": message_id, + "typing_indicator": {"type": "text"}, + } + return await self._post(f"/{self.phone_number_id}/messages", json=payload) + + async def validate(self) -> dict[str, Any]: + return await self._get( + f"/{self.phone_number_id}", + params={ + "fields": "verified_name,quality_rating,account_review_status,display_phone_number" + }, + ) + + async def _post(self, path: str, *, json: dict[str, Any]) -> dict[str, Any]: + await self._throttle() + async with httpx.AsyncClient(timeout=20) as client: + response = await client.post( + f"{self.base_url}{path}", + headers={"Authorization": f"Bearer {self.business_token}"}, + json=json, + ) + response.raise_for_status() + return response.json() + + async def _get( + self, + path: str, + *, + params: dict[str, Any] | None = None, + ) -> dict[str, Any]: + await self._throttle() + async with httpx.AsyncClient(timeout=20) as client: + response = await client.get( + f"{self.base_url}{path}", + headers={"Authorization": f"Bearer {self.business_token}"}, + params=params, + ) + response.raise_for_status() + return response.json() + + async def _throttle(self) -> None: + wait_ms = await wait_for_token( + f"wa:phone:{self.phone_number_id}", + capacity=10, + refill_per_sec=10.0, + ) + if wait_ms: + record_gateway_rate_limit_hit(bucket="wa:phone") diff --git a/surfsense_backend/app/gateway/whatsapp/credentials.py b/surfsense_backend/app/gateway/whatsapp/credentials.py new file mode 100644 index 000000000..fba79d470 --- /dev/null +++ b/surfsense_backend/app/gateway/whatsapp/credentials.py @@ -0,0 +1,31 @@ +"""Credential helpers for WhatsApp gateway accounts.""" + +from __future__ import annotations + +from typing import TypedDict + +from app.config import config + + +class WhatsAppCredentials(TypedDict, total=False): + business_token: str + waba_id: str + phone_number_id: str + business_id: str + registration_pin: str + api_version: str + + +def load_system_whatsapp_credentials() -> WhatsAppCredentials: + if not ( + config.WHATSAPP_SHARED_BUSINESS_TOKEN + and config.WHATSAPP_SHARED_PHONE_NUMBER_ID + ): + raise RuntimeError("whatsapp_system_credentials_not_configured") + + return { + "business_token": config.WHATSAPP_SHARED_BUSINESS_TOKEN, + "phone_number_id": config.WHATSAPP_SHARED_PHONE_NUMBER_ID, + "waba_id": config.WHATSAPP_SHARED_WABA_ID, + "api_version": config.WHATSAPP_GRAPH_API_VERSION, + }